Przede wszystkim rodzaj zapytania, które wykonujesz, jest wyjątkowo nieefektywny. Jak na razie (Spark 1.5.0*), aby wykonać takie złącze, obie tabele muszą być przetasowane / podzielone na partycje haszowe za każdym razem, gdy wykonywane jest zapytanie. Nie powinno to stanowić problemu w przypadku users
tabela, w której user_id = 123
predykat jest najprawdopodobniej przesuwany w dół, ale nadal wymaga pełnego przetasowania na user_address
.
Co więcej, jeśli tabele są tylko zarejestrowane, a nie buforowane, każde wykonanie tego zapytania spowoduje pobranie całego user_address
tabela z MySQL na Spark.
Nie jest do końca jasne, dlaczego chcesz używać Sparka do aplikacji, ale konfiguracja pojedynczej maszyny, małe dane i rodzaj zapytań sugerują, że Spark nie pasuje tutaj.
Ogólnie rzecz biorąc, jeśli logika aplikacji wymaga dostępu do pojedynczego rekordu, Spark SQL nie będzie działał dobrze. Jest przeznaczony do zapytań analitycznych, a nie jako zamiennik bazy danych OLTP.
Jeśli pojedyncza tabela/ramka danych jest znacznie mniejsza, możesz spróbować nadawać.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast
val user: DataFrame = ???
val user_address: DataFrame = ???
val userFiltered = user.where(???)
user_addresses.join(
broadcast(userFiltered), $"address_id" === $"user_address_id")
* Powinno to zmienić się w Spark 1.6.0 z SPARK-11410 co powinno umożliwić trwałe partycjonowanie tabeli.