Redis
 sql >> Baza danych >  >> NoSQL >> Redis

Redis na Spark:Zadanie nie do serializacji

W Spark funkcje na RDD s (jak map tutaj) są serializowane i wysyłane do wykonawców w celu przetworzenia. Oznacza to, że wszystkie elementy zawarte w tych operacjach powinny być możliwe do serializacji.

Połączenie Redis nie jest tutaj możliwe do serializacji, ponieważ otwiera połączenia TCP z docelową bazą danych, które są powiązane z maszyną, na której zostało utworzone.

Rozwiązaniem jest utworzenie tych połączeń na executorach, w lokalnym kontekście wykonania. Jest na to kilka sposobów. Dwie, które przychodzą mi na myśl to:

  • rdd.mapPartitions :pozwala przetwarzać całą partycję na raz, a tym samym amortyzować koszty tworzenia połączeń)
  • Singletonowe menedżery połączeń:utwórz połączenie raz na executor

mapPartitions jest łatwiejsze, ponieważ wymaga jedynie niewielkiej zmiany w strukturze programu:

val perhit = perhitFile.mapPartitions{partition => 
    val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
    val res = partition.map{ x =>
        ...
        val refStr = r.hmget(...) // use r to process the local data
    }
    r.close // take care of resources
    res
}

Menedżer połączeń singleton może być modelowany za pomocą obiektu, który przechowuje leniwe odniesienie do połączenia (uwaga:zmienny ref również zadziała).

object RedisConnection extends Serializable {
   lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}

Ten obiekt może być następnie użyty do utworzenia instancji 1 połączenia na maszynę JVM procesu roboczego i jest używany jako Serializable obiekt w operacji zamknięcia.

val perhit = perhitFile.map{x => 
    val param = f(x)
    val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
    }
}

Zaletą korzystania z obiektu singleton jest mniejsze obciążenie, ponieważ połączenia są tworzone tylko raz przez JVM (w przeciwieństwie do 1 na partycję RDD)

Są też pewne wady:

  • czyszczenie połączeń jest trudne (zamykanie haka/timery)
  • należy zapewnić bezpieczeństwo wątków współdzielonych zasobów

(*) kod podany w celach ilustracyjnych. Nie skompilowany ani przetestowany.



  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. jak mogę uruchomić sesje za pomocą redis, express i socket.io?

  2. Laravel:Jak sprawdzić dostępność Redisa?

  3. Pod DigitalOcean ma niezwiązane natychmiastowe trwałe roszczenia dotyczące woluminów

  4. Jak naprawić ostrzeżenia podczas uruchamiania obrazu redis:alpine Docker

  5. Dobry sposób na użycie socket.io z klastrem na serwerze wielordzeniowym?