Nie jest to jeszcze opublikowane, ale w głównej gałęzi Alpakki MongoSource.apply
przyjmuje parametr typu:
object MongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Dlatego wraz z nadchodzącym wydaniem 0.18 Alpakki, będziesz mógł wykonać następujące czynności:
val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())
Zwróć uwagę, że source
tutaj zakłada, że todoCollection.find()
zwraca Observable[TodoMongo]
; dostosuj typy według potrzeb.
W międzyczasie możesz po prostu dodać powyższy kod ręcznie. Na przykład:
package akka.stream.alpakka.mongodb.scaladsl
import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable
object MyMongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Pamiętaj, że MyMongoSource
jest zdefiniowany jako rezydent akka.stream.alpakka.mongodb.scaladsl
pakiet (np. MongoSource
), ponieważ ObservableToPublisher
jest prywatną klasą pakietu. Użyjesz MyMongoSource
w taki sam sposób, w jaki używasz MongoSource
:
val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find())