Operacje agregacji w MongoDB umożliwiają przetwarzanie rekordów danych, grupowanie ich i zwracanie obliczonych wyników. MongoDB obsługuje trzy rodzaje operacji agregacji:
- Polecenia agregacji jednego celu
- Map-Reduce
- Potok agregacji
Możesz użyć tego dokumentu porównawczego MongoDB, aby zobaczyć, który odpowiada Twoim potrzebom.
Potok agregacji
Potok agregacji to platforma MongoDB, która umożliwia agregację danych za pośrednictwem potoku przetwarzania danych. Oznacza to, że dokumenty są wysyłane wieloetapowym potokiem, filtrując, grupując i w inny sposób przekształcając dokumenty na każdym kroku. Zapewnia SQL „GROUP BY…”. typ konstrukcji dla MongoDB, które działają w samej bazie danych. Dokumentacja agregacji dostarcza użytecznych przykładów tworzenia takich potoków.
Po co uruchamiać agregacje na serwerze pomocniczym?
Potoki agregacji to operacje intensywnie korzystające z zasobów — sensowne jest odciążanie zadań agregacji do elementów pomocniczych zestawu replik MongoDB, gdy można operować na nieco przestarzałych danych. Jest to zwykle prawdziwe w przypadku operacji „wsadowych”, ponieważ nie oczekuje się, że będą działać na najnowszych danych. Jeśli dane wyjściowe muszą zostać zapisane w kolekcji, zadania agregacji działają tylko na podstawowym, ponieważ tylko podstawowy jest zapisywalny w MongoDB.
W tym poście pokażemy, jak upewnić się, że potoki agregacji są wykonywane na serwerze pomocniczym zarówno z powłoki mongo, jak i Javy.
Uruchom potoki agregacji na serwerze pomocniczym z Mongo Shell i Java w MongoDBCKliknij, aby tweetowaćUwaga:używamy przykładowego zestawu danych dostarczonego przez MongoDB w ich przykładzie agregacji kodów pocztowych, aby zaprezentować nasze przykłady. Możesz go pobrać zgodnie z instrukcjami w przykładzie.
Potok agregacji w zestawach replik
Powłoka MongoDB
Ustawienie preferencji odczytu na dodatkowe wykonuje tę sztuczkę podczas uruchamiania zadania agregacji z powłoki mongo. Spróbujmy pobrać cały stan z populacją większą niż 10 milionów (pierwsza agregacja w przykładzie z kodami pocztowymi). Zarówno powłoka, jak i serwer działają pod kontrolą MongoDB w wersji 3.2.10.
mongo -u admin -p <pwd> --authenticationDatabase admin --host RS-repl0-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017 RS-repl0-0:PRIMARY> use test switched to db test RS-repl0-0:PRIMARY> db.setSlaveOk() // Ok to run commands on a slave RS-repl0-0:PRIMARY> db.getMongo().setReadPref('secondary') // Set read pref RS-repl0-0:PRIMARY> db.getMongo().getReadPrefMode() secondary RS-repl0-0:PRIMARY> db.zips.aggregate( [ ... { $group: { _id: "$state", totalPop: { $sum: "$pop" } } }, ... { $match: { totalPop: { $gte: 10*1000*1000 } } } ... ] ) { "_id" : "CA", "totalPop" : 29754890 } { "_id" : "FL", "totalPop" : 12686644 } { "_id" : "PA", "totalPop" : 11881643 } { "_id" : "NY", "totalPop" : 17990402 } { "_id" : "OH", "totalPop" : 10846517 } { "_id" : "IL", "totalPop" : 11427576 } { "_id" : "TX", "totalPop" : 16984601 }
Spojrzenie na dzienniki MongoDB (z włączonym logowaniem dla poleceń) na serwerze pomocniczym pokazuje, że agregacja rzeczywiście działała na serwerze pomocniczym:
... 2016-12-05T06:20:14.783+0000 I COMMAND [conn200] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } }, { $match: { totalPop: { $gte: 10000000.0 } } } ], cursor: {} } keyUpdates:0 writeConflicts:0 numYields:229 reslen:338 locks:{ Global: { acquireCount: { r: 466 } }, Database: { acquire Count: { r: 233 } }, Collection: { acquireCount: { r: 233 } } } protocol:op_command 49ms ...
Jawa
W sterowniku MongoDB Java, ponowne ustawienie odczytu Preferencji załatwia sprawę. Oto przykład z użyciem sterownika w wersji 3.2.2:
public class AggregationChecker { /* * Data and code inspired from: * https://docs.mongodb.com/v3.2/tutorial/aggregation-zip-code-data-set/#return-states-with-populations-above-10-million */ private static final String MONGO_END_POINT = "mongodb://admin:[email protected]:27017,server-2.servers.example.com:27017/admin?replicaSet=RS-repl0-0"; private static final String COL_NAME = "zips"; private static final String DEF_DB = "test"; public AggregationChecker() { } public static void main(String[] args) { AggregationChecker writer = new AggregationChecker(); writer.aggregationJob(); } private void aggregationJob() { printer("Initializing..."); Builder options = MongoClientOptions.builder().readPreference(ReadPreference.secondary()); MongoClientURI uri = new MongoClientURI(MONGO_END_POINT, options); MongoClient client = new MongoClient(uri); try { final DB db = client.getDB(DEF_DB); final DBCollection coll = db.getCollection(COL_NAME); // Avg city pop by state: https://docs.mongodb.com/manual/tutorial/aggregation-zip-code-data-set/#return-average-city-population-by-state Iterable iterable = coll.aggregate( Arrays.asList( new BasicDBObject("$group", new BasicDBObject("_id", new BasicDBObject("state", "$state").append("city", "$city")).append("pop", new BasicDBObject("$sum", "$pop"))), new BasicDBObject("$group", new BasicDBObject("_id", "$_id.state").append("avgCityPop", new BasicDBObject("$avg", "$pop"))))).results(); for (DBObject entry : iterable) { printer(entry.toString()); } } finally { client.close(); } printer("Done..."); } ... }
Logi w drugim:
... 2016-12-01T10:54:18.667+0000 I COMMAND [conn4113] command test.zips command: aggregate { aggregate: "zipcodes", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } }, { $group: { _id: "$_id.state", avgCityPop: { $avg: "$pop" } } } ] } keyUpdates:0 writeConflicts:0 numYields:229 reslen:2149 locks:{ Global: { acquireCount: { r: 466 } }, Database: { acquireCount: { r: 233 } }, Collection: { acquireCount: { r: 233 } } } protocol:op_query 103ms ...
Żadna operacja nie została zarejestrowana na podstawowym.
Potok agregacji w klastrach podzielonych na fragmenty
Potoki agregacji są obsługiwane w klastrach podzielonych na fragmenty. Szczegółowe zachowanie jest wyjaśnione w dokumentacji. Pod względem implementacji istnieje niewielka różnica między zestawem replik a klastrem podzielonym na fragmenty podczas korzystania z potoku agregacji.
Jak skonfigurować potok agregacji w klastrach podzielonych na fragmenty w MongoDBCKliknij, aby tweetowaćPowłoka MongoDB
Przed zaimportowaniem danych do klastra podzielonego na fragmenty włącz fragmentowanie w kolekcji.
mongos> sh.enableSharding("test") mongos> sh.shardCollection("test.zips", { "_id" : "hashed" } )
Następnie operacje są takie same jak w przypadku zestawu replik:
mongos> db.setSlaveOk() mongos> db.getMongo().setReadPref('secondary') mongos> db.getMongo().getReadPrefMode() secondary mongos> db.zips.aggregate( [ ... { $group: { _id: "$state", totalPop: { $sum: "$pop" } } }, ... { $match: { totalPop: { $gte: 10*1000*1000 } } } ... ] ) { "_id" : "TX", "totalPop" : 16984601 } { "_id" : "PA", "totalPop" : 11881643 } { "_id" : "CA", "totalPop" : 29754890 } { "_id" : "FL", "totalPop" : 12686644 } { "_id" : "NY", "totalPop" : 17990402 } { "_id" : "OH", "totalPop" : 10846517 } { "_id" : "IL", "totalPop" : 11427576 }
Dzienniki z jednego z drugorzędnych:
... 2016-12-02T05:46:24.627+0000 I COMMAND [conn242] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:44258973083 keyUpdates:0 writeConflicts:0 numYields:0 reslen:115 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 0ms 2016-12-02T05:46:24.641+0000 I COMMAND [conn131] getmore test.zips query: { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } planSummary: PIPELINE_PROXY cursorid:44258973083 ntoreturn:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:112 nreturned:51 reslen:1601 locks:{ Global: { acquireCount: { r: 230 } }, Database: { acquireCount: { r: 115 } }, Collection: { acquireCount: { r: 115 } } } 13ms ...
Jawa
Ten sam kod, który ma zastosowanie w zestawie replik, działa dobrze z klastrem podzielonym na fragmenty. Wystarczy zastąpić parametry połączenia zestawu replik parametrami klastra podzielonego na fragmenty. Dzienniki z drugorzędnego wskazują, że zadanie rzeczywiście zostało uruchomione na drugorzędnych:
... 2016-12-02T05:39:12.339+0000 I COMMAND [conn130] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:44228970872 keyUpdates:0 writeConflicts:0 numYields:0 reslen:115 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 0ms 2016-12-02T05:39:12.371+0000 I COMMAND [conn131] getmore test.zips query: { aggregate: "zips", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } planSummary: PIPELINE_PROXY cursorid:44228970872 ntoreturn:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:112 nreturned:12902 reslen:741403 locks:{ Global: { acquireCount: { r: 230 } }, Database: { acquireCount: { r: 115 } }, Collection: { acquireCount: { r: 115 } } } 30ms ...
Czy ta treść była pomocna? Daj nam znać, tweetując do nas @scaledgridio i jak zawsze, jeśli masz jakieś pytania, daj nam znać w komentarzach poniżej. O I! Nie zapomnij sprawdzić naszych produktów hostingowych MongoDB, które mogą zaoszczędzić do 40% na długoterminowych kosztach hostingu MongoDB®.