W Spark funkcje na RDD
s (jak map
tutaj) są serializowane i wysyłane do wykonawców w celu przetworzenia. Oznacza to, że wszystkie elementy zawarte w tych operacjach powinny być możliwe do serializacji.
Połączenie Redis nie jest tutaj możliwe do serializacji, ponieważ otwiera połączenia TCP z docelową bazą danych, które są powiązane z maszyną, na której zostało utworzone.
Rozwiązaniem jest utworzenie tych połączeń na executorach, w lokalnym kontekście wykonania. Jest na to kilka sposobów. Dwie, które przychodzą mi na myśl to:
rdd.mapPartitions
:pozwala przetwarzać całą partycję na raz, a tym samym amortyzować koszty tworzenia połączeń)- Singletonowe menedżery połączeń:utwórz połączenie raz na executor
mapPartitions
jest łatwiejsze, ponieważ wymaga jedynie niewielkiej zmiany w strukturze programu:
val perhit = perhitFile.mapPartitions{partition =>
val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
val res = partition.map{ x =>
...
val refStr = r.hmget(...) // use r to process the local data
}
r.close // take care of resources
res
}
Menedżer połączeń singleton może być modelowany za pomocą obiektu, który przechowuje leniwe odniesienie do połączenia (uwaga:zmienny ref również zadziała).
object RedisConnection extends Serializable {
lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}
Ten obiekt może być następnie użyty do utworzenia instancji 1 połączenia na maszynę JVM procesu roboczego i jest używany jako Serializable
obiekt w operacji zamknięcia.
val perhit = perhitFile.map{x =>
val param = f(x)
val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
}
}
Zaletą korzystania z obiektu singleton jest mniejsze obciążenie, ponieważ połączenia są tworzone tylko raz przez JVM (w przeciwieństwie do 1 na partycję RDD)
Są też pewne wady:
- czyszczenie połączeń jest trudne (zamykanie haka/timery)
- należy zapewnić bezpieczeństwo wątków współdzielonych zasobów
(*) kod podany w celach ilustracyjnych. Nie skompilowany ani przetestowany.