Uważam, że TypeError
pochodzi z multiprocessing
get
.
Usunąłem cały kod DB z twojego skryptu. Spójrz na to:
import multiprocessing
import sqlalchemy.exc
def do(kwargs):
i = kwargs['i']
print i
raise sqlalchemy.exc.ProgrammingError("", {}, None)
return i
pool = multiprocessing.Pool(processes=5) # start 4 worker processes
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously
# Use get or wait?
# r.get()
r.wait()
pool.close()
pool.join()
print results
Używanie r.wait
zwraca oczekiwany wynik, ale używając r.get
podnosi TypeError
. Jak opisano w dokumentacji Pythona
, użyj r.wait
po map_async
.
Edytuj :Muszę zmienić moją poprzednią odpowiedź. Teraz wierzę, że TypeError
pochodzi z SQLAlchemy. Zmieniłem skrypt, aby odtworzyć błąd.
Edytuj 2 :Wygląda na to, że problem polega na tym, że multiprocessing.pool
nie działa dobrze, jeśli jakikolwiek pracownik zgłasza wyjątek, którego konstruktor wymaga parametru (zobacz także tutaj
).
Zmieniłem mój skrypt, aby to podkreślić.
import multiprocessing
class BadExc(Exception):
def __init__(self, a):
'''Non-optional param in the constructor.'''
self.a = a
class GoodExc(Exception):
def __init__(self, a=None):
'''Optional param in the constructor.'''
self.a = a
def do(kwargs):
i = kwargs['i']
print i
raise BadExc('a')
# raise GoodExc('a')
return i
pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
# set a timeout in order to be able to catch C-c
r.get(1e100)
except KeyboardInterrupt:
pass
print results
W twoim przypadku, biorąc pod uwagę, że twój kod zgłasza wyjątek SQLAlchemy, jedynym rozwiązaniem, jakie przychodzi mi do głowy, jest przechwycenie wszystkich wyjątków w do
funkcji i ponownie zgłoś normalny Exception
zamiast. Coś takiego:
import multiprocessing
class BadExc(Exception):
def __init__(self, a):
'''Non-optional param in the constructor.'''
self.a = a
def do(kwargs):
try:
i = kwargs['i']
print i
raise BadExc('a')
return i
except Exception as e:
raise Exception(repr(e))
pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
# set a timeout in order to be able to catch C-c
r.get(1e100)
except KeyboardInterrupt:
pass
print results
Edytuj 3 :więc wydaje się, że jest to błąd w Pythonie , ale odpowiednie wyjątki w SQLAlchemy mogą to obejść:dlatego zgłosiłem problem z SQLAlchemy .
Jako obejście problemu myślę, że rozwiązanie na końcu Edytuj 2 zrobi (zawijanie wywołań zwrotnych w try-except i re-raise).