PostgreSQL
 sql >> Baza danych >  >> RDS >> PostgreSQL

Zawieś się w skrypcie Pythona przy użyciu SQLAlchemy i wieloprocesowości

Uważam, że TypeError pochodzi z multiprocessing get .

Usunąłem cały kod DB z twojego skryptu. Spójrz na to:

import multiprocessing
import sqlalchemy.exc

def do(kwargs):
    i = kwargs['i']
    print i
    raise sqlalchemy.exc.ProgrammingError("", {}, None)
    return i


pool = multiprocessing.Pool(processes=5)               # start 4 worker processes
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously

# Use get or wait?
# r.get()
r.wait()

pool.close()
pool.join()
print results

Używanie r.wait zwraca oczekiwany wynik, ale używając r.get podnosi TypeError . Jak opisano w dokumentacji Pythona , użyj r.wait po map_async .

Edytuj :Muszę zmienić moją poprzednią odpowiedź. Teraz wierzę, że TypeError pochodzi z SQLAlchemy. Zmieniłem skrypt, aby odtworzyć błąd.

Edytuj 2 :Wygląda na to, że problem polega na tym, że multiprocessing.pool nie działa dobrze, jeśli jakikolwiek pracownik zgłasza wyjątek, którego konstruktor wymaga parametru (zobacz także tutaj ).

Zmieniłem mój skrypt, aby to podkreślić.

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

class GoodExc(Exception):
    def __init__(self, a=None):
        '''Optional param in the constructor.'''
        self.a = a

def do(kwargs):
    i = kwargs['i']
    print i
    raise BadExc('a')
    # raise GoodExc('a')
    return i

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

W twoim przypadku, biorąc pod uwagę, że twój kod zgłasza wyjątek SQLAlchemy, jedynym rozwiązaniem, jakie przychodzi mi do głowy, jest przechwycenie wszystkich wyjątków w do funkcji i ponownie zgłoś normalny Exception zamiast. Coś takiego:

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

def do(kwargs):
    try:
        i = kwargs['i']
        print i
        raise BadExc('a')
        return i
    except Exception as e:
        raise Exception(repr(e))

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

Edytuj 3 :więc wydaje się, że jest to błąd w Pythonie , ale odpowiednie wyjątki w SQLAlchemy mogą to obejść:dlatego zgłosiłem problem z SQLAlchemy .

Jako obejście problemu myślę, że rozwiązanie na końcu Edytuj 2 zrobi (zawijanie wywołań zwrotnych w try-except i re-raise).



  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Dwa SQL LEFT JOINS dają niepoprawny wynik

  2. Jak uruchomić serwer PostgreSQL na Mac OS X?

  3. Mnożenie dwóch kolumn, które zostały obliczone na podstawie instrukcji CASE

  4. Postgresql usuwa wiele wierszy z wielu tabel

  5. Jak pobrać dane z wielu powiązanych tabel w Postgresie?