Znalazłem rozwiązanie.Ponieważ nie mogłem znaleźć odpowiedniego sterownika Mongo do Strumienia Strukturyzowanego, pracowałem nad innym rozwiązaniem.Teraz używam bezpośredniego połączenia z mongoDb i używam "foreach(...)" zamiast foreachbatch(. ...). Mój kod wygląda tak w pliku testSpark.py:
....
import pymongo
from pymongo import MongoClient
local_url = "mongodb://localhost:27017"
def write_machine_df_mongo(target_df):
cluster = MongoClient(local_url)
db = cluster["test_db"]
collection = db.test1
post = {
"machine_id": target_df.machine_id,
"proc_type": target_df.proc_type,
"sensor1_id": target_df.sensor1_id,
"sensor2_id": target_df.sensor2_id,
"time": target_df.time,
"sensor1_val": target_df.sensor1_val,
"sensor2_val": target_df.sensor2_val,
}
collection.insert_one(post)
machine_df.writeStream\
.outputMode("append")\
.foreach(write_machine_df_mongo)\
.start()