Ciężkie zastrzeżenie Nigdy wcześniej nie korzystałem z tej biblioteki, a moja słaba wiedza na temat niektórych pojęć jest trochę... brakująca. Przeważnie czytam samouczek. Jestem prawie pewien, że każdy, kto wykonał pracę asynchroniczną, przeczyta to i będzie się śmiać, ale może to być przydatny punkt wyjścia dla innych osób. Unikaj płatnika!
Zacznijmy od czegoś prostszego, pokazując, jak Stream
Pracuje. Możemy przekonwertować iterator Result
s do strumienia:
extern crate futures;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
let payloads = stream::iter(payloads.into_iter());
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
}
To pokazuje nam jeden sposób na wykorzystanie strumienia. Używamy and_then
zrobić coś z każdym ładunkiem (tutaj po prostu go wydrukować), a następnie for_each
przekonwertować Stream
z powrotem do Future
. Następnie możemy uruchomić przyszłość, wywołując dziwnie nazwany forget
metoda.
Następnie powiąż bibliotekę Redis z miksem, obsługując tylko jedną wiadomość. Ponieważ get_message()
metoda blokuje, musimy wprowadzić do miksu kilka wątków. Nie jest dobrym pomysłem wykonywanie dużej ilości pracy w tego typu systemie asynchronicznym, ponieważ wszystko inne zostanie zablokowane. Na przykład:
O ile nie ustalono inaczej, należy upewnić się, że wdrożenia tej funkcji kończą się bardzo szybko .
W idealnym świecie skrzynka redis byłaby zbudowana na bazie takiej biblioteki jak futures i eksponowałaby to wszystko natywnie.
extern crate redis;
extern crate futures;
use std::thread;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let msg = pubsub.get_message().expect("Unable to get message");
let payload: Result<String, _> = msg.get_payload();
tx.send(payload).forget();
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
Moje rozumienie staje się tutaj bardziej rozmyte. W osobnym wątku blokujemy wiadomość i wrzucamy ją do kanału, gdy ją otrzymamy. Nie rozumiem, dlaczego musimy trzymać się uchwytu wątku. Spodziewałbym się, że foo.forget
blokowałby się, czekając, aż strumień będzie pusty.
W połączeniu telnetowym z serwerem Redis wyślij to:
publish rust awesome
I zobaczysz, że to działa. Dodanie instrukcji print pokazuje, że (dla mnie) foo.forget
instrukcja jest uruchamiana przed pojawieniem się wątku.
Wiele wiadomości jest trudniejsze. Sender
zużywa się, aby zapobiec sytuacji, w której strona generująca wyprzedza zbytnio stronę zużywającą. Osiąga się to poprzez zwrócenie innej przyszłości z send
! Musimy przenieść go z powrotem, aby użyć go ponownie w następnej iteracji pętli:
extern crate redis;
extern crate futures;
use std::thread;
use std::sync::mpsc;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let mut tx = tx;
while let Ok(msg) = pubsub.get_message() {
let payload: Result<String, _> = msg.get_payload();
let (next_tx_tx, next_tx_rx) = mpsc::channel();
tx.send(payload).and_then(move |new_tx| {
next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
futures::finished(())
}).forget();
tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
}
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
Jestem pewien, że z biegiem czasu pojawi się więcej ekosystemu dla tego typu współdziałania. Na przykład skrzynka futures-cpupool może prawdopodobnie zostać rozszerzony o obsługę podobnego przypadku użycia do tego.