Map-reduce jest prawdopodobnie najbardziej wszechstronną z operacji agregacji obsługiwanych przez MongoDB.
Map-Reduce to popularny model programowania wywodzący się z Google do równoległego przetwarzania i agregowania dużych ilości danych. Szczegółowa dyskusja na temat Map-Reduce jest poza zakresem tego artykułu, ale zasadniczo jest to wieloetapowy proces agregacji. Najważniejszymi dwoma krokami są etap mapy (przetwarzanie każdego dokumentu i publikowanie wyników) oraz etap redukcji (zestawianie wyników wyemitowanych na etapie mapy).
MongoDB obsługuje trzy rodzaje operacji agregacji:Map-Reduce, potok agregacji i polecenia agregacji jednego celu. Możesz użyć tego dokumentu porównawczego MongoDB, aby zobaczyć, który odpowiada Twoim potrzebom.https://scalegrid.io/blog/mongodb-performance-running-mongodb-map-reduce-operations-on-secondaries/
W moim ostatnim poście widzieliśmy, z przykładami, jak uruchomić potoki agregacji na drugorzędnych. W tym poście omówimy uruchamianie zadań Map-Reduce w replikach pomocniczych MongoDB.
MongoDB Map-Reduce
MongoDB obsługuje uruchamianie zadań Map-Reduce na serwerach baz danych. Zapewnia to elastyczność w pisaniu złożonych zadań agregacji, które nie są tak łatwe do wykonania za pomocą potoków agregacji. MongoDB pozwala na pisanie własnych map i redukcję funkcji w JavaScript, które mogą być przekazywane do bazy danych za pośrednictwem powłoki Mongo lub dowolnego innego klienta. W przypadku dużych i stale rosnących zestawów danych można nawet rozważyć uruchamianie przyrostowych zadań zmniejszania mapy, aby za każdym razem uniknąć przetwarzania starszych danych.
Historycznie mapa i metody zmniejszania były wykonywane w kontekście jednowątkowym. Jednak to ograniczenie zostało usunięte w wersji 2.4.
Dlaczego uruchamiać zadania Map-Reduce w drugorzędnym?
Podobnie jak inne zadania agregacji, Map-Reduce również jest zadaniem „wsadowym” intensywnie korzystającym z zasobów, więc dobrze nadaje się do uruchamiania na replikach tylko do odczytu. Zastrzeżenia w tym zakresie to:
1) Używanie nieco nieaktualnych danych powinno być w porządku. Możesz też dostosować problem dotyczący zapisu, aby upewnić się, że repliki są zawsze zsynchronizowane z podstawowym. Ta druga opcja zakłada, że obniżenie wydajności zapisu jest dopuszczalne.
2) Wynik zadania Map-Reduce nie powinien być zapisywany w innej kolekcji w bazie danych, ale raczej powinien zostać zwrócony do aplikacji (tj. bez zapisów w bazie danych).
Zobaczmy, jak to zrobić na przykładach, zarówno z powłoki mongo, jak i sterownika Java.
Map-Reduce w zestawach replik
Zbiór danych
Na przykład użyjemy dość prostego zestawu danych:codzienny zrzut rekordu transakcji od sprzedawcy. Przykładowy wpis wygląda następująco:
RS-replica-0:PRIMARY> use test switched to db test RS-replica-0:PRIMARY> show tables txns RS-replica-0:PRIMARY> db.txns.findOne() { "_id" : ObjectId("584a3b71cdc1cb061957289b"), "custid" : "cust_66", "txnval" : 100, "items" : [{"sku": sku1", "qty": 1, "pr": 100}, ...], ... }
W naszych przykładach obliczymy łączne wydatki danego klienta w tym dniu. W związku z tym, biorąc pod uwagę nasz schemat, mapa i metody zmniejszania będą wyglądać następująco:
var mapFunction = function() { emit(this.custid, this.txnval); } // Emit the custid and txn value from each record var reduceFunction = function(key, values) { return Array.sum(values); } // Sum all the txn values for a given custid
Po ustaleniu naszego schematu spójrzmy na działanie Map-Reduce.
Powłoka MongoDB
Aby upewnić się, że zadanie Map-Reduce jest wykonywane na drugorzędnym, preferencja odczytu powinna być ustawiona na dodatkowe . Jak powiedzieliśmy powyżej, aby funkcja Map-Reduce działała na drugorzędnym, wynik wyniku musi być inline (W rzeczywistości jest to jedyna dozwolona wartość na wtórnych). Zobaczmy, jak to działa.
$ mongo -u admin -p pwd --authenticationDatabase admin --host RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017 MongoDB shell version: 3.2.10 connecting to: RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017/test 2016-12-09T08:15:19.347+0000 I NETWORK [thread1] Starting new replica set monitor for server-1.servers.example.com:27017,server-2.servers.example.com:27017 2016-12-09T08:15:19.349+0000 I NETWORK [ReplicaSetMonitorWatcher] starting RS-replica-0:PRIMARY> db.setSlaveOk() RS-replica-0:PRIMARY> db.getMongo().setReadPref('secondary') RS-replica-0:PRIMARY> db.getMongo().getReadPrefMode() secondary RS-replica-0:PRIMARY> var mapFunc = function() { emit(this.custid, this.txnval); } RS-replica-0:PRIMARY> var reduceFunc = function(key, values) { return Array.sum(values); } RS-replica-0:PRIMARY> db.txns.mapReduce(mapFunc, reduceFunc, {out: { inline: 1 }}) { "results" : [ { "_id" : "cust_0", "value" : 72734 }, { "_id" : "cust_1", "value" : 67737 }, ... ] "timeMillis" : 215, "counts" : { "input" : 10000, "emit" : 10000, "reduce" : 909, "output" : 101 }, "ok" : 1 }
Rzut oka na dzienniki drugorzędne potwierdza, że zadanie rzeczywiście działało na drugorzędnym.
... 2016-12-09T08:17:24.842+0000 D COMMAND [conn344] mr ns: test.txns 2016-12-09T08:17:24.843+0000 I COMMAND [conn344] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms 2016-12-09T08:17:24.865+0000 I COMMAND [conn344] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms 2016-12-09T08:17:25.063+0000 I COMMAND [conn344] command test.txns command: mapReduce { mapreduce: "txns", map: function () { emit(this.custid, this.txnval); }, reduce: function (key, values) { return Array.sum(values); }, out: { inline: 1.0 } } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:78 reslen:4233 locks:{ Global: { acquireCount: { r: 366 } }, Database: { acquireCount: { r: 3, R: 180 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_command 220ms ...
Jawa
Teraz spróbujmy uruchomić zadanie Map-Reduce na replikach do odczytu z aplikacji Java. W sterowniku MongoDB Java ustawienie odczytu preferencji załatwia sprawę. Dane wyjściowe są domyślnie wbudowane, więc nie trzeba przekazywać żadnych dodatkowych parametrów. Oto przykład z użyciem sterownika w wersji 3.2.2:
public class MapReduceExample { private static final String MONGO_END_POINT = "mongodb://admin:[email protected]:27017,server-2.servers.example.com:27017/admin?replicaSet=RS-replica-0"; private static final String COL_NAME = "txns"; private static final String DEF_DB = "test"; public MapReduceExample() { } public static void main(String[] args) { MapReduceExample writer = new MapReduceExample(); writer.mapReduce(); } public static final String mapfunction = "function() { emit(this.custid, this.txnval); }"; public static final String reducefunction = "function(key, values) { return Array.sum(values); }"; private void mapReduce() { printer("Initializing..."); Builder options = MongoClientOptions.builder().readPreference(ReadPreference.secondary()); MongoClientURI uri = new MongoClientURI(MONGO_END_POINT, options); MongoClient client = new MongoClient(uri); MongoDatabase database = client.getDatabase(DEF_DB); MongoCollection collection = database.getCollection(COL_NAME); MapReduceIterable iterable = collection.mapReduce(mapfunction, reducefunction); // inline by default MongoCursor cursor = iterable.iterator(); while (cursor.hasNext()) { Document result = cursor.next(); printer("Customer: " + result.getString("_id") + ", Total Txn value: " + result.getDouble("value")); } printer("Done..."); } ... }
Jak wynika z dzienników, zadanie zostało uruchomione na serwerze pomocniczym:
... 2016-12-09T08:32:31.419+0000 D COMMAND [conn371] mr ns: test.txns 2016-12-09T08:32:31.420+0000 I COMMAND [conn371] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms 2016-12-09T08:32:31.444+0000 I COMMAND [conn371] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms 2016-12-09T08:32:31.890+0000 I COMMAND [conn371] command test.txns command: mapReduce { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { inline: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:156 reslen:4331 locks:{ Global: { acquireCount: { r: 722 } }, Database: { acquireCount: { r: 3, R: 358 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_query 470ms ...
MongoDB Map-Reduce w klastrach podzielonych na fragmenty
MongoDB obsługuje funkcję Map-Reduce w klastrach podzielonych na fragmenty, zarówno gdy kolekcja podzielona na fragmenty jest danymi wejściowymi, jak i wyjściowymi zadania Map-Reduce. Jednak MongoDB obecnie nie obsługuje uruchamiania zadań z redukcją mapy na serwerach pomocniczych klastra podzielonego na fragmenty. Więc nawet jeśli opcja out jest ustawiony na inline , zadania Map-Reduce będą zawsze uruchamiane na elementach podstawowych klastra podzielonego na fragmenty. Ten problem jest śledzony przez ten błąd JIRA.
Składnia wykonywania zadania Map-Reduce w klastrze podzielonym na fragmenty jest taka sama, jak w zestawie replik. Tak więc przykłady podane w powyższej sekcji są aktualne. Jeśli powyższy przykład Java jest uruchamiany w klastrze podzielonym na fragmenty, komunikaty dziennika pojawiają się na elementach podstawowych, wskazując, że polecenie zostało tam uruchomione.
... 2016-11-24T08:46:30.828+0000 I COMMAND [conn357] command test.$cmd command: mapreduce.shardedfinish { mapreduce.shardedfinish: { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { in line: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true, $queryOptions: { $readPreference: { mode: "secondary" } } }, inputDB: "test", shardedOutputCollection: "tmp.mrs.txns_1479977190_0", shards: { Shard-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 123, timing: { mapTime: 51, emitLoop: 116, reduceTime: 9, mode: "mixed", total: 123 }, counts: { input: 9474, emit: 9474, reduce: 909, output: 101 }, ok: 1.0, $gleS tats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 71, timing: { mapTime: 8, emitLoop: 63, reduceTime: 4, mode: "mixed", total: 71 }, counts: { input: 1526, emit: 1526, reduce: 197, output: 101 }, ok: 1.0, $gleStats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } } }, shardCounts: { Sha rd-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { input: 9474, emit: 9474, reduce: 909, output: 101 }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { inpu t: 1526, emit: 1526, reduce: 197, output: 101 } }, counts: { emit: 11000, input: 11000, output: 202, reduce: 1106 } } keyUpdates:0 writeConflicts:0 numYields:0 reslen:4368 locks:{ Global: { acquireCount: { r: 2 } }, Database: { acquireCount: { r: 1 } }, Collection: { acqu ireCount: { r: 1 } } } protocol:op_command 115ms 2016-11-24T08:46:30.830+0000 I COMMAND [conn46] CMD: drop test.tmp.mrs.txns_1479977190_0 ...
Odwiedź naszą stronę produktu MongoDB, aby dowiedzieć się o naszej obszernej liście funkcji.