MongoDB
 sql >> Baza danych >  >> NoSQL >> MongoDB

Strumieniowanie danych w czasie rzeczywistym za pomocą strumieni zmian MongoDB

Niedawno MongoDB wydał nową funkcję, począwszy od wersji 3.6, Change Streams. Daje to natychmiastowy dostęp do danych, co pomaga być na bieżąco ze zmianami danych. W dzisiejszym świecie każdy chce otrzymywać natychmiastowe powiadomienia, zamiast otrzymywać je po kilku godzinach lub minutach. W przypadku niektórych aplikacji bardzo ważne jest wysyłanie powiadomień w czasie rzeczywistym do wszystkich subskrybowanych użytkowników o każdej aktualizacji. MongoDB bardzo ułatwił ten proces, wprowadzając tę ​​funkcję. W tym artykule dowiemy się o strumieniu zmian MongoDB i jego zastosowaniach na kilku przykładach.

Definiowanie strumieni zmian

Strumienie zmian to nic innego jak strumień w czasie rzeczywistym wszelkich zmian zachodzących w bazie danych lub kolekcji, a nawet we wdrożeniach. Na przykład za każdym razem, gdy w określonej kolekcji wystąpi jakakolwiek aktualizacja (wstaw, aktualizuj lub usuń), MongoDB wyzwala zdarzenie zmiany ze wszystkimi danymi, które zostały zmodyfikowane.

Strumienie zmian można zdefiniować w dowolnej kolekcji, podobnie jak inne normalne operatory agregacji, za pomocą operatora $changeStream i metody watch(). Możesz także zdefiniować strumień zmian za pomocą metody MongoCollection.watch().

Przykład

db.myCollection.watch()

Zmień funkcje strumieni

  • Filtrowanie zmian

    Możesz filtrować zmiany, aby otrzymywać powiadomienia o zdarzeniach tylko dla wybranych danych.

    Przykład:

    pipeline = [
       {
         $match: { name: "Bob" }
       } ];
    changeStream = collection.watch(pipeline);

    Ten kod zapewni, że będziesz otrzymywać aktualizacje tylko dla rekordów, których imię jest równe Bobowi. W ten sposób możesz napisać dowolne potoki do filtrowania strumieni zmian.

  • Wznawianie strumieni zmian

    Ta funkcja zapewnia brak utraty danych w przypadku jakichkolwiek awarii. Każda odpowiedź w strumieniu zawiera token wznowienia, którego można użyć do ponownego uruchomienia strumienia z określonego punktu. W przypadku niektórych częstych awarii sieci sterownik mongodb będzie próbował ponownie nawiązać połączenie z subskrybentami przy użyciu najnowszego tokena wznowienia. Chociaż w przypadku całkowitej awarii aplikacji, token wznowienia powinien być utrzymywany przez klientów, aby wznowić strumień.

  • Zamówione strumienie zmian

    MongoDB używa globalnego zegara logicznego do porządkowania wszystkich zdarzeń strumienia zmian we wszystkich replikach i fragmentach dowolnego klastra, dzięki czemu odbiorca zawsze otrzyma powiadomienia w tej samej kolejności, w jakiej polecenia zostały zastosowane w bazie danych.

  • Wydarzenia z pełnymi dokumentami

    MongoDB domyślnie zwraca część pasujących dokumentów. Możesz jednak zmodyfikować konfigurację strumienia zmian, aby otrzymać pełny dokument. Aby to zrobić, przekaż { fullDocument:“updateLookup”} do metody oglądania.
    Przykład:

    collection = db.collection("myColl")
    changeStream = collection.watch({ fullDocument: “updateLookup”})
  • Trwałość

    Strumienie zmian będą powiadamiać tylko o danych, które są zatwierdzone do większości replik. Zapewni to, że zdarzenia są generowane przez większość danych trwałości, zapewniając trwałość wiadomości.

  • Bezpieczeństwo/Kontrola dostępu

    Strumienie zmian są bardzo bezpieczne. Użytkownicy mogą tworzyć strumienie zmian tylko w kolekcjach, do których mają uprawnienia do odczytu. Możesz tworzyć strumienie zmian w oparciu o role użytkowników.

Kilkadziesiąt — Zostań administratorem baz danych MongoDB — wprowadzenie MongoDB do produkcjiDowiedz się, co trzeba wiedzieć, aby wdrażać, monitorować, zarządzać i skalować MongoDB. Pobierz za darmo

Przykład strumieni zmian

W tym przykładzie utworzymy strumienie zmian w kolekcji Stock, aby otrzymywać powiadomienia, gdy jakakolwiek cena akcji przekroczy dowolny próg.

  • Skonfiguruj klaster

    Aby korzystać ze strumieni zmian, musimy najpierw stworzyć zestaw replik. Uruchom następujące polecenie, aby utworzyć zestaw replik pojedynczego węzła.

    mongod --dbpath ./data --replSet “rs”
  • Wstaw kilka rekordów w kolekcji Giełda

    var docs = [
     { ticker: "AAPL", price: 210 },
     { ticker: "AAPL", price: 260 },
     { ticker: "AAPL", price: 245 },
     { ticker: "AAPL", price: 255 },
     { ticker: "AAPL", price: 270 }
    ];
    db.Stocks.insert(docs)
  • Skonfiguruj środowisko węzła i zainstaluj zależności

    mkdir mongo-proj && cd mongo-proj
    npm init -y
    npm install mongodb --save
  • Zasubskrybuj zmiany

    Utwórz jeden plik index.js i umieść w nim następujący kod.

    const mongo = require("mongodb").MongoClient;
    mongo.connect("mongodb://localhost:27017/?replicaSet=rs0").then(client => {
     console.log("Connected to MongoDB server");
     // Select DB and Collection
     const db = client.db("mydb");
     const collection = db.collection("Stocks");
     pipeline = [
       {
         $match: { "fullDocument.price": { $gte: 250 } }
       }
     ];
     // Define change stream
     const changeStream = collection.watch(pipeline);
     // start listen to changes
     changeStream.on("change", function(event) {
       console.log(JSON.stringify(event));
     });
    });

    Teraz uruchom ten plik:

    node index.js
  • Wstaw nowy rekord w db, aby otrzymać aktualizację

    db.Stocks.insert({ ticker: “AAPL”, price: 280 })

    Teraz sprawdź swoją konsolę, otrzymasz aktualizację z MongoDB.
    Przykładowa odpowiedź:

    {
    "_id":{
    "_data":"825C5D51F70000000129295A1004E83608EE8F1B4FBABDCEE73D5BF31FC946645F696400645C5D51F73ACA83479B48DE6E0004"},
    "operationType":"insert",
    "clusterTime":"6655565945622233089",
    "fullDocument":{
    "_id":"5c5d51f73aca83479b48de6e",
    "ticker":"AAPL",
    "Price":300
    },
    "ns":{"db":"mydb","coll":"Stocks"},
    "documentKey":{"_id":"5c5d51f73aca83479b48de6e"}
    }

Tutaj możesz zmienić wartość parametru operationType za pomocą następujących operacji, aby nasłuchiwać różnych typów zmian w kolekcji:

  • Wstaw
  • Zamień (z wyjątkiem unikalnego identyfikatora)
  • Aktualizacja
  • Usuń
  • Unieważnij (za każdym razem, gdy Mongo zwraca nieprawidłowy kursor)

Inne tryby strumieni zmian

Strumienie zmian można rozpocząć w bazie danych i wdrażaniu w taki sam sposób, jak w przypadku kolekcji. Ta funkcja została wydana z MongoDB w wersji 4.0. Oto polecenia umożliwiające otwarcie strumienia zmian w bazie danych i wdrożeniach.

Against DB: db.watch()
Against deployment: Mongo.watch()

Wniosek

MongoDB Change Streams upraszcza integrację między frontendem i backendem w czasie rzeczywistym i bezproblemowo. Ta funkcja może pomóc w korzystaniu z MongoDB w modelu pubsub, dzięki czemu nie musisz już zarządzać wdrożeniami Kafka lub RabbitMQ. Jeśli Twoja aplikacja wymaga informacji w czasie rzeczywistym, musisz sprawdzić tę funkcję MongoDB. Mam nadzieję, że ten post pozwoli Ci zacząć korzystać ze strumieni zmian MongoDB.


  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. Automatyczny przyrost mangusty

  2. Jak dołączyć do zapytania w mongodb?

  3. Projektuj pierwszy element w tablicy do nowego pola (agregacja MongoDB)

  4. Jak mogę uzyskać dostęp do MongoDB Meteor z innego klienta, gdy Meteor jest uruchomiony?

  5. Meteor:Nieoczekiwany kod wyjścia mongo 100. Ponowne uruchomienie. Nie można uruchomić serwera mongo