Solution 1:
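import akka.Done
import akka.stream.{ActorAttributes, Materializer, Supervision}
import akka.stream.scaladsl.{Sink, Source}
import reactivemongo.akkastream.{State, cursorProducer} // ReactiveMongo akka-stream module
import scala.concurrent.Future

// Converts one person to the new model and inserts it into the new collection.
def changeModelAndInsertToNewCollection(person: Person): Future[Boolean] = {
  // TODO: call the Mongo API to update the person
  ???
}

def processPeople()(implicit m: Materializer): Future[Done] = {
  // Maximum number of updates in flight at the same time
  val numberOfConcurrentUpdate = 10

  // Stream the documents from a cursor instead of loading them all into memory
  val peopleSource: Source[Person, Future[State]] =
    collection
      .find(json())
      .cursor[Person]()
      .documentSource()

  peopleSource
    .mapAsync(numberOfConcurrentUpdate)(changeModelAndInsertToNewCollection)
    // On failure, drop the failing element and restart the stage instead of failing the stream
    .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
    .runWith(Sink.ignore)
}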
Solution 2: using Alpakka as the Akka Streams connector for MongoDB
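// Wrap the query in an Alpakka source of documents
val source: Source[Document, NotUsed] =
  MongoSource(collection.find(json()).cursor[Person]().documentSource())

// Write the updates back through the Alpakka sink; 2 is the parallelism
source.runWith(MongoSink.updateOne(2, collection))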