Mysql
 sql >> Baza danych >  >> RDS >> Mysql

Google Dataflow (Apache beam) JdbcIO zbiorczo wstawiamy do bazy danych mysql

EDYTUJ 27.01.2018:

Okazuje się, że ten problem jest związany z DirectRunnerem. Jeśli uruchomisz ten sam potok przy użyciu DataflowRunner, powinieneś otrzymać partie, które w rzeczywistości mają maksymalnie 1000 rekordów. DirectRunner zawsze tworzy pakiety o rozmiarze 1 po operacji grupowania.

Oryginalna odpowiedź:

Ten sam problem napotkałem podczas pisania do baz danych w chmurze przy użyciu JdbcIO Apache Beam. Problem polega na tym, że chociaż JdbcIO obsługuje zapisywanie do 1000 rekordów w jednej partii, tak naprawdę nigdy nie widziałem, aby zapisywał więcej niż 1 wiersz na raz (muszę przyznać:zawsze używałem DirectRunnera w środowisku programistycznym).

Dlatego dodałem funkcję do JdbcIO, dzięki której możesz samodzielnie kontrolować rozmiar partii, grupując dane i zapisując każdą grupę jako jedną partię. Poniżej znajduje się przykład korzystania z tej funkcji na podstawie oryginalnego przykładu WordCount z Apache Beam.

p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
    // Count words in input file(s)
    .apply(new CountWords())
    // Format as text
    .apply(MapElements.via(new FormatAsTextFn()))
    // Make key-value pairs with the first letter as the key
    .apply(ParDo.of(new FirstLetterAsKey()))
    // Group the words by first letter
    .apply(GroupByKey.<String, String> create())
    // Get a PCollection of only the values, discarding the keys
    .apply(ParDo.of(new GetValues()))
    // Write the words to the database
    .apply(JdbcIO.<String> writeIterable()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
            .withStatement(INSERT_OR_UPDATE_SQL)
            .withPreparedStatementSetter(new WordCountPreparedStatementSetter()));

Różnica w stosunku do normalnej metody zapisu JdbcIO polega na nowej metodzie writeIterable() który przyjmuje PCollection<Iterable<RowT>> jako dane wejściowe zamiast PCollection<RowT> . Każdy element iterowany jest zapisywany w bazie danych jako jedna partia.

Wersję JdbcIO z tym dodatkiem można znaleźć tutaj:https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java /org/apache/beam/sdk/io/jdbc/JdbcIO.java

Cały przykładowy projekt zawierający powyższy przykład można znaleźć tutaj:https://github.com/ olavloite/przykład-wiązki-klucza

(W Apache Beam istnieje również oczekujące żądanie ściągnięcia, aby uwzględnić to w projekcie)




  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Jak uzyskać łączną liczbę użytkowników dziennie w MySQL?

  2. Wybierz i zaktualizuj w tym samym zapytaniu

  3. Kryteria hibernacji z samodołączaniem

  4. Jak wdrożyć MySQL na Ubuntu i w pełni zarządzany?

  5. Importuj duży plik MySQL .sql w systemie Windows za pomocą Force