PostgreSQL
 sql >> Baza danych >  >> RDS >> PostgreSQL

WSTAW lub AKTUALIZUJ dane zbiorcze z dataframe/CSV do bazy danych PostgreSQL

W tym konkretnym przypadku lepiej jest zejść na poziom DB-API, ponieważ potrzebujesz narzędzi, które nie są ujawniane nawet bezpośrednio przez SQLAlchemy Core, takich jak copy_expert() . Można to zrobić za pomocą raw_connection() . Jeśli Twoje dane źródłowe to plik CSV, w tym przypadku nie potrzebujesz w ogóle pand. Zacznij od utworzenia tymczasowej tabeli pomostowej, skopiuj dane do tabeli tymczasowej i wstaw do tabeli docelowej z obsługą konfliktów:

conn = engine.raw_connection()

try:
    with conn.cursor() as cur:
        cur.execute("""CREATE TEMPORARY TABLE TEST_STAGING ( LIKE TEST_TABLE )
                       ON COMMIT DROP""")

        with open("your_source.csv") as data:
            cur.copy_expert("""COPY TEST_STAGING ( itemid, title, street, pincode )
                               FROM STDIN WITH CSV""", data)

        cur.execute("""INSERT INTO TEST_TABLE ( itemid, title, street, pincode )
                       SELECT itemid, title, street, pincode
                       FROM TEST_STAGING
                       ON CONFLICT ( itemid )
                       DO UPDATE SET title = EXCLUDED.title
                                   , street = EXCLUDED.street
                                   , pincode = EXCLUDED.pincode""")

except:
    conn.rollback()
    raise

else:
    conn.commit()

finally:
    conn.close()

Jeśli z drugiej strony Twoje dane źródłowe to DataFrame , nadal możesz użyć COPY przez przekazanie funkcji jako metody method= do to_sql() . Funkcja może nawet ukryć całą powyższą logikę:

import csv

from io import StringIO
from psycopg2 import sql

def psql_upsert_copy(table, conn, keys, data_iter):
    dbapi_conn = conn.connection

    buf = StringIO()
    writer = csv.writer(buf)
    writer.writerows(data_iter)
    buf.seek(0)

    if table.schema:
        table_name = sql.SQL("{}.{}").format(
            sql.Identifier(table.schema), sql.Identifier(table.name))
    else:
        table_name = sql.Identifier(table.name)

    tmp_table_name = sql.Identifier(table.name + "_staging")
    columns = sql.SQL(", ").join(map(sql.Identifier, keys))

    with dbapi_conn.cursor() as cur:
        # Create the staging table
        stmt = "CREATE TEMPORARY TABLE {} ( LIKE {} ) ON COMMIT DROP"
        stmt = sql.SQL(stmt).format(tmp_table_name, table_name)
        cur.execute(stmt)

        # Populate the staging table
        stmt = "COPY {} ( {} ) FROM STDIN WITH CSV"
        stmt = sql.SQL(stmt).format(tmp_table_name, columns)
        cur.copy_expert(stmt, buf)

        # Upsert from the staging table to the destination. First find
        # out what the primary key columns are.
        stmt = """
               SELECT kcu.column_name
               FROM information_schema.table_constraints tco
               JOIN information_schema.key_column_usage kcu 
               ON kcu.constraint_name = tco.constraint_name
               AND kcu.constraint_schema = tco.constraint_schema
               WHERE tco.constraint_type = 'PRIMARY KEY'
               AND tco.table_name = %s
               """
        args = (table.name,)

        if table.schema:
            stmt += "AND tco.table_schema = %s"
            args += (table.schema,)

        cur.execute(stmt, args)
        pk_columns = {row[0] for row in cur.fetchall()}
        # Separate "data" columns from (primary) key columns
        data_columns = [k for k in keys if k not in pk_columns]
        # Build conflict_target
        pk_columns = sql.SQL(", ").join(map(sql.Identifier, pk_columns))

        set_ = sql.SQL(", ").join([
            sql.SQL("{} = EXCLUDED.{}").format(k, k)
            for k in map(sql.Identifier, data_columns)])

        stmt = """
               INSERT INTO {} ( {} )
               SELECT {}
               FROM {}
               ON CONFLICT ( {} )
               DO UPDATE SET {}
               """

        stmt = sql.SQL(stmt).format(
            table_name, columns, columns, tmp_table_name, pk_columns, set_)
        cur.execute(stmt)

Następnie wstawisz nową ramkę DataFrame za pomocą

df.to_sql("test_table", engine,
          method=psql_upsert_copy,
          index=False,
          if_exists="append")

Korzystając z tej metody, przekształcenie ~1 000 000 wierszy na tej maszynie z lokalną bazą danych zajęło około 16 sekund.




  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Jak uruchomić PostgreSQL w systemie Windows?

  2. Pobierz wartość pola z rekordu, który powoduje, że warunek agregacji jest prawdziwy

  3. Jak zapobiec usunięciu pierwszego wiersza w tabeli (PostgreSQL)?

  4. Wiele zapytań SHOW TRANSACTION ISOLATION LEVEL w postgresie

  5. Optymalizacja zapytania licznika dla PostgreSQL