Zgodnie z błędem, masz już ciąg znaków (już zrobiłeś df.selectExpr("CAST(value AS STRING)")
), więc powinieneś spróbować uzyskać zdarzenie Row jako String
, a nie Array[Byte]
Zacznij od zmiany
val valueStr = new String(record.getAs[Array[Byte]]("value"))
do
val valueStr = record.getAs[String]("value")
Rozumiem, że możesz już mieć klaster do uruchamiania kodu Spark, ale sugerowałbym nadal sprawdzanie Złącze Kafka Connect Mongo do zlewu dzięki czemu nie musisz pisać i utrzymywać własnego edytora Mongo w kodzie Spark.
Możesz też zapisać również zestawy danych Spark bezpośrednio do mongo