EDYTUJ 27.01.2018:
Okazuje się, że ten problem jest związany z DirectRunnerem. Jeśli uruchomisz ten sam potok przy użyciu DataflowRunner, powinieneś otrzymać partie, które w rzeczywistości mają maksymalnie 1000 rekordów. DirectRunner zawsze tworzy pakiety o rozmiarze 1 po operacji grupowania.
Oryginalna odpowiedź:
Ten sam problem napotkałem podczas pisania do baz danych w chmurze przy użyciu JdbcIO Apache Beam. Problem polega na tym, że chociaż JdbcIO obsługuje zapisywanie do 1000 rekordów w jednej partii, tak naprawdę nigdy nie widziałem, aby zapisywał więcej niż 1 wiersz na raz (muszę przyznać:zawsze używałem DirectRunnera w środowisku programistycznym).
Dlatego dodałem funkcję do JdbcIO, dzięki której możesz samodzielnie kontrolować rozmiar partii, grupując dane i zapisując każdą grupę jako jedną partię. Poniżej znajduje się przykład korzystania z tej funkcji na podstawie oryginalnego przykładu WordCount z Apache Beam.
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
// Count words in input file(s)
.apply(new CountWords())
// Format as text
.apply(MapElements.via(new FormatAsTextFn()))
// Make key-value pairs with the first letter as the key
.apply(ParDo.of(new FirstLetterAsKey()))
// Group the words by first letter
.apply(GroupByKey.<String, String> create())
// Get a PCollection of only the values, discarding the keys
.apply(ParDo.of(new GetValues()))
// Write the words to the database
.apply(JdbcIO.<String> writeIterable()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
.withStatement(INSERT_OR_UPDATE_SQL)
.withPreparedStatementSetter(new WordCountPreparedStatementSetter()));
Różnica w stosunku do normalnej metody zapisu JdbcIO polega na nowej metodzie writeIterable()
który przyjmuje PCollection<Iterable<RowT>>
jako dane wejściowe zamiast PCollection<RowT>
. Każdy element iterowany jest zapisywany w bazie danych jako jedna partia.
Wersję JdbcIO z tym dodatkiem można znaleźć tutaj:https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java /org/apache/beam/sdk/io/jdbc/JdbcIO.java
Cały przykładowy projekt zawierający powyższy przykład można znaleźć tutaj:https://github.com/ olavloite/przykład-wiązki-klucza
(W Apache Beam istnieje również oczekujące żądanie ściągnięcia, aby uwzględnić to w projekcie)