MongoDB
 sql >> Baza danych >  >> NoSQL >> MongoDB

Sink Kafka Stream do MongoDB za pomocą PySpark Structured Streaming

Znalazłem rozwiązanie.Ponieważ nie mogłem znaleźć odpowiedniego sterownika Mongo do Strumienia Strukturyzowanego, pracowałem nad innym rozwiązaniem.Teraz używam bezpośredniego połączenia z mongoDb i używam "foreach(...)" zamiast foreachbatch(. ...). Mój kod wygląda tak w pliku testSpark.py:

....
import pymongo
from pymongo import MongoClient

local_url = "mongodb://localhost:27017"


def write_machine_df_mongo(target_df):

    cluster = MongoClient(local_url)
    db = cluster["test_db"]
    collection = db.test1

    post = {
            "machine_id": target_df.machine_id,
            "proc_type": target_df.proc_type,
            "sensor1_id": target_df.sensor1_id,
            "sensor2_id": target_df.sensor2_id,
            "time": target_df.time,
            "sensor1_val": target_df.sensor1_val,
            "sensor2_val": target_df.sensor2_val,
            }

    collection.insert_one(post)

machine_df.writeStream\
    .outputMode("append")\
    .foreach(write_machine_df_mongo)\
    .start()



  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. Wzrost wydajności dzięki zastosowaniu wstawek zbiorczych w porównaniu ze zwykłymi wstawkami w MongoDB

  2. Jaka jest maksymalna głębokość osadzonych dokumentów dozwolona w MongoDb?

  3. Parse.com dodaje obiekt JSON do tablicy JSON

  4. MongoDB $ne Operator potoku agregacji

  5. MongoDB C# - Pobieranie BsonDocument dla elementu, który nie istnieje