Wprowadzenie
Niektóre właściwości konfiguracyjne znajdujące się w Apache Hadoop mają bezpośredni wpływ na klientów, takich jak Apache HBase. Jedna z tych właściwości nosi nazwę „dfs.datanode.max.xcievers” i należy do podprojektu HDFS. Określa liczbę wątków po stronie serwera oraz – w pewnym stopniu – gniazd używanych do połączeń danych. Zbyt niskie ustawienie tej liczby może powodować problemy w miarę rozwoju lub zwiększania wykorzystania klastra. Ten post pomoże ci zrozumieć, co dzieje się między klientem a serwerem i jak określić rozsądną liczbę dla tej właściwości.
Problem
Ponieważ HBase przechowuje wszystko, czego potrzebuje w HDFS, twarda górna granica narzucona przez właściwość konfiguracji „dfs.datanode.max.xcievers” może spowodować, że HBase będzie mieć zbyt mało zasobów, co objawia się jako wyjątki IOException po obu stronach połączenia. Oto przykład z listy mailingowej HBase [1], gdzie następujące wiadomości były początkowo rejestrowane po stronie RegionServer:
2008-11-11 19:55:52,451 INFO org.apache.hadoop.dfs.DFSClient: Wyjątek w createBlockOutputStream java.io.IOException:Nie można odczytać ze strumienia
2008-11-11 19:55:52,451 INFO org.apache.hadoop.dfs.DFSClient: Porzucanie bloku blk_-5467014108758633036_595771
2008-11- 11 19:55:58,455 WARN org.apache.hadoop.dfs.DFSClient: DataStreamer Wyjątek:java.io.IOException:nie można utworzyć nowego bloku.
2008-11-11 19:55:58,455 WARN org.apache .hadoop.dfs.DFSClient:Odzyskiwanie błędu dla bloku blk_-5467014108758633036_595771 bad datanode[0]
2008-11-11 19:55:58,482 KRYTYCZNY org.apache.hadoop.hbase.regionserver.Flusher:Wymagane ponowne odtworzenie hloga . Wymuszanie zamknięcia serwera
Korelowanie tego z dziennikami Hadoop DataNode ujawniło następujący wpis:
BŁĄD org.apache.hadoop.dfs.DataNode: DatanodeRegistration(10.10.10.53:50010,storageID=DS-1570581820-10.10.10.53-50010-1224117842339,infoPort=50075, ipcPort=50020):DataXceiver:java.io.IOException: xceiverCount 258 przekracza limit równoczesnych serwerów 256
W tym przykładzie niska wartość „dfs.datanode.max.xcievers” dla DataNodes spowodowała zamknięcie całego serwera RegionServer. To naprawdę zła sytuacja. Niestety nie ma twardej i szybkiej reguły, która wyjaśniałaby, jak obliczyć wymagany limit. Powszechnie zaleca się zwiększenie liczby z domyślnej 256 do około 4096 (patrz [1], [2], [3], [4] i [5] w celach informacyjnych). Odbywa się to poprzez dodanie tej właściwości do pliku hdfs-site.xml wszystkich DataNodes (zauważ, że jest to błędna pisownia):
Uwaga:Po wprowadzeniu tej zmiany w pliku konfiguracyjnym konieczne będzie ponowne uruchomienie DataNodes.
Powinno to pomóc w rozwiązaniu powyższego problemu, ale nadal możesz chcieć dowiedzieć się więcej o tym, jak to wszystko działa razem i co HBase robi z tymi zasobami. Omówimy to w dalszej części tego postu. Ale zanim to zrobimy, musimy wyjaśnić, dlaczego nie można po prostu ustawić tej liczby na bardzo wysoką, powiedzmy 64K i skończyć z tym.
Powód dla górnej granicy jest dwojaki:po pierwsze, wątki potrzebują własnego stosu, co oznacza, że zajmują pamięć. W przypadku obecnych serwerów oznacza to domyślnie 1 MB na wątek[6]. Innymi słowy, jeśli zużyjesz wszystkie 4096 wątków DataXceiver, potrzebujesz około 4 GB sterty, aby je pomieścić. Wcina się to w miejsce, które przydzieliłeś dla pamięci pamięci i pamięci podręcznych bloków, a także wszystkich innych ruchomych części JVM. W najgorszym przypadku możesz napotkać wyjątek OutOfMemoryException, a proces RegionServer jest ugotowany. Chcesz ustawić tę właściwość na dość wysoką liczbę, ale też nie za dużą.
Po drugie, mając tak wiele aktywnych wątków, zobaczysz również coraz większe obciążenie procesora. Będzie wiele zmian kontekstu, aby obsłużyć całą równoczesną pracę, co zabiera zasoby do rzeczywistej pracy. Podobnie jak w przypadku obaw o pamięć, chcesz, aby liczba wątków nie rosła bezgranicznie, ale zapewniła rozsądną górną granicę – i do tego służy „dfs.datanode.max.xcievers”.
Szczegóły systemu plików Hadoop
Po stronie klienta biblioteka HDFS zapewnia abstrakcję o nazwie Path. Ta klasa reprezentuje plik w systemie plików obsługiwanym przez usługę Hadoop, reprezentowanym przez klasę FileSystem. Istnieje kilka konkretnych implementacji abstrakcyjnej klasy FileSystem, z których jedną jest DistributedFileSytem, reprezentujący HDFS. Ta klasa z kolei otacza rzeczywistą klasę DFSClient, która obsługuje wszystkie interakcje ze zdalnymi serwerami, tj. NameNode i wieloma DataNode.
Kiedy klient, taki jak HBase, otwiera plik, robi to, na przykład wywołując metody open() lub create() klasy FileSystem, tutaj są to najbardziej uproszczone wcielenia
publiczny DFSInputStream open(String src) zgłasza IOException
publiczny FSDataOutputStream create(Path f) zgłasza IOException
Zwrócona instancja strumienia wymaga gniazda i wątku po stronie serwera, które są używane do odczytu i zapisu bloków danych. Stanowią część umowy o wymianę danych między klientem a serwerem. Zwróć uwagę, że istnieją inne protokoły oparte na RPC, które są używane między różnymi komputerami, ale na potrzeby tej dyskusji można je zignorować.
Zwracana instancja strumienia to wyspecjalizowana klasa DFSOutputStream lub DFSInputStream, która obsługuje wszystkie interakcje z NameNode w celu ustalenia, gdzie znajdują się kopie bloków, oraz komunikacji danych na blok na DataNode.
Po stronie serwera DataNode otacza instancję DataXceiverServer, która jest rzeczywistą klasą, która odczytuje powyższy klucz konfiguracji, a także zgłasza powyższy wyjątek po przekroczeniu limitu.
Po uruchomieniu DataNode tworzy grupę wątków i uruchamia wspomnianą instancję DataXceiverServer w następujący sposób:
this.threadGroup =new ThreadGroup(„dataXceiverServer”);
this.dataXceiverServer =new Daemon( threadGroup,
nowy DataXceiverServer(ss, conf, this));
this.threadGroup.setDaemon(true); // automatyczne zniszczenie, gdy jest puste
Pamiętaj, że wątek DataXceiverServer już zajmuje jedno miejsce w grupie wątków. DataNode ma również tę wewnętrzną klasę do pobierania liczby aktualnie aktywnych wątków w tej grupie:
/** Liczba jednoczesnych serwerów odbiorczych na węzeł. */
int getXceiverCount() {
return threadGroup ==null ? 0 :threadGroup.activeCount();
}
Bloki odczytu i zapisu zainicjowane przez klienta powodują nawiązanie połączenia, które jest opakowane przez wątek DataXceiverServer w instancję DataXceiver. Podczas tego przekazania tworzony jest wątek i rejestrowany w powyższej grupie wątków. Tak więc dla każdej aktywnej operacji odczytu i zapisu nowy wątek jest śledzony po stronie serwera. Jeśli liczba wątków w grupie przekroczy skonfigurowane maksimum, wspomniany wyjątek jest zgłaszany i rejestrowany w logach DataNode:
if (curXceiverCount> dataXceiverServer.maxXceiverCount) {
throw new IOException(„xceiverCount ” + curXceiverCount
+ ” przekracza limit jednoczesnych serwerów ”
+ dataXceiverServer.maxXceiverCount);
}
Implikacje dla klientów
Teraz pytanie brzmi, w jaki sposób klient odczytuje i zapisuje wątki po stronie serwera. Zanim jednak przejdziemy do szczegółów, użyjmy informacji debugowania, które klasa DataXceiver rejestruje podczas tworzenia i zamykania
LOG.debug(„Liczba aktywnych połączeń to:” + datanode.getXceiverCount());
…
LOG.debug(datanode.dnRegistration + “:Liczba aktywnych połączeń to:” + datanode.getXceiverCount());
i monitoruj podczas startu HBase, co jest rejestrowane w DataNode. Dla uproszczenia odbywa się to w konfiguracji pseudorozproszonej z pojedynczą instancją DataNode i RegionServer. Poniżej przedstawiono górę strony stanu serwera RegionServer.
Ważna część znajduje się w sekcji „Metryki”, gdzie jest napisane „storefiles=22”. Tak więc, zakładając, że HBase ma co najmniej tyle plików do obsłużenia, plus kilka dodatkowych plików do dziennika zapisu z wyprzedzeniem, powinniśmy zobaczyć powyższy komunikat dzienników, że mamy co najmniej 22 „aktywne połączenia”. Uruchommy HBase i sprawdźmy pliki dziennika DataNode i RegionServer:
Wiersz poleceń:
$ bin/start-hbase.sh
…
Dziennik DataNode:
2012-03-05 13:01:35,309 DEBUG org.apache.hadoop.hdfs.server.datanode. DataNode:Liczba aktywnych połączeń to:1
2012-03-05 13:01:35,315 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS- 1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Liczba aktywnych połączeń wynosi:2
12/03/05 13:01:35 INFO regionserver.MemStoreFlusher:globalMemStoreLimit=396.7m, globalMemStoreLimitLowMark=347.1m, maxHeap=991.7m
12/03/05 13:01:39 INFO http.HttpServer:Port zwrócony przez webServer.getConnectors()[0].getLocalPort() przed open() wynosi -1 . Otwarcie listenera na 60030
2012-03-05 13:01:40,003 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Liczba aktywnych połączeń wynosi:1
12/03/05 13:01:40 INFO regionserver.HRegionServer:Otrzymano żądanie otwarcia regionu:-ROOT-,,0.70236052
2012-03-05 13:01:40,882 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :Liczba aktywnych połączeń to:3
2012-03-05 13:01:40,884 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448 -10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Liczba aktywnych połączeń to:4
2012-03-05 13:01:40,888 DEBUG org.apache.hadoop.hdfs.server. datanode.DataNode:Liczba aktywnych połączeń wynosi:3
…
12/03/05 13:01:40 INFO regionserver.HRegion:Onlined -ROOT-,,0.70236052; next sequenceid=63083
2012-03-05 13:01:40,982 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Liczba aktywnych połączeń wynosi:3
2012-03-05 13 :01:40,983 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Liczba aktywnych połączeń wynosi:4
…
12/03/05 13:01:41 INFO regionserver.HRegionServer:Otrzymano żądanie otwarcia regionu:.META.,,1.1028785192
2012-03 -05 13:01:41.026 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Liczba aktywnych połączeń wynosi:3
2012-03-05 13:01:41.027 DEBUG org.apache.hadoop. hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Liczba aktywnych połączeń:4
…
12/03/05 13:01:41 INFO regionserver.HRRegion:Onlined .META.,,1.1028785192; next sequenceid=63082
2012-03-05 13:01:41,109 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Liczba aktywnych połączeń wynosi:3
2012-03-05 13 :01:41,114 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Liczba aktywnych połączeń wynosi:4
2012-03-05 13:01:41,117 DEBUG org.apache.hadoop.hdfs.server .datanode.DataNode:Liczba aktywnych połączeń wynosi:5
12/03/05 13:01:41 INFO regionserver.HRegionServer:Otrzymano żądanie otwarcia 16 regionów
12/03/05 13 :01:41 INFO regionserver.HRegionServer:odebrane żądanie otwarcia regionu:usertable,,1330944810191.62a312d67981c86c42b6bc02e6ec7e3f.
12/03/05 13:01:41 INFO regionserver.HRegionServer:odebrane żądanie otwarcia regionu użytkownika112031178, 1330944810191.90d287473fe223f0ddc137020efda25d.
…
2012-03-05 13:01:41,246 DEBUG org.apache.hadoop.hdfs.server.datanode. DataNode:Liczba aktywnych połączeń to:6
2012-03-05 13:01:41,248 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Liczba aktywnych połączeń to:7
…
2012-03-05 13:01:41,257 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772 , infoPort=50075, ipcPort=50020):Liczba aktywnych połączeń to:10
2012-03-05 13:01:41,257 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0. 0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Liczba aktywnych połączeń to:9
…
12/03/05 13:01:41 INFO regionserver.HRRegion:Tabela użytkownika online, użytkownik1120311784,1330944810191.90d287473fe223f0ddc137020efda25d.; następny identyfikator sekwencji=62917
12/03/05 13:01:41 INFO regionserver.HRegion:Tablica użytkownika online,,1330944810191.62a312d67981c86c42b6bc02e6ec7e3f.; następny identyfikator sekwencji=62916
…
12/03/05 13:01:41 INFO regionserver.HRegion:Tablica użytkownika online, user1361265841,1330944811370.80663fcf291e3ce00080599964f406ba.; next sequenceid=62919
2012-03-05 13:01:41,474 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Liczba aktywnych połączeń to:6
2012-03-05 13 :01:41,491 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Liczba aktywnych połączeń wynosi:7
2012-03-05 13:01:41,495 DEBUG org.apache.hadoop.hdfs.server .datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Liczba aktywnych połączeń:8
2012-03 -05 13:01:41,508 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Liczba aktywnych połączeń wynosi:7
…
12/03/05 13:01:41 INFO regionserver .HRregion:Onlined usertable,user1964968041,1330944848231.dd89596e9129e1caa7e07f8a491c9734.; next sequenceid=62920
2012-03-05 13:01:41,618 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Liczba aktywnych połączeń wynosi:6
2012-03-05 13 :01:41,621 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Liczba aktywnych połączeń to:7
…
2012-03-05 13:01:41,829 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID =DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Liczba aktywnych połączeń to:7
12/03/05 13:01:41 INFO regionserver.HRregion:Tablica użytkowników online ,user515290649,1330944849739.d23924dc9e9d5891f332c337977af83d.; next sequenceid=62926
2012-03-05 13:01:41,832 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Liczba aktywnych połączeń wynosi:6
2012-03-05 13 :01:41,838 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Liczba aktywnych połączeń to:7
12/03/05 13:01:41 INFO regionserver.HRegion:Onlined usertable,user757669512,1330944850808.cd0d6f16d8ae9cf0c9277f5d6c6c6b9f.; next sequenceid=62929
…
2012-03-05 14:01:39,711 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Liczba aktywnych połączeń wynosi:4
2012 -03-05 22:48:41,945 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Liczba aktywnych połączeń to:4
12/03/05 22:48:41 INFO regionserver.HRegion:Onlined usertable,user757669512,1330944850808.cd0d6f16d8ae9cf0c9277f5d6c6c6b9f.; następny identyfikator sekwencji=62929
2012-03-05 22:48:41,963 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64 -50010-1321352233772, infoPort=50075, ipcPort=50020):Liczba aktywnych połączeń wynosi:4
Możesz zobaczyć, jak regiony są otwierane jeden po drugim, ale możesz też zauważyć, że liczba aktywnych połączeń nigdy nie wzrasta do 22 – ledwie dochodzi do 10. Dlaczego? Aby lepiej to zrozumieć, musimy zobaczyć, jak pliki w HDFS mapują się do instancji DataXceiver po stronie serwera – i rzeczywiste wątki, które reprezentują.
Głębokie nurkowanie w Hadoop
Wspomniane wcześniej DFSInputStream i DFSOutputStream są tak naprawdę fasadą wokół zwykłych koncepcji strumieni. Opakowują komunikację klient-serwer w te standardowe interfejsy Java, jednocześnie kierując ruch wewnętrznie do wybranego DataNode – czyli tego, który przechowuje kopię bieżącego bloku. Ma swobodę otwierania i zamykania tych połączeń w razie potrzeby. Gdy klient odczytuje plik w HDFS, klasy biblioteki klienta przełączają się niezauważalnie z bloku na blok, a zatem z DataNode na DataNode, więc musi otwierać i zamykać połączenia w razie potrzeby.
DFSInputStream ma instancję klasy DFSClient.BlockReader, która otwiera połączenie z DataNode. Instancja strumienia wywołuje funkcję blockSeekTo() przy każdym wywołaniu funkcji read(), która zajmuje się otwarciem połączenia, jeśli już nie istnieje. Po całkowitym odczytaniu bloku połączenie zostaje zamknięte. Zamknięcie strumienia ma oczywiście ten sam efekt.
DFSOutputStream ma podobną klasę pomocniczą, DataStreamer. Śledzi połączenie z serwerem, które jest inicjowane przez metodę nextBlockOutputStream(). Ma dodatkowe klasy wewnętrzne, które pomagają w wypisaniu danych bloku, które pomijamy tutaj ze względu na zwięzłość.
Zarówno bloki zapisu, jak i odczytu wymagają wątku, który przechowuje gniazdo i dane pośrednie po stronie serwera, opakowane w instancję DataXceiver. W zależności od tego, co robi twój klient, zobaczysz, że liczba połączeń oscyluje wokół liczby aktualnie dostępnych plików w HDFS.
Wracając do zagadki HBase powyżej:powodem, dla którego nie widzisz do 22 (i więcej) połączeń podczas startu, jest to, że gdy regiony są otwarte, jedynymi wymaganymi danymi jest blok informacyjny HFile. Ten blok jest odczytywany w celu uzyskania istotnych informacji o każdym pliku, ale następnie ponownie zamykany. Oznacza to, że zasób po stronie serwera jest zwalniany w krótkich odstępach czasu. Pozostałe cztery połączenia są trudniejsze do ustalenia. Możesz użyć JStack, aby zrzucić wszystkie wątki na DataNode, co w tym przykładzie pokazuje następujący wpis:
"DataXceiver dla klienta /127.0.0.1:64281 [blok wysyłania blk_5532741233443227208_4201]" demon prio=5 tid=7fb96481d000 nid=0x1178b4000 uruchamialny [1178b3000]
java.lang.Thread.State:RUNNABLE
…
"DataXceiver dla klienta /127.0.0.1:64172 [blok odbiorczy blk_-2005512129579433420_4199 client=DFSClient_hb_rs_10.0.0.29 ,60020,1330984111693_1330984118810]” demon prio=5 tid=7fb966109000 nid=0x1169cb000 uruchamialny [1169ca000]
java.lang.Thread.State:RUNNABLE
…
Są to jedyne wpisy DataXceiver (w tym przykładzie), więc liczba w grupie wątków jest nieco myląca. Przypomnijmy, że wątek demona DataXceiverServer uwzględnia już jeden dodatkowy wpis, który w połączeniu z dwoma powyższymi uwzględnia trzy aktywne połączenia – co w rzeczywistości oznacza trzy aktywne wątki. Powodem, dla którego dziennik podaje cztery, jest to, że rejestruje liczbę z aktywnego wątku, który ma się zakończyć. Tak więc, wkrótce po zarejestrowaniu liczby czterech, w rzeczywistości jest to o jeden mniej, tj. trzy, a zatem odpowiada naszej liczbie aktywnych wątków.
Zauważ również, że wewnętrzne klasy pomocnicze, takie jak PacketResponder, zajmują inny wątek w grupie, gdy są aktywne. Dane wyjściowe JStack wskazują ten fakt, wymieniając wątek jako taki:
„PacketResponder 0 for Block blk_-2005512129579433420_4199” demon prio=5 tid=7fb96384d000 nid=0x116ace000 w Object.wait () [116acd000]
java.lang.Thread.State:TIMED_WAITING (na monitorze obiektów)
w java.lang.Object.wait (metoda natywna)
w org.apache.hadoop. hdfs.server.datanode.BlockReceiver$PacketResponder \
.lastDataNodeRun(BlockReceiver.java:779)
– zablokowany (a org.apache.hadoop.hdfs.server.datanode.BlockReceiver$PacketResponder)
w org.apache.hadoop.hdfs.server.datanode.BlockReceiver$PacketResponder.run(BlockReceiver.java:870)
w java.lang.Thread.run(Thread.java:680)
Ten wątek jest obecnie w stanie TIMED_WAITING i nie jest uważany za aktywny. Dlatego licznik emitowany przez instrukcje dziennika DataXceiver nie obejmuje tego rodzaju wątków. Jeśli staną się aktywne w wyniku wysyłania danych przez klienta, liczba aktywnych wątków ponownie wzrośnie. Inną rzeczą, na którą należy zwrócić uwagę, jest to, że ten wątek nie potrzebuje oddzielnego połączenia ani gniazda między klientem a serwerem. PacketResponder to tylko wątek po stronie serwera, który odbiera dane blokowe i przesyła je strumieniowo do następnego DataNode w potoku zapisu.
Polecenie Hadoop fsck ma również opcję zgłaszania plików, które są aktualnie otwarte do zapisu:
$ hadoop fsck /hbase -openforwrite
FSCK rozpoczęte przez larsgeorge z /10.0.0.29 dla ścieżki / hbase w Mon Mar 05 22:59:47 CET 2012
……/hbase/.logs/10.0.0.29,60020,1330984111693/10.0.0.29%3A60020.1330984118842 0 bajtów, 1 blok(i), OPENFORWRITE:………………………………..Stan:ZDROWY
Łączny rozmiar: 2088783626 B
Łączna liczba katalogów: 54
Łączna liczba plików: 45
…
Nie odnosi się to od razu do zajętego wątku po stronie serwera, ponieważ są one przydzielane przez identyfikator bloku. Ale możesz z tego wywnioskować, że jest jeden otwarty blok do pisania. Polecenie Hadoop ma dodatkowe opcje drukowania rzeczywistych plików i identyfikatora bloku, z których się składają:
$ hadoop fsck /hbase -files -blocks
FSCK uruchomiony przez larsgeorge z /10.0.0.29 dla ścieżka /hbase we wtorek, 6 marca, 10:39:50 CET 2012
…
/hbase/.META./1028785192/.tmp
/hbase/.META./1028785192/info
/hbase/.META./1028785192/info/4027596949915293355 36517 bajtów, 1 blok(i):OK
0. blk_5532741233443227208_4201 len=36517 repl=1
…
Stan:ZDROWY
Całkowity rozmiar: 2088788703 B
Całkowite katalogi : 54
Łączna liczba plików: 45 (Pliki aktualnie zapisywane:1)
Łączna liczba bloków (zweryfikowane): 64 (średni rozmiar bloku 32637323 B) (Całkowita liczba bloków otwartych plików (niesprawdzone):1)
Minimalnie replikowane bloki: 64 (100.0 %)
…
To daje dwie rzeczy. Po pierwsze, podsumowanie stwierdza, że w momencie uruchomienia polecenia istnieje jeden otwarty blok pliku – zgodny z liczbą zgłoszoną przez opcję „-openforwrite” powyżej. Po drugie, lista bloków obok każdego pliku pozwala dopasować nazwę wątku do pliku, który zawiera blok, do którego uzyskujemy dostęp. W tym przykładzie blok o identyfikatorze „blk_5532741233443227208_4201” jest wysyłany z serwera do klienta, tutaj RegionServer. Ten blok należy do HBase .META. tabeli, jak pokazano w danych wyjściowych polecenia Hadoop fsck. Kombinacja JStack i fsck może służyć jako kiepski zamiennik mans dla lsof (narzędzie w wierszu poleceń Linuksa do „wyświetlania otwartych plików”).
JStack zgłasza również, że istnieje wątek DataXceiver, z towarzyszącym PacketResponder, dla bloku o identyfikatorze „blk_-2005512129579433420_4199”, ale tego identyfikatora nie ma na liście bloków zgłaszanych przez fsck. Dzieje się tak, ponieważ blok nie jest jeszcze ukończony i dlatego nie jest dostępny dla czytelników. Innymi słowy, Hadoop fsck zgłasza tylko pełne (lub zsynchronizowane[7][8], w przypadku wersji Hadoop obsługującej tę funkcję) bloki.
Powrót do HBase
Otwarcie wszystkich regionów nie wymaga tak wielu zasobów na serwerze, jak można by się spodziewać. Jeśli jednak zeskanujesz całą tabelę HBase, zmusisz HBase do odczytania wszystkich bloków we wszystkich HFiles:
Powłoka HBase:
hbase(main):003:0> skanuj 'usertable'
…
1000000 wiersze za 1460.3120 sekund
Dziennik DataNode:
2012-03-05 14:42:20,580 DEBUG org.apache.hadoop.hdfs.server.datanode. DataNode:Liczba aktywnych połączeń to:6
2012-03-05 14:43:23,293 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Liczba aktywnych połączeń to:7
2012 -03-05 14:43:23,299 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Liczba aktywnych połączeń to:8
…
2012-03-05 14:49:24,332 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0. 0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Liczba aktywnych połączeń to:11
2012-03-05 14:49:24,332 DEBUG org .apache.hadoop.hdfs.server.datanode.DataNode:Liczba aktywnych połączeń wynosi:10
2012-03-05 14:49:59,987 DEBUG org.apache.hadoop.hdfs.server.datanod e.DataNode:Liczba aktywnych połączeń wynosi:11
2012-03-05 14:51:12,603 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Liczba aktywnych połączeń to:12
2012-03-05 14:51:12,605 DEBUG org.apache.hadoop.hdfs .server.datanode.DataNode:Liczba aktywnych połączeń to:11
2012-03-05 14:51:46,473 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Liczba aktywnych połączeń to:12
…
2012-03-05 14:56:59,420 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Liczba aktywnych połączeń wynosi:15
2012-03-05 14:57:31.722 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Liczba aktywnych połączeń wynosi:16
2012-03-05 14:58:24,909 DEBUG org.apache.hadoop.hdfs. server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Numer aktu pięć połączeń to:17
2012-03-05 14:58:24,910 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Liczba aktywnych połączeń to:16
…
2012-03-05 15:04:17,688 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Liczba aktywnych połączeń wynosi:21
2012-03-05 15:04:17,689 DEBUG org.apache .hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Liczba aktywnych połączeń:22
2012-03-05 15:04:54,545 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Liczba aktywnych połączeń wynosi:21
2012-03-05 15:05:55,901 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Liczba aktywnych połączeń wynosi :22
2012-03-05 15:05:55,901 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Liczba aktywnych połączeń wynosi:21
Liczba aktywnych połączeń sięga teraz nieuchwytnych 22. Zauważ, że ta liczba zawiera już wątek serwera, więc wciąż brakuje nam tego, co moglibyśmy uznać za teoretyczne maksimum – w oparciu o liczbę plików, które HBase musi obsłużyć.
Co to wszystko znaczy?
Ile więc „xcievers (sic)” potrzebujesz? Biorąc pod uwagę, że używasz tylko HBase, możesz po prostu monitorować powyższą metrykę „storefiles” (którą otrzymujesz również za pośrednictwem Ganglia lub JMX) i dodać kilka procent dla plików dziennika pośredniego i zapisu z wyprzedzeniem. Powinno to działać w przypadku systemów w ruchu. Jeśli jednak ustalisz tę liczbę w bezczynnym, w pełni skompaktowanym systemie i założysz, że jest to maksymalna wartość, może się okazać, że ta liczba jest zbyt niska, gdy zaczniesz dodawać więcej plików w sklepie podczas regularnych opróżnień memstore, tj. gdy tylko zaczniesz dodaj dane do tabel HBase. Lub jeśli używasz również MapReduce w tym samym klastrze, agregacji dzienników Flume i tak dalej. Będziesz musiał uwzględnić te dodatkowe pliki i, co ważniejsze, otwarte bloki do czytania i pisania.
Zwróć uwagę, że przykłady w tym poście wykorzystują pojedynczy DataNode, coś, czego nie będziesz miał w prawdziwym klastrze. W tym celu będziesz musiał podzielić całkowitą liczbę plików sklepu (zgodnie z metryką HBase) przez liczbę posiadanych DataNodes. Jeśli masz na przykład liczbę plików sklepu 1000, a twój klaster ma 10 DataNode, powinieneś mieć domyślną liczbę 256 wątków xceiver na DataNode.
Najgorszym przypadkiem byłaby liczba wszystkich aktywnych czytelników i pisarzy, czyli tych, którzy aktualnie wysyłają lub odbierają dane. Ale ponieważ trudno to ustalić z wyprzedzeniem, warto rozważyć budowanie przyzwoitej rezerwy. Ponadto, ponieważ proces pisania wymaga dodatkowego – choć krócej działającego – wątku (dla PacketResponder), musisz to również uwzględnić. Rozsądną, ale raczej uproszczoną formułą może być:
Ta formuła uwzględnia, że potrzebujesz około dwóch wątków dla aktywnego pisarza i jednego dla aktywnego czytelnika. Jest to następnie sumowane i dzielone przez liczbę DataNode, ponieważ musisz określić „dfs.datanode.max.xcievers” na DataNode.
Jeśli wrócisz do powyższego zrzutu ekranu HBase RegionServer, zobaczysz, że istnieją 22 pliki sklepu. These are immutable and will only be read, or in other words occupy one thread only. For all memstores that are flushed to disk you need two threads – but only until they are fully written. The files are finalized and closed for good, cleaning up any thread in the process. So these come and go based on your flush frequency. Same goes for compactions, they will read N files and write them into a single new one, then finalize the new file. As for the write-ahead logs, these will occupy a thread once you have started to add data to any table. There is a log file per server, meaning that you can only have twice as many active threads for these files as you have RegionServers.
For a pure HBase setup (HBase plus its own HDFS, with no other user), we can estimate the number of needed DataXceiver’s with the following formula:
Since you will be hard pressed to determine the active number of store files, flushes, and so on, it might be better to estimate the theoretical maximum instead. This maximum value takes into account that you can only have a single flush and compaction active per region at any time. The maximum number of logs you can have active matches the number of RegionServers, leading us to this formula:
Obviously, the number of store files will increase over time, and the number of regions typically as well. Same for the numbers of servers, so keep in mind to adjust this number over time. In practice, you can add a buffer of, for example, 20%, as shown in the formula below – in an attempt to not force you to change the value too often.
On the other hand, if you keep the number of regions fixed per server[9], and rather split them manually, while adding new servers as you grow, you should be able to keep this configuration property stable for each server.
Final Advice &TL;DR
Here is the final formula you want to use:
It computes the maximum number of threads needed, based on your current HBase vitals (no. of store files, regions, and region servers). It also adds a fudge factor of 20% to give you room for growth. Keep an eye on the numbers on a regular basis and adjust the value as needed. You might want to use Nagios with appropriate checks to warn you when any of the vitals goes over a certain percentage of change.
Note:Please make sure you also adjust the number of file handles your process is allowed to use accordingly[10]. This affects the number of sockets you can use, and if that number is too low (default is often 1024), you will get connection issues first.
Finally, the engineering devil on one of your shoulders should already have started to snicker about how horribly non-Erlang-y this is, and how you should use an event driven approach, possibly using Akka with Scala[11] – if you want to stay within the JVM world. Bear in mind though that the clever developers in the community share the same thoughts and have already started to discuss various approaches[12][13].
Links:
- [1] http://old.nabble.com/Re%3A-xceiverCount-257-exceeds-the-limit-of-concurrent-xcievers-256-p20469958.html
- [2] http://ccgtech.blogspot.com/2010/02/hadoop-hdfs-deceived-by-xciever.html
- [3] https://issues.apache.org/jira/browse/HDFS-1861 “Rename dfs.datanode.max.xcievers and bump its default value”
- [4] https://issues.apache.org/jira/browse/HDFS-1866 “Document dfs.datanode.max.transfer.threads in hdfs-default.xml”
- [5] http://hbase.apache.org/book.html#dfs.datanode.max.xcievers
- [6] http://www.oracle.com/technetwork/java/hotspotfaq-138619.html#threads_oom
- [7] https://issues.apache.org/jira/browse/HDFS-200 “In HDFS, sync() not yet guarantees data available to the new readers”
- [8] https://issues.apache.org/jira/browse/HDFS-265 “Revisit append”
- [9] http://search-hadoop.com/m/CBBoV3z24H1 “HBase, mail # user – region size/count per regionserver”
- [10] http://hbase.apache.org/book.html#ulimit “ulimit and nproc”
- [11] http://akka.io/ “Akka”
- [12] https://issues.apache.org/jira/browse/HDFS-223 “Asynchronous IO Handling in Hadoop and HDFS”
- [13] https://issues.apache.org/jira/browse/HDFS-918 “Use single Selector and small thread pool to replace many instances of BlockSender for reads”