Redis
 sql >> Baza danych >  >> NoSQL >> Redis

Jak zaimplementować strumień futures dla wywołania blokującego za pomocą futures.rs i Redis PubSub?

Ciężkie zastrzeżenie Nigdy wcześniej nie korzystałem z tej biblioteki, a moja słaba wiedza na temat niektórych pojęć jest trochę... brakująca. Przeważnie czytam samouczek. Jestem prawie pewien, że każdy, kto wykonał pracę asynchroniczną, przeczyta to i będzie się śmiać, ale może to być przydatny punkt wyjścia dla innych osób. Unikaj płatnika!

Zacznijmy od czegoś prostszego, pokazując, jak Stream Pracuje. Możemy przekonwertować iterator Result s do strumienia:

extern crate futures;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
    let payloads = stream::iter(payloads.into_iter());

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
}

To pokazuje nam jeden sposób na wykorzystanie strumienia. Używamy and_then zrobić coś z każdym ładunkiem (tutaj po prostu go wydrukować), a następnie for_each przekonwertować Stream z powrotem do Future . Następnie możemy uruchomić przyszłość, wywołując dziwnie nazwany forget metoda.

Następnie powiąż bibliotekę Redis z miksem, obsługując tylko jedną wiadomość. Ponieważ get_message() metoda blokuje, musimy wprowadzić do miksu kilka wątków. Nie jest dobrym pomysłem wykonywanie dużej ilości pracy w tego typu systemie asynchronicznym, ponieważ wszystko inne zostanie zablokowane. Na przykład:

O ile nie ustalono inaczej, należy upewnić się, że wdrożenia tej funkcji kończą się bardzo szybko .

W idealnym świecie skrzynka redis byłaby zbudowana na bazie takiej biblioteki jak futures i eksponowałaby to wszystko natywnie.

extern crate redis;
extern crate futures;

use std::thread;
use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let msg = pubsub.get_message().expect("Unable to get message");
        let payload: Result<String, _> = msg.get_payload();
        tx.send(payload).forget();
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

Moje rozumienie staje się tutaj bardziej rozmyte. W osobnym wątku blokujemy wiadomość i wrzucamy ją do kanału, gdy ją otrzymamy. Nie rozumiem, dlaczego musimy trzymać się uchwytu wątku. Spodziewałbym się, że foo.forget blokowałby się, czekając, aż strumień będzie pusty.

W połączeniu telnetowym z serwerem Redis wyślij to:

publish rust awesome

I zobaczysz, że to działa. Dodanie instrukcji print pokazuje, że (dla mnie) foo.forget instrukcja jest uruchamiana przed pojawieniem się wątku.

Wiele wiadomości jest trudniejsze. Sender zużywa się, aby zapobiec sytuacji, w której strona generująca wyprzedza zbytnio stronę zużywającą. Osiąga się to poprzez zwrócenie innej przyszłości z send ! Musimy przenieść go z powrotem, aby użyć go ponownie w następnej iteracji pętli:

extern crate redis;
extern crate futures;

use std::thread;
use std::sync::mpsc;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let mut tx = tx;

        while let Ok(msg) = pubsub.get_message() {
            let payload: Result<String, _> = msg.get_payload();

            let (next_tx_tx, next_tx_rx) = mpsc::channel();

            tx.send(payload).and_then(move |new_tx| {
                next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
                futures::finished(())
            }).forget();

            tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
        }
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

Jestem pewien, że z biegiem czasu pojawi się więcej ekosystemu dla tego typu współdziałania. Na przykład skrzynka futures-cpupool może prawdopodobnie zostać rozszerzony o obsługę podobnego przypadku użycia do tego.




  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. W pełni zarządzany hosting ScaleGrid dla Redis™ już dostępny na platformie Azure

  2. Stan sesji z usługą Azure Redis Cache nie działa w wielu instancjach

  3. Połączenie WebSocket z adresem <URL> nie powiodło się:Błąd podczas uzgadniania WebSocket:Nieoczekiwany kod odpowiedzi:521

  4. Jak uruchomić Redis na platformie Azure?

  5. Nie znaleziono gniazda modułu lua