Niedawno MongoDB wydał nową funkcję, począwszy od wersji 3.6, Change Streams. Daje to natychmiastowy dostęp do danych, co pomaga być na bieżąco ze zmianami danych. W dzisiejszym świecie każdy chce otrzymywać natychmiastowe powiadomienia, zamiast otrzymywać je po kilku godzinach lub minutach. W przypadku niektórych aplikacji bardzo ważne jest wysyłanie powiadomień w czasie rzeczywistym do wszystkich subskrybowanych użytkowników o każdej aktualizacji. MongoDB bardzo ułatwił ten proces, wprowadzając tę funkcję. W tym artykule dowiemy się o strumieniu zmian MongoDB i jego zastosowaniach na kilku przykładach.
Definiowanie strumieni zmian
Strumienie zmian to nic innego jak strumień w czasie rzeczywistym wszelkich zmian zachodzących w bazie danych lub kolekcji, a nawet we wdrożeniach. Na przykład za każdym razem, gdy w określonej kolekcji wystąpi jakakolwiek aktualizacja (wstaw, aktualizuj lub usuń), MongoDB wyzwala zdarzenie zmiany ze wszystkimi danymi, które zostały zmodyfikowane.
Strumienie zmian można zdefiniować w dowolnej kolekcji, podobnie jak inne normalne operatory agregacji, za pomocą operatora $changeStream i metody watch(). Możesz także zdefiniować strumień zmian za pomocą metody MongoCollection.watch().
Przykład
db.myCollection.watch()
Zmień funkcje strumieni
-
Filtrowanie zmian
Możesz filtrować zmiany, aby otrzymywać powiadomienia o zdarzeniach tylko dla wybranych danych.
Przykład:
pipeline = [ { $match: { name: "Bob" } } ]; changeStream = collection.watch(pipeline);
Ten kod zapewni, że będziesz otrzymywać aktualizacje tylko dla rekordów, których imię jest równe Bobowi. W ten sposób możesz napisać dowolne potoki do filtrowania strumieni zmian.
-
Wznawianie strumieni zmian
Ta funkcja zapewnia brak utraty danych w przypadku jakichkolwiek awarii. Każda odpowiedź w strumieniu zawiera token wznowienia, którego można użyć do ponownego uruchomienia strumienia z określonego punktu. W przypadku niektórych częstych awarii sieci sterownik mongodb będzie próbował ponownie nawiązać połączenie z subskrybentami przy użyciu najnowszego tokena wznowienia. Chociaż w przypadku całkowitej awarii aplikacji, token wznowienia powinien być utrzymywany przez klientów, aby wznowić strumień.
-
Zamówione strumienie zmian
MongoDB używa globalnego zegara logicznego do porządkowania wszystkich zdarzeń strumienia zmian we wszystkich replikach i fragmentach dowolnego klastra, dzięki czemu odbiorca zawsze otrzyma powiadomienia w tej samej kolejności, w jakiej polecenia zostały zastosowane w bazie danych.
-
Wydarzenia z pełnymi dokumentami
MongoDB domyślnie zwraca część pasujących dokumentów. Możesz jednak zmodyfikować konfigurację strumienia zmian, aby otrzymać pełny dokument. Aby to zrobić, przekaż { fullDocument:“updateLookup”} do metody oglądania.
Przykład:collection = db.collection("myColl") changeStream = collection.watch({ fullDocument: “updateLookup”})
-
Trwałość
Strumienie zmian będą powiadamiać tylko o danych, które są zatwierdzone do większości replik. Zapewni to, że zdarzenia są generowane przez większość danych trwałości, zapewniając trwałość wiadomości.
-
Bezpieczeństwo/Kontrola dostępu
Strumienie zmian są bardzo bezpieczne. Użytkownicy mogą tworzyć strumienie zmian tylko w kolekcjach, do których mają uprawnienia do odczytu. Możesz tworzyć strumienie zmian w oparciu o role użytkowników.
Przykład strumieni zmian
W tym przykładzie utworzymy strumienie zmian w kolekcji Stock, aby otrzymywać powiadomienia, gdy jakakolwiek cena akcji przekroczy dowolny próg.
-
Skonfiguruj klaster
Aby korzystać ze strumieni zmian, musimy najpierw stworzyć zestaw replik. Uruchom następujące polecenie, aby utworzyć zestaw replik pojedynczego węzła.
mongod --dbpath ./data --replSet “rs”
-
Wstaw kilka rekordów w kolekcji Giełda
var docs = [ { ticker: "AAPL", price: 210 }, { ticker: "AAPL", price: 260 }, { ticker: "AAPL", price: 245 }, { ticker: "AAPL", price: 255 }, { ticker: "AAPL", price: 270 } ]; db.Stocks.insert(docs)
-
Skonfiguruj środowisko węzła i zainstaluj zależności
mkdir mongo-proj && cd mongo-proj npm init -y npm install mongodb --save
-
Zasubskrybuj zmiany
Utwórz jeden plik index.js i umieść w nim następujący kod.
const mongo = require("mongodb").MongoClient; mongo.connect("mongodb://localhost:27017/?replicaSet=rs0").then(client => { console.log("Connected to MongoDB server"); // Select DB and Collection const db = client.db("mydb"); const collection = db.collection("Stocks"); pipeline = [ { $match: { "fullDocument.price": { $gte: 250 } } } ]; // Define change stream const changeStream = collection.watch(pipeline); // start listen to changes changeStream.on("change", function(event) { console.log(JSON.stringify(event)); }); });
Teraz uruchom ten plik:
node index.js
-
Wstaw nowy rekord w db, aby otrzymać aktualizację
db.Stocks.insert({ ticker: “AAPL”, price: 280 })
Teraz sprawdź swoją konsolę, otrzymasz aktualizację z MongoDB.
Przykładowa odpowiedź:{ "_id":{ "_data":"825C5D51F70000000129295A1004E83608EE8F1B4FBABDCEE73D5BF31FC946645F696400645C5D51F73ACA83479B48DE6E0004"}, "operationType":"insert", "clusterTime":"6655565945622233089", "fullDocument":{ "_id":"5c5d51f73aca83479b48de6e", "ticker":"AAPL", "Price":300 }, "ns":{"db":"mydb","coll":"Stocks"}, "documentKey":{"_id":"5c5d51f73aca83479b48de6e"} }
Tutaj możesz zmienić wartość parametru operationType za pomocą następujących operacji, aby nasłuchiwać różnych typów zmian w kolekcji:
- Wstaw
- Zamień (z wyjątkiem unikalnego identyfikatora)
- Aktualizacja
- Usuń
- Unieważnij (za każdym razem, gdy Mongo zwraca nieprawidłowy kursor)
Inne tryby strumieni zmian
Strumienie zmian można rozpocząć w bazie danych i wdrażaniu w taki sam sposób, jak w przypadku kolekcji. Ta funkcja została wydana z MongoDB w wersji 4.0. Oto polecenia umożliwiające otwarcie strumienia zmian w bazie danych i wdrożeniach.
Against DB: db.watch()
Against deployment: Mongo.watch()
Wniosek
MongoDB Change Streams upraszcza integrację między frontendem i backendem w czasie rzeczywistym i bezproblemowo. Ta funkcja może pomóc w korzystaniu z MongoDB w modelu pubsub, dzięki czemu nie musisz już zarządzać wdrożeniami Kafka lub RabbitMQ. Jeśli Twoja aplikacja wymaga informacji w czasie rzeczywistym, musisz sprawdzić tę funkcję MongoDB. Mam nadzieję, że ten post pozwoli Ci zacząć korzystać ze strumieni zmian MongoDB.