Naprawdę interesuje mnie, co byłoby szybsze, więc przetestowałem kilka możliwych sposobów ich porównania:
- proste
executemany
bez sztuczek. - to samo z
APPEND_VALUES
wskazówka w oświadczeniu. union all
podejście, które próbowałeś w innym pytaniu. Powinno to być wolniejsze niż powyżej, ponieważ generuje naprawdę bardzo duże oświadczenie (które potencjalnie może wymagać więcej sieci niż same dane). Następnie powinien zostać przeanalizowany po stronie DB, co również zajmie dużo czasu i pominie wszystkie korzyści (nie mówiąc o potencjalnym limicie rozmiaru). Następnie mamexecutemany
Przygotowałem go do testowania z fragmentami, aby nie budować pojedynczej instrukcji dla 100 tys. rekordów. Nie użyłem konkatenacji wartości wewnątrz instrukcji, ponieważ chciałem, aby było bezpieczne.insert all
. Te same wady, ale bez związków. Porównaj to zunion
wersja.- serializuj dane w formacie JSON i wykonaj deserializację po stronie bazy danych za pomocą
json_table
. Potencjalnie dobra wydajność z pojedynczą krótką instrukcją i pojedynczym transferem danych z niewielkim obciążeniem JSON. - Sugerowane
FORALL
w procedurze wrappera PL/SQL. Powinien być taki sam jakexecutemany
ponieważ robi to samo, ale po stronie bazy danych. Narzut związany z przekształceniem danych w kolekcję. - Ten sam
FORALL
, ale z podejściem kolumnowym do przekazywania danych:przekaż proste listy wartości kolumn zamiast typu złożonego. Powinien być znacznie szybszy niżFORALL
z kolekcją, ponieważ nie ma potrzeby serializacji danych do typu kolekcji.
Korzystałem z Oracle Autonomous Database w Oracle Cloud z bezpłatnym kontem. Każdą metodę wykonywano 10 razy w pętli z tym samym zestawem danych wejściowych liczącym 100 tys. rekordów, przed każdym testem odtworzono tabelę. To jest wynik jaki mam. Czasy przygotowania i wykonania to odpowiednio transformacja danych po stronie klienta końcowego wywołania DB.
>>> t = PerfTest(100000)
>>> t.run("exec_many", 10)
Method: exec_many.
Duration, avg: 2.3083874 s
Preparation time, avg: 0.0 s
Execution time, avg: 2.3083874 s
>>> t.run("exec_many_append", 10)
Method: exec_many_append.
Duration, avg: 2.6031369 s
Preparation time, avg: 0.0 s
Execution time, avg: 2.6031369 s
>>> t.run("union_all", 10, 10000)
Method: union_all.
Duration, avg: 27.9444233 s
Preparation time, avg: 0.0408773 s
Execution time, avg: 27.8457551 s
>>> t.run("insert_all", 10, 10000)
Method: insert_all.
Duration, avg: 70.6442494 s
Preparation time, avg: 0.0289269 s
Execution time, avg: 70.5541995 s
>>> t.run("json_table", 10)
Method: json_table.
Duration, avg: 10.4648237 s
Preparation time, avg: 9.7907693 s
Execution time, avg: 0.621006 s
>>> t.run("forall", 10)
Method: forall.
Duration, avg: 5.5622837 s
Preparation time, avg: 1.8972456000000002 s
Execution time, avg: 3.6650380999999994 s
>>> t.run("forall_columnar", 10)
Method: forall_columnar.
Duration, avg: 2.6702698000000002 s
Preparation time, avg: 0.055710800000000005 s
Execution time, avg: 2.6105702 s
>>>
Najszybszym sposobem jest po prostu executemany
, nie tyle niespodzianka. Interesujące jest to, że APPEND_VALUES
nie poprawia zapytania i uzyskuje średnio więcej czasu, więc wymaga to dokładniejszego zbadania.
O FORALL
:zgodnie z oczekiwaniami, indywidualna tablica dla każdej kolumny zajmuje mniej czasu, ponieważ nie ma dla niej przygotowania danych. Jest mniej więcej porównywalny z executemany
, ale myślę, że narzut PL/SQL odgrywa tutaj pewną rolę.
Inną interesującą częścią dla mnie jest JSON:większość czasu spędziłem na zapisywaniu LOB do bazy danych i serializacji, ale samo zapytanie było bardzo szybkie. Może operację zapisu można w jakiś sposób poprawić za pomocą chuncsize lub w inny sposób przekazywania danych LOB do instrukcji select, ale w moim kodzie jest to dalekie od bardzo prostego i prostego podejścia z executemany
.
Istnieją również możliwe podejścia bez Pythona, które powinny będą szybsze jako natywne narzędzia do danych zewnętrznych, ale ich nie testowałem:
Poniżej znajduje się kod, którego użyłem do testowania.
import cx_Oracle as db
import os, random, json
import datetime as dt
class PerfTest:
def __init__(self, size):
self._con = db.connect(
os.environ["ora_cloud_usr"],
os.environ["ora_cloud_pwd"],
"test_low",
encoding="UTF-8"
)
self._cur = self._con.cursor()
self.inp = [(i, "Test {i}".format(i=i), random.random()) for i in range(size)]
def __del__(self):
if self._con:
self._con.rollback()
self._con.close()
#Create objets
def setup(self):
try:
self._cur.execute("drop table rand")
#print("table dropped")
except:
pass
self._cur.execute("""create table rand(
id int,
str varchar2(100),
val number
)""")
self._cur.execute("""create or replace package pkg_test as
type ts_test is record (
id rand.id%type,
str rand.str%type,
val rand.val%type
);
type tt_test is table of ts_test index by pls_integer;
type tt_ids is table of rand.id%type index by pls_integer;
type tt_strs is table of rand.str%type index by pls_integer;
type tt_vals is table of rand.val%type index by pls_integer;
procedure write_data(p_data in tt_test);
procedure write_data_columnar(
p_ids in tt_ids,
p_strs in tt_strs,
p_vals in tt_vals
);
end;""")
self._cur.execute("""create or replace package body pkg_test as
procedure write_data(p_data in tt_test)
as
begin
forall i in indices of p_data
insert into rand(id, str, val)
values (p_data(i).id, p_data(i).str, p_data(i).val)
;
commit;
end;
procedure write_data_columnar(
p_ids in tt_ids,
p_strs in tt_strs,
p_vals in tt_vals
) as
begin
forall i in indices of p_ids
insert into rand(id, str, val)
values (p_ids(i), p_strs(i), p_vals(i))
;
commit;
end;
end;
""")
def build_union(self, size):
return """insert into rand(id, str, val)
select id, str, val from rand where 1 = 0 union all
""" + """ union all """.join(
["select :{}, :{}, :{} from dual".format(i*3+1, i*3+2, i*3+3)
for i in range(size)]
)
def build_insert_all(self, size):
return """
""".join(
["into rand(id, str, val) values (:{}, :{}, :{})".format(i*3+1, i*3+2, i*3+3)
for i in range(size)]
)
#Test case with executemany
def exec_many(self):
start = dt.datetime.now()
self._cur.executemany("insert into rand(id, str, val) values (:1, :2, :3)", self.inp)
self._con.commit()
return (dt.timedelta(0), dt.datetime.now() - start)
#The same as above but with prepared statement (no parsing)
def exec_many_append(self):
start = dt.datetime.now()
self._cur.executemany("insert /*+APPEND_VALUES*/ into rand(id, str, val) values (:1, :2, :3)", self.inp)
self._con.commit()
return (dt.timedelta(0), dt.datetime.now() - start)
#Union All approach (chunked). Should have large parse time
def union_all(self, size):
##Chunked list of big tuples
start_prepare = dt.datetime.now()
new_inp = [
tuple([item for t in r for item in t])
for r in list(zip(*[iter(self.inp)]*size))
]
new_stmt = self.build_union(size)
dur_prepare = dt.datetime.now() - start_prepare
#Execute unions
start_exec = dt.datetime.now()
self._cur.executemany(new_stmt, new_inp)
dur_exec = dt.datetime.now() - start_exec
##In case the size is not a divisor
remainder = len(self.inp) % size
if remainder > 0 :
start_prepare = dt.datetime.now()
new_stmt = self.build_union(remainder)
new_inp = tuple([
item for t in self.inp[-remainder:] for item in t
])
dur_prepare += dt.datetime.now() - start_prepare
start_exec = dt.datetime.now()
self._cur.execute(new_stmt, new_inp)
dur_exec += dt.datetime.now() - start_exec
self._con.commit()
return (dur_prepare, dur_exec)
#The same as union all, but with no need to union something
def insert_all(self, size):
##Chunked list of big tuples
start_prepare = dt.datetime.now()
new_inp = [
tuple([item for t in r for item in t])
for r in list(zip(*[iter(self.inp)]*size))
]
new_stmt = """insert all
{}
select * from dual"""
dur_prepare = dt.datetime.now() - start_prepare
#Execute
start_exec = dt.datetime.now()
self._cur.executemany(
new_stmt.format(self.build_insert_all(size)),
new_inp
)
dur_exec = dt.datetime.now() - start_exec
##In case the size is not a divisor
remainder = len(self.inp) % size
if remainder > 0 :
start_prepare = dt.datetime.now()
new_inp = tuple([
item for t in self.inp[-remainder:] for item in t
])
dur_prepare += dt.datetime.now() - start_prepare
start_exec = dt.datetime.now()
self._cur.execute(
new_stmt.format(self.build_insert_all(remainder)),
new_inp
)
dur_exec += dt.datetime.now() - start_exec
self._con.commit()
return (dur_prepare, dur_exec)
#Serialize at server side and do deserialization at DB side
def json_table(self):
start_prepare = dt.datetime.now()
new_inp = json.dumps([
{ "id":t[0], "str":t[1], "val":t[2]} for t in self.inp
])
lob_var = self._con.createlob(db.DB_TYPE_CLOB)
lob_var.write(new_inp)
start_exec = dt.datetime.now()
self._cur.execute("""
insert into rand(id, str, val)
select id, str, val
from json_table(
to_clob(:json), '$[*]'
columns
id int,
str varchar2(100),
val number
)
""", json=lob_var)
dur_exec = dt.datetime.now() - start_exec
self._con.commit()
return (start_exec - start_prepare, dur_exec)
#PL/SQL with FORALL
def forall(self):
start_prepare = dt.datetime.now()
collection_type = self._con.gettype("PKG_TEST.TT_TEST")
record_type = self._con.gettype("PKG_TEST.TS_TEST")
def recBuilder(x):
rec = record_type.newobject()
rec.ID = x[0]
rec.STR = x[1]
rec.VAL = x[2]
return rec
inp_collection = collection_type.newobject([
recBuilder(i) for i in self.inp
])
start_exec = dt.datetime.now()
self._cur.callproc("pkg_test.write_data", [inp_collection])
dur_exec = dt.datetime.now() - start_exec
return (start_exec - start_prepare, dur_exec)
#PL/SQL with FORALL and plain collections
def forall_columnar(self):
start_prepare = dt.datetime.now()
ids, strs, vals = map(list, zip(*self.inp))
start_exec = dt.datetime.now()
self._cur.callproc("pkg_test.write_data_columnar", [ids, strs, vals])
dur_exec = dt.datetime.now() - start_exec
return (start_exec - start_prepare, dur_exec)
#Run test
def run(self, method, iterations, *args):
#Cleanup schema
self.setup()
start = dt.datetime.now()
runtime = []
for i in range(iterations):
single_run = getattr(self, method)(*args)
runtime.append(single_run)
dur = dt.datetime.now() - start
dur_prep_total = sum([i.total_seconds() for i, _ in runtime])
dur_exec_total = sum([i.total_seconds() for _, i in runtime])
print("""Method: {meth}.
Duration, avg: {run_dur} s
Preparation time, avg: {prep} s
Execution time, avg: {ex} s""".format(
inp_s=len(self.inp),
meth=method,
run_dur=dur.total_seconds() / iterations,
prep=dur_prep_total / iterations,
ex=dur_exec_total / iterations
))