W Apache Kafka aplikacje Java zwane producentami zapisują ustrukturyzowane wiadomości do klastra Kafka (składającego się z brokerów). Podobnie aplikacje Java zwane konsumentami odczytują te komunikaty z tego samego klastra. W niektórych organizacjach istnieją różne grupy odpowiedzialne za tworzenie i zarządzanie producentami i konsumentami. W takich przypadkach jednym z głównych problemów może być koordynacja uzgodnionego formatu wiadomości między producentami a konsumentami.
Ten przykład pokazuje, jak używać Apache Avro do serializacji rekordów, które są tworzone w Apache Kafka, jednocześnie umożliwiając ewolucję schematów i niesynchroniczną aktualizację aplikacji producentów i konsumentów.
Serializacja i deserializacja
Rekord Kafki (dawniej nazywany komunikatem) składa się z klucza, wartości i nagłówków. Kafka nie jest świadomy struktury danych w kluczu i wartości rekordów. Obsługuje je jako tablice bajtów. Ale systemy odczytujące rekordy z Kafki dbają o dane w tych rekordach. Musisz więc tworzyć dane w czytelnym formacie. Używany format danych powinien
- Bądź kompaktowy
- Szybko koduj i dekoduj
- Zezwalaj na ewolucję
- Zezwalaj systemom nadrzędnym (zapisującym do klastra Kafka) i systemom podrzędnym (odczytującym z tego samego klastra Kafka) na uaktualnianie do nowszych schematów w różnym czasie
Na przykład JSON nie wymaga wyjaśnień, ale nie jest kompaktowym formatem danych i wolno go analizować. Avro to szybka platforma serializacji, która tworzy stosunkowo kompaktowe dane wyjściowe. Ale aby odczytać rekordy Avro, potrzebujesz schematu, z którym dane zostały zserializowane.
Jedną z opcji jest przechowywanie i przesyłanie schematu z samym rekordem. Jest to w porządku w pliku, w którym przechowujesz schemat raz i używasz go do dużej liczby rekordów. Przechowywanie schematu w każdym rekordzie Kafka powoduje jednak znaczne obciążenie w zakresie przestrzeni dyskowej i wykorzystania sieci. Inną opcją jest uzgodnienie zestawu mapowań schematów identyfikatorów i odwoływanie się do schematów według ich identyfikatorów w rekordzie.
Od obiektu do nagrania Kafki iz powrotem
Aplikacje producentów nie muszą konwertować danych bezpośrednio na tablice bajtów. KafkaProducer to klasa ogólna, która wymaga od użytkownika określenia typów kluczy i wartości. Następnie producenci akceptują instancje ProducerRecord
które mają te same parametry typu. Konwersja z obiektu na tablicę bajtów jest wykonywana przez Serializer. Kafka dostarcza kilka prymitywnych serializatorów:na przykład IntegerSerializer
, ByteArraySerializer
, StringSerializer
. Po stronie konsumenta podobne Deserializatory konwertują tablice bajtów na obiekt, z którym aplikacja może sobie poradzić.
Dlatego sensowne jest podłączenie się na poziomie Serializer i Deserializer i umożliwienie programistom aplikacji producenckich i konsumenckich korzystanie z wygodnego interfejsu zapewnianego przez Kafkę. Chociaż najnowsze wersje Kafki pozwalają na ExtendedSerializers
i ExtendedDeserializers
aby uzyskać dostęp do nagłówków, postanowiliśmy uwzględnić identyfikator schematu w kluczu i wartości rekordów Kafki zamiast dodawać nagłówki rekordów.
Podstawy Avro
Avro to framework do serializacji danych (i zdalnego wywoływania procedur). Używa dokumentu JSON zwanego schematem do opisywania struktur danych. Większość zastosowań Avro odbywa się za pośrednictwem GenericRecord lub podklas SpecificRecord. Klasy Java wygenerowane ze schematów Avro są podklasami tych ostatnich, podczas gdy te pierwsze mogą być używane bez wcześniejszej znajomości struktury danych, z którą się pracuje.
Gdy dwa schematy spełniają zestaw reguł zgodności, dane zapisane za pomocą jednego schematu (nazywanego schematem zapisu) mogą być odczytywane tak, jakby zostały zapisane za pomocą drugiego (nazywanego schematem czytnika). Schematy mają formę kanoniczną, która zawiera wszystkie szczegóły nieistotne dla serializacji, takie jak komentarze, usunięte w celu ułatwienia kontroli równoważności.
VersionedSchema i SchemaProvider
Jak wspomniano wcześniej, potrzebujemy mapowania jeden do jednego między schematami i ich identyfikatorami. Czasami łatwiej jest odwoływać się do schematów po nazwach. Po utworzeniu zgodnego schematu można go uznać za kolejną wersję schematu. W ten sposób możemy odwoływać się do schematów za pomocą nazwy, pary wersji. Nazwijmy schemat, jego identyfikator, nazwę i wersję razem jako VersionedSchema
. Ten obiekt może zawierać dodatkowe metadane wymagane przez aplikację.
public class VersionedSchema { private final int id; private final String name; private final int version; private final Schema schema; public VersionedSchema(int id, String name, int version, Schema schema) { this.id = id; this.name = name; this.version = version; this.schema = schema; } public String getName() { return name; } public int getVersion() { return version; } public Schema getSchema() { return schema; } public int getId() { return id; } }
SchemaProvider
obiekty mogą wyszukiwać instancje VersionedSchema
.
public interface SchemaProvider extends AutoCloseable { public VersionedSchema get(int id); public VersionedSchema get(String schemaName, int schemaVersion); public VersionedSchema getMetadata(Schema schema); }
Sposób implementacji tego interfejsu opisano w artykule „Implementowanie sklepu ze schematami” w przyszłym poście na blogu.
Serializowanie danych ogólnych
Podczas serializacji rekordu najpierw musimy ustalić, którego schematu użyć. Każdy rekord ma getSchema
metoda. Ale znalezienie identyfikatora ze schematu może być czasochłonne. Ogólnie rzecz biorąc, bardziej wydajne jest ustawienie schematu w czasie inicjalizacji. Można to zrobić bezpośrednio według identyfikatora lub nazwy i wersji. Co więcej, podczas tworzenia dla wielu tematów, możemy chcieć ustawić różne schematy dla różnych tematów i znaleźć schemat na podstawie nazwy tematu dostarczonej jako parametr do metody serialize(T, String)
. Ta logika została pominięta w naszych przykładach ze względu na zwięzłość i prostotę.
private VersionedSchema getSchema(T data, String topic) { return schemaProvider.getMetadata( data.getSchema()); }
Mając w ręku schemat, musimy go przechowywać w naszej wiadomości. Serializacja identyfikatora jako części wiadomości daje nam kompaktowe rozwiązanie, ponieważ cała magia dzieje się w serializatorze/deserializatorze. Umożliwia również bardzo łatwą integrację z innymi frameworkami i bibliotekami, które już obsługują Kafkę i pozwala użytkownikowi używać własnego serializatora (takiego jak Spark).
Stosując to podejście, najpierw zapisujemy identyfikator schematu na pierwszych czterech bajtach.
private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException { try (DataOutputStream os = new DataOutputStream(stream)) { os.writeInt(id); } }
Następnie możemy utworzyć DatumWriter
i zserializuj obiekt.
private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException { BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null); DatumWriter<T> datumWriter = new GenericDatumWriter<>(schema); datumWriter.write(data, encoder); encoder.flush(); }
Podsumowując, zaimplementowaliśmy ogólny serializator danych.
public class KafkaAvroSerializer<T extends GenericContainer> implements Serializer<T> { private SchemaProvider schemaProvider; @Override public void configure(Map<String, ?> configs, boolean isKey) { schemaProvider = SchemaUtils.getSchemaProvider(configs); } @Override public byte[] serialize(String topic, T data) { try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { VersionedSchema schema = getSchema(data, topic); writeSchemaId(stream, schema.getId()); writeSerializedAvro(stream, data, schema.getSchema()); return stream.toByteArray(); } catch (IOException e) { throw new RuntimeException("Could not serialize data", e); } } private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {...} private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {...} private VersionedSchema getSchema(T data, String topic) {...} @Override public void close() { try { schemaProvider.close(); } catch (Exception e) { throw new RuntimeException(e); } } }
Deserializacja danych ogólnych
Deserializacja może działać z pojedynczym schematem (za pomocą którego zapisano dane schematu), ale można określić inny schemat czytnika. Schemat czytnika musi być zgodny ze schematem, z którym dane zostały serializowane, ale nie musi być równoważny. Z tego powodu wprowadziliśmy nazwy schematów. Możemy teraz określić, że chcemy odczytywać dane z określoną wersją schematu. W czasie inicjalizacji odczytujemy żądane wersje schematu na nazwę schematu i przechowujemy metadane w readerSchemasByName
dla szybkiego dostępu. Teraz możemy odczytać każdy rekord zapisany w kompatybilnej wersji schematu tak, jakby był zapisany w określonej wersji.
@Override public void configure(Map<String, ?> configs, boolean isKey) { this.schemaProvider = SchemaUtils.getSchemaProvider(configs); this.readerSchemasByName = SchemaUtils.getVersionedSchemas(configs, schemaProvider); }
Gdy rekord wymaga deserializacji, najpierw odczytujemy identyfikator schematu programu piszącego. Umożliwia to wyszukanie schematu czytnika według nazwy. Mając dostępne oba schematy, możemy utworzyć GeneralDatumReader
i przeczytaj zapis.
@Override public GenericData.Record deserialize(String topic, byte[] data) { try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) { int schemaId = readSchemaId(stream); VersionedSchema writerSchema = schemaProvider.get(schemaId); VersionedSchema readerSchema = readerSchemasByName.get(writerSchema.getName()); GenericData.Record avroRecord = readAvroRecord(stream, writerSchema.getSchema(), readerSchema.getSchema()); return avroRecord; } catch (IOException e) { throw new RuntimeException(e); } } private int readSchemaId(InputStream stream ) throws IOException { try(DataInputStream is = new DataInputStream(stream)) { return is.readInt(); } } private GenericData.Record readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException { DatumReader<Object> datumReader = new GenericDatumReader<>(writerSchema, readerSchema); BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null); GenericData.Record record = new GenericData.Record(readerSchema); datumReader.read(record, decoder); return record; }
Radzenie sobie z określonymi rekordami
Najczęściej jest jedna klasa, której chcemy użyć do naszych rekordów. Ta klasa jest następnie zwykle generowana na podstawie schematu Avro. Apache Avro udostępnia narzędzia do generowania kodu Java ze schematów. Jednym z takich narzędzi jest wtyczka Avro Maven. Wygenerowane klasy mają schemat, z którego zostały wygenerowane, dostępny w czasie wykonywania. Dzięki temu serializacja i deserializacja są prostsze i bardziej efektywne. Do serializacji możemy użyć klasy, aby dowiedzieć się, jaki identyfikator schematu ma zostać użyty.
@Override public void configure(Map<String, ?> configs, boolean isKey) { String className = configs.get(isKey ? KEY_RECORD_CLASSNAME : VALUE_RECORD_CLASSNAME).toString(); try (SchemaProvider schemaProvider = SchemaUtils.getSchemaProvider(configs)) { Class<?> recordClass = Class.forName(className); Schema writerSchema = new SpecificData(recordClass.getClassLoader()).getSchema(recordClass); this.writerSchemaId = schemaProvider.getMetadata(writerSchema).getId(); } catch (Exception e) { throw new RuntimeException(e); } }
W ten sposób nie potrzebujemy logiki, aby określić schemat z tematu i danych. Używamy schematu dostępnego w klasie rekordów do zapisywania rekordów.
Podobnie w przypadku deserializacji schemat czytnika można znaleźć w samej klasie. Logika deserializacji staje się prostsza, ponieważ schemat czytnika jest ustalany w czasie konfiguracji i nie trzeba go wyszukiwać według nazwy schematu.
@Override public T deserialize(String topic, byte[] data) { try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) { int schemaId = readSchemaId(stream); VersionedSchema writerSchema = schemaProvider.get(schemaId); return readAvroRecord(stream, writerSchema.getSchema(), readerSchema); } catch (IOException e) { throw new RuntimeException(e); } } private T readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException { DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema); BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null); return datumReader.read(null, decoder); }
Dodatkowe czytanie
Aby uzyskać więcej informacji na temat zgodności schematów, zapoznaj się ze specyfikacją Avro dotyczącą rozwiązywania schematów.
Aby uzyskać więcej informacji na temat formularzy kanonicznych, zapoznaj się ze specyfikacją Avro dotyczącą Parsing Canonical Form for Schemas.
Następnym razem…
Część 2 pokaże implementację systemu do przechowywania definicji schematów Avro.