MongoDB
 sql >> Baza danych >  >> NoSQL >> MongoDB

Importuj plik CSV za pomocą schematu Mongoose

Możesz to zrobić za pomocą fast-csv, pobierając headers z definicji schematu, która zwróci przeanalizowane linie jako „obiekty”. Właściwie masz pewne niedopasowania, więc oznaczyłem je poprawkami:

const fs = require('mz/fs');
const csv = require('fast-csv');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,         // <-- You have this as Number but it's a string
  networth: Number,
  tag: String,
  stuff: String,        // the empty field in the csv
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    console.log(headers);

    await new Promise((resolve,reject) => {

      let buffer = [],
          counter = 0;

      let stream = fs.createReadStream('input.csv')
        .pipe(csv({ headers }))
        .on("error", reject)
        .on("data", async doc => {
          stream.pause();
          buffer.push(doc);
          counter++;
          log(doc);
          try {
            if ( counter > 10000 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
            }
          } catch(e) {
            stream.destroy(e);
          }

          stream.resume();

        })
        .on("end", async () => {
          try {
            if ( counter > 0 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
              resolve();
            }
          } catch(e) {
            stream.destroy(e);
          }
        });

    });


  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }


})()

Tak długo, jak schemat faktycznie zgadza się z dostarczonym plikiem CSV, jest w porządku. To są poprawki, które widzę, ale jeśli potrzebujesz wyrównania rzeczywistych nazw pól, musisz to zmienić. Ale w zasadzie był Number w miejscu, w którym znajduje się String i zasadniczo dodatkowe pole, które, jak zakładam, jest puste w pliku CSV.

Ogólne rzeczy to pobranie tablicy nazw pól ze schematu i przekazanie jej do opcji podczas tworzenia instancji parsera csv:

let headers = Object.keys(Rank.schema.paths)
  .filter(k => ['_id','__v'].indexOf(k) === -1);

let stream = fs.createReadStream('input.csv')
  .pipe(csv({ headers }))

Gdy już to zrobisz, otrzymasz z powrotem „Obiekt” zamiast tablicy:

{
  "serverid": "9",
  "resetid": "1557",
  "rank": "358",
  "name": "286",
  "land": "Mutantville",
  "networth": "4368",
  "tag": "2358026",
  "stuff": "",
  "gov": "M",
  "gdi": "0",
  "protection": "0",
  "vacation": "0",
  "alive": "1",
  "deleted": "0"
}

Nie martw się o „typy”, ponieważ Mongoose będzie rzutować wartości zgodnie ze schematem.

Reszta dzieje się w ramach obsługi dla data wydarzenie. Dla maksymalnej wydajności używamy insertMany() pisać do bazy danych tylko raz na 10 000 wierszy. Sposób, w jaki to faktycznie trafia do serwera i procesów, zależy od wersji MongoDB, ale 10 000 powinno być całkiem rozsądne w oparciu o średnią liczbę pól, które można zaimportować dla pojedynczej kolekcji pod względem „kompromisu” na użycie pamięci i zapisanie rozsądne żądanie sieciowe. W razie potrzeby zmniejsz liczbę.

Ważną częścią jest oznaczenie tych wywołań jako async funkcje i await wynik insertMany() przed kontynuowaniem. Również musimy pause() strumień i resume() na każdym elemencie, w przeciwnym razie ryzykujemy nadpisanie buffer dokumentów do wstawienia przed ich faktycznym wysłaniem. pause() i resume() są konieczne, aby umieścić „przeciwciśnienie” na rurze, w przeciwnym razie elementy po prostu „wychodzą” i uruchamiają data wydarzenie.

Oczywiście kontrola 10 000 wpisów wymaga sprawdzenia tego zarówno w każdej iteracji, jak i po zakończeniu strumienia w celu opróżnienia bufora i wysłania pozostałych dokumentów na serwer.

To jest naprawdę to, co chcesz zrobić, ponieważ z pewnością nie chcesz uruchamiać żądania asynchronicznego do serwera zarówno w "każdej" iteracji przez data zdarzenia lub zasadniczo bez czekania na zakończenie każdego żądania. Unikniesz niesprawdzania tego dla „bardzo małych plików”, ale dla każdego rzeczywistego obciążenia na pewno przekroczysz stos wywołań z powodu wywołań asynchronicznych „w locie”, które jeszcze się nie zakończyły.

FYI - package.json używany. mz jest opcjonalne, ponieważ jest to tylko zmodernizowana Promise włączona biblioteka standardowych "wbudowanych" bibliotek, do których jestem po prostu przyzwyczajony. Kod jest oczywiście całkowicie wymienny z fs moduł.

{
  "description": "",
  "main": "index.js",
  "dependencies": {
    "fast-csv": "^2.4.1",
    "mongoose": "^5.1.1",
    "mz": "^2.7.0"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}

Właściwie w Node v8.9.x i nowszych możemy to nawet znacznie uprościć dzięki implementacji AsyncIterator przez stream-to-iterator moduł. Nadal jest w Iterator<Promise<T>> tryb, ale powinno wystarczyć, dopóki Node v10.x nie stanie się stabilnym LTS:

const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,
  networth: Number,
  tag: String,
  stuff: String,        // the empty field
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    //console.log(headers);

    let stream = fs.createReadStream('input.csv')
      .pipe(csv({ headers }));

    const iterator = await streamToIterator(stream).init();

    let buffer = [],
        counter = 0;

    for ( let docPromise of iterator ) {
      let doc = await docPromise;
      buffer.push(doc);
      counter++;

      if ( counter > 10000 ) {
        await Rank.insertMany(buffer);
        buffer = [];
        counter = 0;
      }
    }

    if ( counter > 0 ) {
      await Rank.insertMany(buffer);
      buffer = [];
      counter = 0;
    }

  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }

})()

Zasadniczo cała obsługa „zdarzeń” strumienia, wstrzymywanie i wznawianie zostaje zastąpiona prostym for pętla:

const iterator = await streamToIterator(stream).init();

for ( let docPromise of iterator ) {
  let doc = await docPromise;
  // ... The things in the loop
}

Łatwo! Zostanie to wyczyszczone w późniejszej implementacji węzła za pomocą for..await..of kiedy stanie się bardziej stabilny. Ale powyższe działa dobrze w określonej wersji i nowszych.



  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. Jak wyszukiwać dokumenty za pomocą pola _id w sterowniku Java mongodb?

  2. Przewodnik po wdrożeniu i utrzymaniu MongoDB za pomocą Puppet:część 1

  3. MongooseError [MongooseServerSelectionError]:połączenie <monitor> z 52.6.250.237:27017 zamknięte

  4. MongoDB:jak policzyć liczbę kluczy w dokumencie?

  5. Błąd przetwarzania JSON mongoexport