Wprowadzenie
Python jest szeroko stosowany przez inżynierów danych i naukowców zajmujących się danymi do rozwiązywania wszelkiego rodzaju problemów, od potoków ETL/ELT po budowanie modeli uczenia maszynowego. Apache HBase to skuteczny system przechowywania danych dla wielu przepływów pracy, ale dostęp do tych danych w szczególności za pośrednictwem Pythona może być trudny. Dla profesjonalistów danych, którzy chcą korzystać z danych przechowywanych w HBase, najnowszy projekt „hbase-connectors” może być używany z PySpark do podstawowych operacji.
W tej serii blogów wyjaśnimy, jak skonfigurować PySpark i HBase razem do podstawowego użycia Sparka, a także do zadań utrzymywanych w CDSW. Dla tych, którzy nie są zaznajomieni z CDSW, jest to bezpieczna, samoobsługowa platforma analizy danych dla przedsiębiorstw, umożliwiająca analitykom danych zarządzanie własnymi potokami analitycznymi, przyspieszając w ten sposób projekty uczenia maszynowego od eksploracji do produkcji. Aby uzyskać więcej informacji o CDSW, odwiedź stronę produktu Cloudera Data Science Workbench.
W tym poście kilka operacji zostanie wyjaśnionych i zademonstrowanych wraz z przykładowymi danymi wyjściowymi. Dla kontekstu, wszystkie przykładowe operacje w tym konkretnym poście na blogu są uruchamiane z wdrożeniem CDSW.
Warunki wstępne:
- Miej klaster CDP z HBase i Spark
- Jeśli zamierzasz podążać za przykładami za pośrednictwem CDSW, musisz go zainstalować – Instalowanie Cloudera Data Science Workbench
- Python 3 jest zainstalowany w każdym węźle na tej samej ścieżce
Konfiguracja:
Po pierwsze, HBase i Spark muszą być skonfigurowane razem, aby zapytania Spark SQL działały poprawnie. Aby to zrobić, składa się z dwóch części:po pierwsze, skonfiguruj serwery regionu HBase za pomocą Cloudera Manager; a po drugie, upewnij się, że środowisko uruchomieniowe platformy Spark ma powiązania HBase. Należy jednak pamiętać, że Cloudera Manager już konfiguruje niektóre zmienne konfiguracyjne i środowiskowe, aby automatycznie wskazywały Sparka na HBase. Niemniej jednak pierwszy krok konfiguracji zapytań Spark SQL jest wspólny dla wszystkich typów wdrożeń w klastrach CDP, ale drugi jest nieco inny w zależności od typu wdrożenia.
Konfigurowanie serwerów regionu HBase
- Przejdź do Cloudera Manager i wybierz usługę HBase.
- Wyszukaj „środowisko regionu serwera”
- Dodaj nową zmienną środowiskową za pomocą fragmentu zaawansowanej konfiguracji RegionServer Environment (zawór bezpieczeństwa):
- Klucz:HBASE_CLASSPATH
- Wartość:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib /hbase-spark-protocol-shaded.jar:/opt/cloudera/parcels/CDH/jars/scala-library-2.11.12.jar
Upewnij się, że używasz odpowiednich numerów wersji.
- Uruchom ponownie serwery regionów.
Po wykonaniu powyższych kroków wykonaj poniższe kroki, w zależności od tego, czy chcesz wdrożyć CDSW, czy bez CDSW.
Dodawanie powiązań HBase do środowiska uruchomieniowego Spark we wdrożeniach innych niż CDSW
Aby wdrożyć powłokę lub poprawnie użyć spark-submit, użyj następujących poleceń, aby upewnić się, że spark ma odpowiednie powiązania HBase.
pyspark –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded. słoik
spark-submit –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol- cieniowany.jar
Dodawanie powiązań HBase do środowiska uruchomieniowego Spark we wdrożeniach CDSW
Aby skonfigurować CDSW z HBase i PySpark, musisz wykonać kilka kroków.
1) Upewnij się, że Python 3 jest zainstalowany na każdym węźle klastra i zanotuj ścieżkę do niego
2) Stwórz nowy projekt w CDSW i użyj szablonu PySpark
3) Otwórz projekt, przejdź do Ustawienia -> Silnik -> Zmienne środowiskowe.
4) Ustaw PYSPARK3_DRIVER_PYTHON i PYSPARK3_PYTHON do ścieżki, w której Python jest zainstalowany w węzłach klastra (ścieżka odnotowana w kroku 1).
Poniżej przykładowy wygląd.
5) W swoim projekcie przejdź do Pliki -> spark-defaults.conf i otwórz go w Workbenchu
6) Skopiuj i wklej poniższy wiersz w tym pliku i upewnij się, że został zapisany przed rozpoczęciem nowej sesji.
spark.jars=/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar
W tym momencie CDSW jest teraz skonfigurowane do uruchamiania zadań PySpark na HBase! Reszta tego wpisu na blogu odnosi się do niektórych przykładowych operacji na wdrożeniu CDSW.
Przykładowe operacje
Operacje umieszczania
Istnieją dwa sposoby wstawiania i aktualizowania wierszy do HBase. Pierwszą i najbardziej zalecaną metodą jest zbudowanie katalogu, który jest schematem, który mapuje kolumny tabeli HBase na ramkę danych PySpark, określając nazwę tabeli i przestrzeń nazw. Skompilowanie tego zdefiniowanego przez użytkownika formatu JSON jest najbardziej preferowaną metodą, ponieważ może być również używany z innymi operacjami. Więcej informacji na temat katalogów można znaleźć w tej dokumentacji http://hbase.apache.org/book.html#_define_catalog. Druga metoda polega na użyciu określonego parametru mapowania o nazwie „hbase.columns.mapping”, który pobiera tylko ciąg par klucz-wartość.
- Korzystanie z katalogów
from pyspark.sql import Row from pyspark.sql import SparkSession spark = SparkSession\ .builder\ .appName("SampleApplication")\ .getOrCreate() tableCatalog = ''.join("""{ "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"}, "rowkey":"key", "columns":{ "key":{"cf":"rowkey", "col":"key", "type":"int"}, "empId":{"cf":"personal","col":"empId","type":"string"}, "empName":{"cf":"personal", "col":"empName", "type":"string"}, "empState":{"cf":"personal", "col":"empWeight", "type":"string"} } }""".split()) employee = [(10, 'jonD', 'Jon Daniels', 'CA'), (6, 'billR', 'Bill Robert', 'FL')] employeeRDD = spark.sparkContext.parallelize(employee) employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empState=x[3])) employeeDF = spark.createDataFrame(employeeMap) employeeDF.write.format("org.apache.hadoop.hbase.spark") \ .options(catalog=tableCatalog, newTable=5) \ .option("hbase.spark.use.hbasecontext", False) \ .save() # newTable refers to the NumberOfRegions which has to be > 3
Sprawdź, czy nowa tabela o nazwie „tblEmployee” została utworzona w HBase, po prostu otwierając powłokę HBase i wykonując następujące polecenie:
zeskanuj ‘tblEmployee’, {‘LIMIT’ => 2}
Korzystanie z katalogów umożliwia również łatwe ładowanie tabel HBase. Zostanie to omówione w przyszłej części.
- Korzystanie z hbase.columns.mapping
Podczas pisania PySpark Dataframe można dodać opcję o nazwie „hbase.columns.mapping”, aby uwzględnić ciąg, który poprawnie mapuje kolumny. Ta opcja pozwala tylko na wstawianie wierszy do istniejących tabel.
W powłoce HBase stwórzmy najpierw tabelę, utwórz „tblEmployee2”, „osobiste”
Teraz w PySpark wstawmy 2 wiersze za pomocą „hbase.columns.mapping”
from pyspark.sql import Row from pyspark.sql import SparkSession spark = SparkSession\ .builder\ .appName("SampleApplication")\ .getOrCreate() employee = [(10, 'jonD', 'Jon Daniels', 170.7), (6, 'billR', 'Bill Robert', 200.1)] employeeRDD = spark.sparkContext.parallelize(employee) employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empWeight=float(x[3]))) employeeDF = spark.createDataFrame(employeeMap) employeeDF.write.format("org.apache.hadoop.hbase.spark") \ .option("hbase.columns.mapping", "key INTEGER :key, empId STRING personal:empId, empName STRING personal:empName, empWeight FLOAT personal:empWeight") \ .option("hbase.table", "tblEmployee2") \ .option("hbase.spark.use.hbasecontext", False) \ .save()
Ponownie, po prostu sprawdź, czy nowa tabela o nazwie „tblEmployee2” ma te nowe wiersze.
zeskanuj ‘tblEmployee2’, {‘LIMIT’ => 2}
To kończy nasze przykłady dotyczące wstawiania wierszy przez PySpark do tabel HBase. W następnej części omówię operacje pobierania i skanowania, PySpark SQL i niektóre rozwiązywanie problemów. Do tego czasu powinieneś zdobyć klaster CDP i przejść przez te przykłady.