HBase
 sql >> Baza danych >  >> NoSQL >> HBase

Instrukcje:skanowanie tabel Salted Apache HBase z zakresami kluczy specyficznymi dla regionu w MapReduce

Podziękowania dla Pengyu Wang, programisty w FINRA, za pozwolenie na ponowne opublikowanie tego posta.

Tabele Salted Apache HBase ze wstępnym podziałem to sprawdzone i skuteczne rozwiązanie HBase, które zapewnia równomierne rozłożenie obciążenia między serwerami RegionServers i zapobiega powstawaniu gorących punktów podczas zapisu zbiorczego. W tym projekcie klucz rzędu składa się z klucza logicznego i soli na początku. Jednym ze sposobów generowania soli jest obliczenie n (liczby regionów) modulo w kodzie skrótu logicznego klucza wiersza (data itp.).

Klawisze do solenia

Na przykład tabela akceptująca codzienne ładowanie danych może używać logicznych kluczy wierszy rozpoczynających się od daty, a my chcemy wstępnie podzielić tę tabelę na 1000 regionów. W tym przypadku spodziewamy się wygenerowania 1000 różnych soli. Sól może być generowana na przykład jako:

StringUtils.leftPad(Integer.toString(Math.abs(keyCore.hashCode() % 1000)), 3, "0") + "|" + logicalKey 

logicalKey = 2015-04-26|abc
rowKey = 893|2015-04-26|abc

Dane wyjściowe z hashCode() z modulo zapewnia losowość wartości soli od „000” do „999”. Dzięki tej transformacji klucza tabela jest wstępnie dzielona na granicach soli podczas tworzenia. Spowoduje to równomierne rozłożenie woluminów wierszy podczas ładowania HFiles za pomocą funkcji MapReduce bulkload. Gwarantuje to, że klucze rzędów z tą samą solą wpadają w ten sam region.

W wielu przypadkach użycia, takich jak archiwizacja danych, musisz skanować lub kopiować dane w określonym zakresie kluczy logicznych (zakres dat) za pomocą zadania MapReduce. Standardowe zadania MapReduce w tabeli są konfigurowane przez dostarczenie Scan instancja z kluczowymi atrybutami zakresu.

Scan scan = new Scan();
scan.setCaching(1000);
scan.setCacheBlocks(false);
scan.setBatch(1000);
scan.setMaxVersions(1);
scan.setStartRow(Bytes.toBytes("2015-04-26"));
scan.setStopRow(Bytes.toBytes("2015-04-27"));

/* Setup the table mapper job */
TableMapReduceUtil.initTableMapperJob(
tablename,
scan,
DataScanMapper.class,
ImmutableBytesWritable.class,
KeyValue.class,
job, 
true, 
TableInputFormat.class
);
…

Jednak konfiguracja takiej pracy staje się wyzwaniem dla solonych, wstępnie podzielonych stołów. Klawisze startu i stopu będą różne dla każdego regionu, ponieważ każdy ma inną sól. I nie możemy określić wielu zakresów do jednego Scan przykład.

Aby rozwiązać ten problem, musimy przyjrzeć się, jak działa tabela MapReduce. Ogólnie rzecz biorąc, struktura MapReduce tworzy jedno zadanie mapy do odczytywania i przetwarzania każdego podziału danych wejściowych. Każdy podział jest generowany w InputFormat podstawa klasy, metodą getSplits() .

W zadaniu MapReduce tabeli HBase, TableInputFormat jest używany jako InputFormat . Wewnątrz implementacji getSplits() metoda jest zastępowana, aby pobrać klawisze początku i końca wiersza z Scan instancja. Ponieważ klawisze początku i końca wiersza obejmują wiele regionów, zakres jest podzielony przez granice regionu i zwraca listę TableSplit obiekty, które obejmują zakres klucza skanowania. Zamiast opierać się na bloku HDFS, TableSplit s są oparte na regionie. Zastępując getSplits() jesteśmy w stanie kontrolować TableSplit .

Tworzenie niestandardowego formatu TableInputFormat

Aby zmienić zachowanie getSplits() metoda, niestandardowa klasa rozszerzająca TableInputFormat jest wymagane. Cel getSplits() tutaj jest pokrycie zakresu kluczy logicznych w każdym regionie, skonstruuj ich zakres kluczy rzędowych z ich unikalną solą. Klasa HTable udostępnia metodę getStartEndKeys() który zwraca klucze początku i końca wiersza dla każdego regionu. Z każdego klucza startowego przeanalizuj odpowiednią sól dla regionu.

Pair keys = table.getStartEndKeys();
for (int i = 0; i < keys.getFirst().length; i++) {

// The first 3 bytes is the salt, for the first region, start key is empty, so apply “000”
if (keys.getFirst()[i].length == 0) {
regionSalt = "000";
} else {
regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3);
}
…
}

Konfiguracja zadania spełnia zakres klucza logicznego

TableInputFormat pobiera klucz start i stop z Scan instancja. Ponieważ nie możemy użyć Scan w naszym zadaniu MapReduce moglibyśmy użyć Configuration zamiast tego należy przekazać te dwie zmienne i tylko logiczny klucz start i stop jest wystarczająco dobry (zmienną może być data lub inna informacja biznesowa). getSplits() metoda ma JobContext argument, Instancję konfiguracji można odczytać jako context.getConfiguration() .

W sterowniku MapReduce:

Configuration conf = getConf();
conf = HBaseConfiguration.addHbaseResources(conf);

conf.set("logical.scan.start", "2015-04-26");
conf.set("logical.scan.stop", "2015-04-27");

W Custom TableInputFormat :

@Override 
public List getSplits(JobContext context) throws IOException {
conf = context.getConfiguration();
String scanStart = conf.get("logical.scan.start");
String scanStop = conf.get("logical.scan.stop");
…
}

Zrekonstruuj zakres solonego klucza według regionu

Teraz, gdy mamy klucz soli i logiczny start/stop dla każdego regionu, możemy odbudować rzeczywisty zakres kluczy wiersza.

byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);

Tworzenie podziału tabeli dla każdego regionu

Za pomocą zakresu kluczy wiersza możemy teraz zainicjować TableSplit przykład dla regionu.

List splits = new ArrayList(keys.getFirst().length);
for (int i = 0; i < keys.getFirst().length; i++) {
…
byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);

InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation);
splits.add(split);
}

Jeszcze jedną rzeczą, na którą należy zwrócić uwagę, jest lokalizacja danych. Struktura wykorzystuje informacje o lokalizacji w każdym podziale wejściowym, aby przypisać zadanie mapy na swoim lokalnym hoście. Dla naszego TableInputFormat , używamy metody getTableRegionLocation() aby pobrać lokalizację regionu obsługującego klawisz wiersza.

Ta lokalizacja jest następnie przekazywana do TableSplit konstruktor. Zapewni to, że program odwzorowujący przetwarzający podział tabeli znajduje się na tym samym serwerze regionu. Jedna metoda o nazwie DNS.reverseDns() , wymaga adresu serwera nazw HBase. Ten atrybut jest przechowywany w konfiguracji „hbase.nameserver.address „.

this.nameServer = context.getConfiguration().get("hbase.nameserver.address", null);
…

public String getTableRegionLocation(HTable table, byte[] rowKey) throws IOException {
HServerAddress regionServerAddress = table.getRegionLocation(rowKey).getServerAddress();
InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress();
String regionLocation;
try {
regionLocation = reverseDNS(regionAddress);
} catch (NamingException e) {
regionLocation = regionServerAddress.getHostname();
}
return regionLocation;
}

protected String reverseDNS(InetAddress ipAddress) throws NamingException {
String hostName = this.reverseDNSCacheMap.get(ipAddress);
if (hostName == null) {
hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer));
this.reverseDNSCacheMap.put(ipAddress, hostName);
}
return hostName;
}

Pełny kod getSplits będzie wyglądać tak:

@Override 
public List getSplits(JobContext context) throws IOException {
conf = context.getConfiguration();
table = getHTable(conf);
if (table == null) {
throw new IOException("No table was provided.");
}

// Get the name server address and the default value is null.
this.nameServer = conf.get("hbase.nameserver.address", null);
String scanStart = conf.get("region.scan.start");
String scanStop = conf.get("region.scan.stop");

Pair keys = table.getStartEndKeys();
if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
throw new RuntimeException("At least one region is expected");
}
List splits = new ArrayList(keys.getFirst().length);
for (int i = 0; i < keys.getFirst().length; i++) {

String regionLocation = getTableRegionLocation(table, keys.getFirst()[i]);

String regionSalt = null;
if (keys.getFirst()[i].length == 0) {
regionSalt = "000";
} else {
regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3);
}
byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);

InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation);
splits.add(split);
}
log.info("Total table splits: " + splits.size());
return splits;
}

Użyj niestandardowego formatu TableInoutFormat w sterowniku MapReduce

Teraz musimy zastąpić TableInputFormat klasa z niestandardową kompilacją, której użyliśmy do konfiguracji zadania MapReduce tabeli.

Configuration conf = getConf();
conf = HBaseConfiguration.addHbaseResources(conf);
HTableInterface status_table = new HTable(conf, status_tablename);

conf.set("logical.scan.start", "2015-04-26");
conf.set("logical.scan.stop", "2015-04-27");

Scan scan = new Scan();
scan.setCaching(1000);
scan.setCacheBlocks(false);
scan.setBatch(1000);
scan.setMaxVersions(1);

/* Setup the table mapper job */
TableMapReduceUtil.initTableMapperJob(
tablename,
scan,
DataScanMapper.class,
ImmutableBytesWritable.class,
KeyValue.class,
job, 
true, 
MultiRangeTableInputFormat.class
);

Podejście niestandardowego TableInputFormat zapewnia wydajne i skalowalne możliwości skanowania tabel HBase, które są zaprojektowane do używania soli w celu zrównoważonego obciążenia danych. Ponieważ skanowanie może ominąć wszelkie niepowiązane klucze wierszy, niezależnie od wielkości tabeli, złożoność skanowania jest ograniczona tylko do rozmiaru danych docelowych. W większości przypadków może to zagwarantować względnie stały czas przetwarzania w miarę wzrostu tabeli.


  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. Instrukcje:dołączanie bibliotek innych firm do zadania MapReduce

  2. Wydanie CDH 6.2:Co nowego w HBase

  3. Wprowadzenie do lokalizacji danych w Hadoop MapReduce

  4. Złącze Spark HBase – rok w przeglądzie

  5. HBase:5 wskazówek dotyczących uruchamiania na EC2 o małej ilości pamięci