Ten wpis na blogu został opublikowany na Hortonworks.com przed fuzją z Cloudera. Niektóre linki, zasoby lub odniesienia mogą nie być już dokładne.
Z dumą ogłaszamy techniczną wersję zapoznawczą złącza Spark-HBase, opracowanego przez Hortonworks we współpracy z Bloomberg.
Łącznik Spark-HBase wykorzystuje interfejs API źródła danych (SPARK-3247) wprowadzony w Spark-1.2.0. Wypełnia lukę między prostym magazynem HBase Key Value a złożonymi relacyjnymi zapytaniami SQL i umożliwia użytkownikom przeprowadzanie złożonej analizy danych w oparciu o HBase przy użyciu Spark. HBase DataFrame to standardowa ramka Spark DataFrame, która może współpracować z dowolnymi innymi źródłami danych, takimi jak Hive, ORC, Parquet, JSON itp.
Tło
Istnieje kilka łączników Spark HBase o otwartym kodzie źródłowym dostępnych jako pakiety Spark, jako niezależne projekty lub w trunku HBase.
Spark został przeniesiony do interfejsów API Dataset/DataFrame, które zapewniają wbudowaną optymalizację planu zapytań. Teraz użytkownicy końcowi wolą używać interfejsu opartego na DataFrames/Datasets.
Złącze HBase w trunku HBase posiada bogate wsparcie na poziomie RDD m.in. BulkPut itp., ale jego obsługa DataFrame nie jest tak bogata. Złącze magistrali HBase opiera się na standardowym HadoopRDD z wbudowanym formatem TableInputFormat HBase ma pewne ograniczenia wydajności. Dodatkowo BulkGet wykonywany w sterowniku może być pojedynczym punktem awarii.
Istnieje kilka innych alternatywnych implementacji. Weź Spark-SQL-on-HBase jako przykład. Stosuje bardzo zaawansowane niestandardowe techniki optymalizacji, osadzając własny plan optymalizacji zapytań w standardowym silniku Spark Catalyst, przesyła RDD do HBase i wykonuje skomplikowane zadania, takie jak częściowa agregacja, wewnątrz koprocesora HBase. Takie podejście jest w stanie osiągnąć wysoką wydajność, ale jest trudne do utrzymania ze względu na jego złożoność i szybką ewolucję Sparka. Również zezwolenie na uruchamianie dowolnego kodu wewnątrz koprocesora może stanowić zagrożenie dla bezpieczeństwa.
Złącze Spark-on-HBase (SHC) zostało opracowane w celu przezwyciężenia tych potencjalnych wąskich gardeł i słabości. Implementuje standardowy interfejs Spark Datasource API i wykorzystuje silnik Spark Catalyst do optymalizacji zapytań. Równolegle RDD jest konstruowany od podstaw zamiast używania TableInputFormat w celu osiągnięcia wysokiej wydajności. Dzięki temu dostosowanemu RDD można zastosować i w pełni wdrożyć wszystkie krytyczne techniki, takie jak przycinanie partycji, przycinanie kolumn, przekazywanie predykatów i lokalizowanie danych. Konstrukcja sprawia, że konserwacja jest bardzo łatwa, a jednocześnie zapewnia dobry kompromis między wydajnością a prostotą.
Architektura
Zakładamy, że Spark i HBase są wdrożone w tym samym klastrze, a programy wykonawcze Spark są zlokalizowane razem z serwerami regionu, jak pokazano na poniższym rysunku.
Rysunek 1. Architektura złącza Spark-on-HBase
Na wysokim poziomie łącznik traktuje funkcje Scan i Get w podobny sposób, a obie akcje są wykonywane w executorach. Sterownik przetwarza zapytanie, agreguje skany/pobiera na podstawie metadanych regionu i generuje zadania według regionu. Zadania są wysyłane do preferowanych executorów znajdujących się w tym samym miejscu co serwer regionu i są wykonywane równolegle w executorach w celu uzyskania lepszej lokalizacji danych i współbieżności. Jeśli region nie przechowuje wymaganych danych, do tego serwera regionu nie przypisano żadnego zadania. Zadanie może składać się z wielu skanowań i pobierania zbiorczego, a żądania danych przez zadanie są pobierane tylko z jednego serwera regionalnego, a ten serwer regionalny będzie również preferowaną lokalizacją dla zadania. Należy zauważyć, że sterownik nie bierze udziału w rzeczywistym wykonywaniu zadań, z wyjątkiem zadań planowania. Dzięki temu kierowca nie jest wąskim gardłem.
Katalog tabel
Aby przenieść tabelę HBase jako relacyjną tabelę do platformy Spark, definiujemy mapowanie między tabelami HBase i Spark o nazwie Katalog tabel. Ten katalog składa się z dwóch krytycznych części. Jedna to definicja klucza wiersza, a druga to mapowanie między kolumną tabeli w Spark i rodziną kolumn i kwalifikatorem kolumn w HBase. Aby uzyskać szczegółowe informacje, zapoznaj się z sekcją Wykorzystanie.
Natywna obsługa Avro
Łącznik natywnie obsługuje format Avro, ponieważ bardzo powszechną praktyką jest utrwalanie uporządkowanych danych w HBase jako tablica bajtów. Użytkownik może utrwalić rekord Avro bezpośrednio w HBase. Wewnętrznie schemat Avro jest automatycznie konwertowany na natywny typ danych Spark Catalyst. Należy zauważyć, że obie części klucz-wartość w tabeli HBase można zdefiniować w formacie Avro. Proszę zapoznać się z przykładami/przypadkami testowymi w repozytorium w celu dokładnego użycia.
Predykat w dół
Łącznik pobiera tylko wymagane kolumny z serwera regionu, aby zmniejszyć obciążenie sieci i uniknąć nadmiarowego przetwarzania w aparacie Spark Catalyst. Istniejące standardowe filtry HBase są używane do wykonywania naciskania predykatów bez wykorzystywania możliwości koprocesora. Ponieważ HBase nie zna typu danych z wyjątkiem tablicy bajtów i niespójności kolejności między typami pierwotnymi Java a tablicą bajtów, musimy wstępnie przetworzyć warunek filtru przed ustawieniem filtru w operacji skanowania, aby uniknąć utraty danych. Na serwerze regionu rekordy niespełniające warunku zapytania są odfiltrowywane.
Przycinanie partycji
Wyodrębniając klucz wiersza z predykatów, dzielimy funkcję Scan/BulkGet na wiele nienakładających się zakresów, tylko serwery regionu, które mają żądane dane, wykonają funkcję Scan/BulkGet. Obecnie przycinanie partycji odbywa się na pierwszym wymiarze kluczy wierszy. Na przykład, jeśli klucz wiersza to „klucz1:klucz2:klucz3”, czyszczenie partycji będzie oparte tylko na „kluczu1”. Zwróć uwagę, że warunki WHERE muszą być dokładnie określone. W przeciwnym razie przycinanie partycji może nie zadziałać. Na przykład, WHERE rowkey1> „abc” OR kolumna =„xyz” (gdzie rowkey1 to pierwszy wymiar klucza wiersza, a kolumna to zwykła kolumna hbase) spowoduje pełne skanowanie, ponieważ musimy pokryć wszystkie zakresy, ponieważ z LUB logika.
Lokalizacja danych
Gdy wykonawca Spark jest współlokowany z serwerami regionu HBase, lokalizacja danych jest osiągana przez zidentyfikowanie lokalizacji serwera regionu i dokłada wszelkich starań, aby zlokalizować zadanie razem z serwerem regionu. Każdy executor wykonuje Scan/BulkGet na części danych znajdujących się na tym samym hoście.
Skanuj i pobieraj zbiorczo
Te dwa operatory są widoczne dla użytkowników przez określenie WHERE CLAUSE, np. WHERE kolumna> x i kolumna
Użycie
Poniżej przedstawiono podstawową procedurę korzystania ze złącza. Aby uzyskać więcej szczegółów i zaawansowanych przypadków użycia, takich jak Avro i obsługa kluczy kompozytowych, zapoznaj się z przykładami w repozytorium.
1) Zdefiniuj katalog mapowania schematu:
[code language="scala"]def catalog = s"""{ |"table":{"namespace":"default", "name":"table1"}, |"rowkey":"key", |"columns":{ |"col0":{"cf":"rowkey", "col":"key", "type":"string"}, |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"}, |"col2":{"cf":"cf2", "col":"col2", "type":"double"}, |"col3":{"cf":"cf3", "col":"col3", "type":"float"}, |"col4":{"cf":"cf4", "col":"col4", "type":"int"}, |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"}, |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"}, |"col7":{"cf":"cf7", "col":"col7", "type":"string"}, |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"} |} |}""".stripMargin [/code]
2) Przygotuj dane i wypełnij tabelę HBase:
case class HBaseRecord(col0:String, col1:Boolean,col2:Double, col3:Float,col4:Int, col5:Long, col6:Short, col7:String, col8:Byte)
object HBaseRecord {def apply(i:Int, t:String):HBaseRecord ={ val s =s”””row${„%03d”.format(i)}””” HBaseRecord(s, i % 2 ==0, i.toDouble, i.toFloat, i, i.toLong, i.toShort, s”String$i:$t”, i.toByte) }}
val data =(0 do 255).map { i => HBaseRecord(i, “extra”)}
sc.parallelize(data).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> katalog, HBaseTableCatalog.newTable -> „5”))
.format(„org.apache.spark. sql.execution.datasources.hbase”)
.save()
3) Załaduj DataFrame:
def withCatalog(cat:String):DataFrame ={
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog->cat))
.format( „org.apache.spark.sql.execution.datasources.hbase”)
.load()
}
val df =withCatalog(katalog)
4) Zapytanie zintegrowane w języku:
val s =df.filter((($”kol0″ <=“row050″ &&$”kol0”> “row040”) ||
$”kol0″ ===“row005” ||
$”col0″ ===„row020” ||
$”col0″ === „r20” ||
$”col0″ <=„row005”) &&
($”col4″ ===1 ||
$”col4″ ===42))
.select(„col0”, „col1”, „col4”)
s .pokaż
5) Zapytanie SQL:
df.registerTempTable(„tabela”)
sqlContext.sql(„wybierz liczbę(kolumna1) z tabeli”).pokaż
Konfigurowanie pakietu Spark
Użytkownicy mogą używać łącznika Spark-on-HBase jako standardowego pakietu Spark. Aby dołączyć pakiet do aplikacji Spark, użyj:
spark-shell, pyspark lub spark-submit
> $SPARK_HOME/bin/spark-shell –pakiety zhzhan:shc:0.0.11-1.6.1-s_2.10
Użytkownicy mogą również uwzględnić pakiet jako zależność w pliku SBT. Format to nazwa-pakietu-iskry:wersja
spDependencies +=„zhzhan/shc:0.0.11-1.6.1-s_2.10”
Uruchamianie w bezpiecznym klastrze
Aby uruchomić w klastrze z włączoną obsługą protokołu Kerberos, użytkownik musi dołączyć pliki jar związane z HBase do ścieżki klasy, ponieważ pobieranie i odnawianie tokenu HBase jest wykonywane przez platformę Spark i jest niezależne od łącznika. Innymi słowy, użytkownik musi zainicjować środowisko w normalny sposób, albo przez kinit, albo przez podanie principal/keytab. Poniższe przykłady pokazują, jak uruchomić w bezpiecznym klastrze w trybie zarówno przędza-klient, jak i przędza-klaster. Zwróć uwagę, że SPARK_CLASSPATH musi być ustawiony dla obu trybów, a przykładowy jar jest tylko symbolem zastępczym dla Sparka.
eksportuj SPARK_CLASSPATH=/usr/hdp/current/hbase-client/lib/hbase-common.jar:/usr/hdp/current/hbase-client/lib/hbase-client.jar:/usr/hdp/current/hbase- client/lib/hbase-server.jar:/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar
Załóżmy, że hrt_qa jest kontem bezgłowym, użytkownik może użyć następującego polecenia dla kinit:
kinit -k -t /tmp/hrt_qa.headless.keytab hrt_qa
/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-client –packages zhzhan:shc:0.0.11- 1.6.1-s_2.10 –num-executors 4 –driver-memory 512m –executor-memory 512m –executor-cores 1 /usr/hdp/current/spark-client/lib/spark-examples-1.6.1.2.4.2. 0-106-hadoop2.7.1.2.4.2.0-106.jar
/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-cluster –files /etc/hbase/conf/hbase -site.xml –pakiety zhzhan:shc:0.0.11-1.6.1-s_2.10 –num-executors 4 –driver-memory 512m –executor-memory 512m –executor-cores 1 /usr/hdp/current/spark- client/lib/spark-examples-1.6.1.2.4.2.0-106-hadoop2.7.1.2.4.2.0-106.jar
Łączenie wszystkiego w całość
Właśnie przedstawiliśmy krótki przegląd tego, jak HBase obsługuje Spark na poziomie DataFrame. Dzięki DataFrame API Spark aplikacje mogą pracować z danymi przechowywanymi w tabeli HBase tak łatwo, jak z danymi przechowywanymi w innych źródłach danych. Dzięki tej nowej funkcji dane w tabelach HBase mogą być łatwo wykorzystywane przez aplikacje Spark i inne interaktywne narzędzia, np. użytkownicy mogą uruchamiać złożone zapytanie SQL na górze tabeli HBase w Spark, wykonywać łączenie tabel w Dataframe lub integrować się z Spark Streaming, aby zaimplementować bardziej skomplikowany system.
Co dalej?
Obecnie łącznik jest hostowany w repozytorium Hortonworks i publikowany jako pakiet Spark. Jest w trakcie migracji do łącza Apache HBase. Podczas migracji zidentyfikowaliśmy kilka krytycznych błędów w trunku HBase, które zostaną naprawione wraz z scaleniem. Praca społeczności jest śledzona przez parasol HBase JIRA HBASE-14789, w tym HBASE-14795 i HBASE-14796, aby zoptymalizować podstawową architekturę obliczeniową dla skanowania i BulkGet, HBASE-14801, aby zapewnić interfejs użytkownika JSON ułatwiający użytkowanie, HBASE-15336 dla ścieżka zapisu DataFrame, HBASE-15334 do obsługi Avro, HBASE-15333 do obsługi prostych typów Java, takich jak short, int, long, float i double itp., HBASE-15335 do obsługi złożonego klucza wiersza i HBASE-15572 aby dodać opcjonalną semantykę znacznika czasu. Nie możemy się doczekać produkcji przyszłej wersji złącza, która sprawi, że praca z nim będzie jeszcze łatwiejsza.
Potwierdzenie
Chcielibyśmy podziękować Hamelowi Kothari, Sudarshan Kadambi i zespołowi Bloomberg za prowadzenie nas w tej pracy, a także pomaganie nam w walidacji tej pracy. Chcemy również podziękować społeczności HBase za przekazanie opinii i ulepszenie tego. Wreszcie, ta praca wykorzystała lekcje z wcześniejszych integracji Spark HBase i chcemy podziękować ich programistom za utorowanie drogi.
Odniesienie:
SHC:https://github.com/hortonworks/shc-release
Pakiet Spark:http://spark-packages.org/package/zhzhan/shc
Apache HBase:https://hbase.apache.org/
Apache Spark:http://spark.apache.org/