Możesz także podzielić dokumenty u źródła, zasadniczo łącząc każdą wartość w tablicę wpisów według „typu” dla „wejścia” i „wyjścia”. Możesz to zrobić po prostu za pomocą $map
i $cond
aby wybrać pola, a następnie $unwind
tablicę, a następnie określ, które pole ponownie „zliczyć”, sprawdzając za pomocą $cond
:
collection.aggregate([
{ "$project": {
"dates": {
"$filter": {
"input": {
"$map": {
"input": [ "in", "out" ],
"as": "type",
"in": {
"type": "$$type",
"date": {
"$cond": {
"if": { "$eq": [ "$$type", "in" ] },
"then": "$inDate",
"else": "$outDate"
}
}
}
}
},
"as": "dates",
"cond": { "$ne": [ "$$dates.date", null ] }
}
}
}},
{ "$unwind": "$dates" },
{ "$group": {
"_id": {
"year": { "$year": "$dates.date" },
"month": { "$month": "$dates.date" },
"day": { "$dayOfMonth": "$dates.date" }
},
"countIn": {
"$sum": {
"$cond": {
"if": { "$eq": [ "$dates.type", "in" ] },
"then": 1,
"else": 0
}
}
},
"countOut": {
"$sum": {
"$cond": {
"if": { "$eq": [ "$dates.type", "out" ] },
"then": 1,
"else": 0
}
}
}
}}
])
Jest to bezpieczny sposób na zrobienie tego, który nie grozi przekroczeniem limitu BSON, bez względu na rozmiar przesyłanych danych.
Osobiście wolałbym uruchamiać jako oddzielne procesy i „łączyć” zagregowane wyniki osobno, ale to zależy od środowiska, w którym pracujesz, o czym nie wspomniano w pytaniu.
Jako przykład wykonania "równoległego" możesz utworzyć strukturę w Meteorze w następujący sposób:
import { Meteor } from 'meteor/meteor';
import { Source } from '../imports/source';
import { Target } from '../imports/target';
Meteor.startup(async () => {
// code to run on server at startup
await Source.remove({});
await Target.remove({});
console.log('Removed');
Source.insert({
"_id" : "XBpNKbdGSgGfnC2MJ",
"po" : 72134185,
"machine" : 40940,
"location" : "02A01",
"inDate" : new Date("2017-07-19T06:10:13.059Z"),
"requestDate" : new Date("2017-07-19T06:17:04.901Z"),
"outDate" : new Date("2017-07-19T06:30:34Z")
});
console.log('Inserted');
await Promise.all(
["In","Out"].map( f => new Promise((resolve,reject) => {
let cursor = Source.rawCollection().aggregate([
{ "$match": { [`${f.toLowerCase()}Date`]: { "$exists": true } } },
{ "$group": {
"_id": {
"year": { "$year": `$${f.toLowerCase()}Date` },
"month": { "$month": `$${f.toLowerCase()}Date` },
"day": { "$dayOfYear": `$${f.toLowerCase()}Date` }
},
[`count${f}`]: { "$sum": 1 }
}}
]);
cursor.on('data', async (data) => {
cursor.pause();
data.date = data._id;
delete data._id;
await Target.upsert(
{ date: data.date },
{ "$set": data }
);
cursor.resume();
});
cursor.on('end', () => resolve('done'));
cursor.on('error', (err) => reject(err));
}))
);
console.log('Mapped');
let targets = await Target.find().fetch();
console.log(targets);
});
Który zasadniczo będzie wyprowadzany do kolekcji docelowej, jak wspomniano w komentarzach takich jak:
{
"_id" : "XdPGMkY24AcvTnKq7",
"date" : {
"year" : 2017,
"month" : 7,
"day" : 200
},
"countIn" : 1,
"countOut" : 1
}