Możesz to zrobić za pomocą fast-csv, pobierając headers
z definicji schematu, która zwróci przeanalizowane linie jako „obiekty”. Właściwie masz pewne niedopasowania, więc oznaczyłem je poprawkami:
const fs = require('mz/fs');
const csv = require('fast-csv');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String, // <-- You have this as Number but it's a string
networth: Number,
tag: String,
stuff: String, // the empty field in the csv
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
console.log(headers);
await new Promise((resolve,reject) => {
let buffer = [],
counter = 0;
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
.on("error", reject)
.on("data", async doc => {
stream.pause();
buffer.push(doc);
counter++;
log(doc);
try {
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
stream.destroy(e);
}
stream.resume();
})
.on("end", async () => {
try {
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
resolve();
}
} catch(e) {
stream.destroy(e);
}
});
});
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
Tak długo, jak schemat faktycznie zgadza się z dostarczonym plikiem CSV, jest w porządku. To są poprawki, które widzę, ale jeśli potrzebujesz wyrównania rzeczywistych nazw pól, musisz to zmienić. Ale w zasadzie był Number
w miejscu, w którym znajduje się String
i zasadniczo dodatkowe pole, które, jak zakładam, jest puste w pliku CSV.
Ogólne rzeczy to pobranie tablicy nazw pól ze schematu i przekazanie jej do opcji podczas tworzenia instancji parsera csv:
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
Gdy już to zrobisz, otrzymasz z powrotem „Obiekt” zamiast tablicy:
{
"serverid": "9",
"resetid": "1557",
"rank": "358",
"name": "286",
"land": "Mutantville",
"networth": "4368",
"tag": "2358026",
"stuff": "",
"gov": "M",
"gdi": "0",
"protection": "0",
"vacation": "0",
"alive": "1",
"deleted": "0"
}
Nie martw się o „typy”, ponieważ Mongoose będzie rzutować wartości zgodnie ze schematem.
Reszta dzieje się w ramach obsługi dla data
wydarzenie. Dla maksymalnej wydajności używamy insertMany()
pisać do bazy danych tylko raz na 10 000 wierszy. Sposób, w jaki to faktycznie trafia do serwera i procesów, zależy od wersji MongoDB, ale 10 000 powinno być całkiem rozsądne w oparciu o średnią liczbę pól, które można zaimportować dla pojedynczej kolekcji pod względem „kompromisu” na użycie pamięci i zapisanie rozsądne żądanie sieciowe. W razie potrzeby zmniejsz liczbę.
Ważną częścią jest oznaczenie tych wywołań jako async
funkcje i await
wynik insertMany()
przed kontynuowaniem. Również musimy pause()
strumień i resume()
na każdym elemencie, w przeciwnym razie ryzykujemy nadpisanie buffer
dokumentów do wstawienia przed ich faktycznym wysłaniem. pause()
i resume()
są konieczne, aby umieścić „przeciwciśnienie” na rurze, w przeciwnym razie elementy po prostu „wychodzą” i uruchamiają data
wydarzenie.
Oczywiście kontrola 10 000 wpisów wymaga sprawdzenia tego zarówno w każdej iteracji, jak i po zakończeniu strumienia w celu opróżnienia bufora i wysłania pozostałych dokumentów na serwer.
To jest naprawdę to, co chcesz zrobić, ponieważ z pewnością nie chcesz uruchamiać żądania asynchronicznego do serwera zarówno w "każdej" iteracji przez data
zdarzenia lub zasadniczo bez czekania na zakończenie każdego żądania. Unikniesz niesprawdzania tego dla „bardzo małych plików”, ale dla każdego rzeczywistego obciążenia na pewno przekroczysz stos wywołań z powodu wywołań asynchronicznych „w locie”, które jeszcze się nie zakończyły.
FYI - package.json
używany. mz
jest opcjonalne, ponieważ jest to tylko zmodernizowana Promise
włączona biblioteka standardowych "wbudowanych" bibliotek, do których jestem po prostu przyzwyczajony. Kod jest oczywiście całkowicie wymienny z fs
moduł.
{
"description": "",
"main": "index.js",
"dependencies": {
"fast-csv": "^2.4.1",
"mongoose": "^5.1.1",
"mz": "^2.7.0"
},
"keywords": [],
"author": "",
"license": "ISC"
}
Właściwie w Node v8.9.x i nowszych możemy to nawet znacznie uprościć dzięki implementacji AsyncIterator
przez stream-to-iterator
moduł. Nadal jest w Iterator<Promise<T>>
tryb, ale powinno wystarczyć, dopóki Node v10.x nie stanie się stabilnym LTS:
const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String,
networth: Number,
tag: String,
stuff: String, // the empty field
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
//console.log(headers);
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }));
const iterator = await streamToIterator(stream).init();
let buffer = [],
counter = 0;
for ( let docPromise of iterator ) {
let doc = await docPromise;
buffer.push(doc);
counter++;
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
}
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
Zasadniczo cała obsługa „zdarzeń” strumienia, wstrzymywanie i wznawianie zostaje zastąpiona prostym for
pętla:
const iterator = await streamToIterator(stream).init();
for ( let docPromise of iterator ) {
let doc = await docPromise;
// ... The things in the loop
}
Łatwo! Zostanie to wyczyszczone w późniejszej implementacji węzła za pomocą for..await..of
kiedy stanie się bardziej stabilny. Ale powyższe działa dobrze w określonej wersji i nowszych.