HBase
 sql >> Baza danych >  >> NoSQL >> HBase

Spark-on-HBase:złącze HBase oparte na DataFrame

Ten wpis na blogu został opublikowany na Hortonworks.com przed fuzją z Cloudera. Niektóre linki, zasoby lub odniesienia mogą nie być już dokładne.

Z dumą ogłaszamy techniczną wersję zapoznawczą złącza Spark-HBase, opracowanego przez Hortonworks we współpracy z Bloomberg.

Łącznik Spark-HBase wykorzystuje interfejs API źródła danych (SPARK-3247) wprowadzony w Spark-1.2.0. Wypełnia lukę między prostym magazynem HBase Key Value a złożonymi relacyjnymi zapytaniami SQL i umożliwia użytkownikom przeprowadzanie złożonej analizy danych w oparciu o HBase przy użyciu Spark. HBase DataFrame to standardowa ramka Spark DataFrame, która może współpracować z dowolnymi innymi źródłami danych, takimi jak Hive, ORC, Parquet, JSON itp.

Tło

Istnieje kilka łączników Spark HBase o otwartym kodzie źródłowym dostępnych jako pakiety Spark, jako niezależne projekty lub w trunku HBase.

Spark został przeniesiony do interfejsów API Dataset/DataFrame, które zapewniają wbudowaną optymalizację planu zapytań. Teraz użytkownicy końcowi wolą używać interfejsu opartego na DataFrames/Datasets.

Złącze HBase w trunku HBase posiada bogate wsparcie na poziomie RDD m.in. BulkPut itp., ale jego obsługa DataFrame nie jest tak bogata. Złącze magistrali HBase opiera się na standardowym HadoopRDD z wbudowanym formatem TableInputFormat HBase ma pewne ograniczenia wydajności. Dodatkowo BulkGet wykonywany w sterowniku może być pojedynczym punktem awarii.

Istnieje kilka innych alternatywnych implementacji. Weź Spark-SQL-on-HBase jako przykład. Stosuje bardzo zaawansowane niestandardowe techniki optymalizacji, osadzając własny plan optymalizacji zapytań w standardowym silniku Spark Catalyst, przesyła RDD do HBase i wykonuje skomplikowane zadania, takie jak częściowa agregacja, wewnątrz koprocesora HBase. Takie podejście jest w stanie osiągnąć wysoką wydajność, ale jest trudne do utrzymania ze względu na jego złożoność i szybką ewolucję Sparka. Również zezwolenie na uruchamianie dowolnego kodu wewnątrz koprocesora może stanowić zagrożenie dla bezpieczeństwa.

Złącze Spark-on-HBase (SHC) zostało opracowane w celu przezwyciężenia tych potencjalnych wąskich gardeł i słabości. Implementuje standardowy interfejs Spark Datasource API i wykorzystuje silnik Spark Catalyst do optymalizacji zapytań. Równolegle RDD jest konstruowany od podstaw zamiast używania TableInputFormat w celu osiągnięcia wysokiej wydajności. Dzięki temu dostosowanemu RDD można zastosować i w pełni wdrożyć wszystkie krytyczne techniki, takie jak przycinanie partycji, przycinanie kolumn, przekazywanie predykatów i lokalizowanie danych. Konstrukcja sprawia, że ​​konserwacja jest bardzo łatwa, a jednocześnie zapewnia dobry kompromis między wydajnością a prostotą.

Architektura

Zakładamy, że Spark i HBase są wdrożone w tym samym klastrze, a programy wykonawcze Spark są zlokalizowane razem z serwerami regionu, jak pokazano na poniższym rysunku.

Rysunek 1. Architektura złącza Spark-on-HBase

Na wysokim poziomie łącznik traktuje funkcje Scan i Get w podobny sposób, a obie akcje są wykonywane w executorach. Sterownik przetwarza zapytanie, agreguje skany/pobiera na podstawie metadanych regionu i generuje zadania według regionu. Zadania są wysyłane do preferowanych executorów znajdujących się w tym samym miejscu co serwer regionu i są wykonywane równolegle w executorach w celu uzyskania lepszej lokalizacji danych i współbieżności. Jeśli region nie przechowuje wymaganych danych, do tego serwera regionu nie przypisano żadnego zadania. Zadanie może składać się z wielu skanowań i pobierania zbiorczego, a żądania danych przez zadanie są pobierane tylko z jednego serwera regionalnego, a ten serwer regionalny będzie również preferowaną lokalizacją dla zadania. Należy zauważyć, że sterownik nie bierze udziału w rzeczywistym wykonywaniu zadań, z wyjątkiem zadań planowania. Dzięki temu kierowca nie jest wąskim gardłem.

Katalog tabel

Aby przenieść tabelę HBase jako relacyjną tabelę do platformy Spark, definiujemy mapowanie między tabelami HBase i Spark o nazwie Katalog tabel. Ten katalog składa się z dwóch krytycznych części. Jedna to definicja klucza wiersza, a druga to mapowanie między kolumną tabeli w Spark i rodziną kolumn i kwalifikatorem kolumn w HBase. Aby uzyskać szczegółowe informacje, zapoznaj się z sekcją Wykorzystanie.

Natywna obsługa Avro

Łącznik natywnie obsługuje format Avro, ponieważ bardzo powszechną praktyką jest utrwalanie uporządkowanych danych w HBase jako tablica bajtów. Użytkownik może utrwalić rekord Avro bezpośrednio w HBase. Wewnętrznie schemat Avro jest automatycznie konwertowany na natywny typ danych Spark Catalyst. Należy zauważyć, że obie części klucz-wartość w tabeli HBase można zdefiniować w formacie Avro. Proszę zapoznać się z przykładami/przypadkami testowymi w repozytorium w celu dokładnego użycia.

Predykat w dół

Łącznik pobiera tylko wymagane kolumny z serwera regionu, aby zmniejszyć obciążenie sieci i uniknąć nadmiarowego przetwarzania w aparacie Spark Catalyst. Istniejące standardowe filtry HBase są używane do wykonywania naciskania predykatów bez wykorzystywania możliwości koprocesora. Ponieważ HBase nie zna typu danych z wyjątkiem tablicy bajtów i niespójności kolejności między typami pierwotnymi Java a tablicą bajtów, musimy wstępnie przetworzyć warunek filtru przed ustawieniem filtru w operacji skanowania, aby uniknąć utraty danych. Na serwerze regionu rekordy niespełniające warunku zapytania są odfiltrowywane.

Przycinanie partycji

Wyodrębniając klucz wiersza z predykatów, dzielimy funkcję Scan/BulkGet na wiele nienakładających się zakresów, tylko serwery regionu, które mają żądane dane, wykonają funkcję Scan/BulkGet. Obecnie przycinanie partycji odbywa się na pierwszym wymiarze kluczy wierszy. Na przykład, jeśli klucz wiersza to „klucz1:klucz2:klucz3”, czyszczenie partycji będzie oparte tylko na „kluczu1”. Zwróć uwagę, że warunki WHERE muszą być dokładnie określone. W przeciwnym razie przycinanie partycji może nie zadziałać. Na przykład, WHERE rowkey1> „abc” OR kolumna =„xyz” (gdzie rowkey1 to pierwszy wymiar klucza wiersza, a kolumna to zwykła kolumna hbase) spowoduje pełne skanowanie, ponieważ musimy pokryć wszystkie zakresy, ponieważ z LUB logika.

Lokalizacja danych

Gdy wykonawca Spark jest współlokowany z serwerami regionu HBase, lokalizacja danych jest osiągana przez zidentyfikowanie lokalizacji serwera regionu i dokłada wszelkich starań, aby zlokalizować zadanie razem z serwerem regionu. Każdy executor wykonuje Scan/BulkGet na części danych znajdujących się na tym samym hoście.

Skanuj i pobieraj zbiorczo

Te dwa operatory są widoczne dla użytkowników przez określenie WHERE CLAUSE, np. WHERE kolumna> x i kolumna for scan i WHERE column =x zapominać. Operacje są wykonywane w executorach, a sterownik tylko je konstruuje. Wewnętrznie są one konwertowane na skanowanie i/lub pobieranie, a Iterator[Row] jest zwracany do silnika katalizatora w celu przetwarzania górnej warstwy.

Użycie

Poniżej przedstawiono podstawową procedurę korzystania ze złącza. Aby uzyskać więcej szczegółów i zaawansowanych przypadków użycia, takich jak Avro i obsługa kluczy kompozytowych, zapoznaj się z przykładami w repozytorium.

1) Zdefiniuj katalog mapowania schematu:

[code language="scala"]def catalog = s"""{
         |"table":{"namespace":"default", "name":"table1"},
         |"rowkey":"key",
         |"columns":{
           |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
           |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
           |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
           |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
           |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
           |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
           |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
           |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
           |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
         |}
       |}""".stripMargin
[/code]

2) Przygotuj dane i wypełnij tabelę HBase:
case class HBaseRecord(col0:String, col1:Boolean,col2:Double, col3:Float,col4:Int,       col5:Long, col6:Short, col7:String, col8:Byte)

object HBaseRecord {def apply(i:Int, t:String):HBaseRecord ={ val s =s”””row${„%03d”.format(i)}”””       HBaseRecord(s, i % 2 ==0, i.toDouble, i.toFloat,  i, i.toLong, i.toShort,  s”String$i:$t”,      i.toByte) }}

val data =(0 do 255).map { i =>  HBaseRecord(i, “extra”)}

sc.parallelize(data).toDF.write.options(
 Map(HBaseTableCatalog.tableCatalog -> katalog, HBaseTableCatalog.newTable -> „5”))
 .format(„org.apache.spark. sql.execution.datasources.hbase”)
 .save()
 
3) Załaduj DataFrame:
def withCatalog(cat:String):DataFrame ={
 sqlContext
 .read
 .options(Map(HBaseTableCatalog.tableCatalog->cat))
 .format( „org.apache.spark.sql.execution.datasources.hbase”)
 .load()
}

val df =withCatalog(katalog)

4) Zapytanie zintegrowane w języku:
val s =df.filter((($”kol0″ <=“row050″ &&$”kol0”> “row040”) ||
 $”kol0″ ===“row005” ||
 $”col0″ ===„row020” ||
 $”col0″ === „r20” ||
 $”col0″ <=„row005”) &&
 ($”col4″ ===1 ||
 $”col4″ ===42))
 .select(„col0”, „col1”, „col4”)
s .pokaż

5) Zapytanie SQL:
df.registerTempTable(„tabela”)
sqlContext.sql(„wybierz liczbę(kolumna1) z tabeli”).pokaż

Konfigurowanie pakietu Spark

Użytkownicy mogą używać łącznika Spark-on-HBase jako standardowego pakietu Spark. Aby dołączyć pakiet do aplikacji Spark, użyj:

spark-shell, pyspark lub spark-submit

> $SPARK_HOME/bin/spark-shell –pakiety zhzhan:shc:0.0.11-1.6.1-s_2.10

Użytkownicy mogą również uwzględnić pakiet jako zależność w pliku SBT. Format to nazwa-pakietu-iskry:wersja

spDependencies +=„zhzhan/shc:0.0.11-1.6.1-s_2.10”

Uruchamianie w bezpiecznym klastrze

Aby uruchomić w klastrze z włączoną obsługą protokołu Kerberos, użytkownik musi dołączyć pliki jar związane z HBase do ścieżki klasy, ponieważ pobieranie i odnawianie tokenu HBase jest wykonywane przez platformę Spark i jest niezależne od łącznika. Innymi słowy, użytkownik musi zainicjować środowisko w normalny sposób, albo przez kinit, albo przez podanie principal/keytab. Poniższe przykłady pokazują, jak uruchomić w bezpiecznym klastrze w trybie zarówno przędza-klient, jak i przędza-klaster. Zwróć uwagę, że SPARK_CLASSPATH musi być ustawiony dla obu trybów, a przykładowy jar jest tylko symbolem zastępczym dla Sparka.

eksportuj SPARK_CLASSPATH=/usr/hdp/current/hbase-client/lib/hbase-common.jar:/usr/hdp/current/hbase-client/lib/hbase-client.jar:/usr/hdp/current/hbase- client/lib/hbase-server.jar:/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar

Załóżmy, że hrt_qa jest kontem bezgłowym, użytkownik może użyć następującego polecenia dla kinit:

kinit -k -t /tmp/hrt_qa.headless.keytab hrt_qa

/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-client –packages zhzhan:shc:0.0.11- 1.6.1-s_2.10 –num-executors 4 –driver-memory 512m –executor-memory 512m –executor-cores 1 /usr/hdp/current/spark-client/lib/spark-examples-1.6.1.2.4.2. 0-106-hadoop2.7.1.2.4.2.0-106.jar

/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-cluster –files /etc/hbase/conf/hbase -site.xml –pakiety zhzhan:shc:0.0.11-1.6.1-s_2.10 –num-executors 4 –driver-memory 512m –executor-memory 512m –executor-cores 1 /usr/hdp/current/spark- client/lib/spark-examples-1.6.1.2.4.2.0-106-hadoop2.7.1.2.4.2.0-106.jar

Łączenie wszystkiego w całość

Właśnie przedstawiliśmy krótki przegląd tego, jak HBase obsługuje Spark na poziomie DataFrame. Dzięki DataFrame API Spark aplikacje mogą pracować z danymi przechowywanymi w tabeli HBase tak łatwo, jak z danymi przechowywanymi w innych źródłach danych. Dzięki tej nowej funkcji dane w tabelach HBase mogą być łatwo wykorzystywane przez aplikacje Spark i inne interaktywne narzędzia, np. użytkownicy mogą uruchamiać złożone zapytanie SQL na górze tabeli HBase w Spark, wykonywać łączenie tabel w Dataframe lub integrować się z Spark Streaming, aby zaimplementować bardziej skomplikowany system.

Co dalej?

Obecnie łącznik jest hostowany w repozytorium Hortonworks i publikowany jako pakiet Spark. Jest w trakcie migracji do łącza Apache HBase. Podczas migracji zidentyfikowaliśmy kilka krytycznych błędów w trunku HBase, które zostaną naprawione wraz z scaleniem. Praca społeczności jest śledzona przez parasol HBase JIRA HBASE-14789, w tym HBASE-14795 i HBASE-14796, aby zoptymalizować podstawową architekturę obliczeniową dla skanowania i BulkGet, HBASE-14801, aby zapewnić interfejs użytkownika JSON ułatwiający użytkowanie, HBASE-15336 dla ścieżka zapisu DataFrame, HBASE-15334 do obsługi Avro, HBASE-15333 do obsługi prostych typów Java, takich jak short, int, long, float i double itp., HBASE-15335 do obsługi złożonego klucza wiersza i HBASE-15572 aby dodać opcjonalną semantykę znacznika czasu. Nie możemy się doczekać produkcji przyszłej wersji złącza, która sprawi, że praca z nim będzie jeszcze łatwiejsza.

Potwierdzenie

Chcielibyśmy podziękować Hamelowi Kothari, Sudarshan Kadambi i zespołowi Bloomberg za prowadzenie nas w tej pracy, a także pomaganie nam w walidacji tej pracy. Chcemy również podziękować społeczności HBase za przekazanie opinii i ulepszenie tego. Wreszcie, ta praca wykorzystała lekcje z wcześniejszych integracji Spark HBase i chcemy podziękować ich programistom za utorowanie drogi.

Odniesienie:

SHC:https://github.com/hortonworks/shc-release

Pakiet Spark:http://spark-packages.org/package/zhzhan/shc

Apache HBase:https://hbase.apache.org/

Apache Spark:http://spark.apache.org/


  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. Tworzenie aplikacji do uczenia maszynowego za pomocą środowiska pracy i operacyjnej bazy danych Cloudera Data Science, część 1:Konfiguracja i podstawy

  2. Podejścia do tworzenia kopii zapasowych i odzyskiwania po awarii w HBase

  3. Wykorzystanie inżynierii danych Cloudera do analizy danych programu ochrony wypłat

  4. Jak HBase w CDP może wykorzystać S3 firmy Amazon?

  5. Apache Hadoop Ochrona ozonowa — uwierzytelnianie