Witamy w streamingu. To, czego naprawdę potrzebujesz, to „strumień zdarzeń”, który przetwarza dane wejściowe „po jednym kawałku na raz”, i oczywiście najlepiej za pomocą wspólnego ogranicznika, takiego jak znak „nowej linii”, którego obecnie używasz.
Aby uzyskać naprawdę wydajne rzeczy, możesz dodać użycie MongoDB "Bulk API" wstawek, aby ładowanie było tak szybkie, jak to możliwe, bez zużywania całej pamięci maszyny lub cykli procesora.
Nie popieram, ponieważ dostępne są różne rozwiązania, ale oto lista, która wykorzystuje line- pakiet input-stream aby uprościć część "zakończenia linii".
Definicje schematów tylko według „przykładu”:
var LineInputStream = require("line-input-stream"),
fs = require("fs"),
async = require("async"),
mongoose = require("mongoose"),
Schema = mongoose.Schema;
var entrySchema = new Schema({},{ strict: false })
var Entry = mongoose.model( "Schema", entrySchema );
var stream = LineInputStream(fs.createReadStream("data.txt",{ flags: "r" }));
stream.setDelimiter("\n");
mongoose.connection.on("open",function(err,conn) {
// lower level method, needs connection
var bulk = Entry.collection.initializeOrderedBulkOp();
var counter = 0;
stream.on("error",function(err) {
console.log(err); // or otherwise deal with it
});
stream.on("line",function(line) {
async.series(
[
function(callback) {
var row = line.split(","); // split the lines on delimiter
var obj = {};
// other manipulation
bulk.insert(obj); // Bulk is okay if you don't need schema
// defaults. Or can just set them.
counter++;
if ( counter % 1000 == 0 ) {
stream.pause();
bulk.execute(function(err,result) {
if (err) callback(err);
// possibly do something with result
bulk = Entry.collection.initializeOrderedBulkOp();
stream.resume();
callback();
});
} else {
callback();
}
}
],
function (err) {
// each iteration is done
}
);
});
stream.on("end",function() {
if ( counter % 1000 != 0 )
bulk.execute(function(err,result) {
if (err) throw err; // or something
// maybe look at result
});
});
});
Ogólnie rzecz biorąc, interfejs „strumienia” tam „rozbija dane wejściowe”, aby przetwarzać „jedną linię na raz”. To powstrzymuje Cię przed załadowaniem wszystkiego na raz.
Główne części to "Interfejs API operacji zbiorczych" z MongoDB. Pozwala to na „kolejkowanie” wielu operacji naraz przed faktycznym wysłaniem na serwer. Czyli w tym przypadku przy użyciu „modulo” zapisy są wysyłane tylko na 1000 przetworzonych wpisów. Naprawdę możesz zrobić wszystko, aż do limitu 16 MB BSON, ale zachowaj możliwość zarządzania.
Oprócz operacji przetwarzanych zbiorczo istnieje dodatkowy „ogranicznik” z async biblioteka. Nie jest to naprawdę wymagane, ale zapewnia to, że zasadniczo nie jest w trakcie przetwarzania więcej niż „limit modulo” dokumentów. Ogólne „wkładki” wsadowe nie mają żadnych kosztów we/wy innych niż pamięć, ale wywołania „wykonywania” oznaczają, że operacja we/wy jest przetwarzana. Więc raczej czekamy, niż ustawiamy w kolejce więcej rzeczy.
Z pewnością istnieją lepsze rozwiązania, które można znaleźć dla danych typu CSV „przetwarzania strumieniowego”, które wydaje się być. Ale ogólnie daje to koncepcje, jak to zrobić w sposób wydajny pod względem pamięci, bez jedzenia cykli procesora.