PostgreSQL
 sql >> Baza danych >  >> RDS >> PostgreSQL

Django ORM przecieka połączenia podczas korzystania z ThreadPoolExecutor

Domyślam się, że ThreadPoolExecutor to nie jest to, co tworzy połączenie DB, ale zadania wątkowe to te, które utrzymują połączenie. Miałem już z tym do czynienia.

Skończyło się na zbudowaniu tego opakowania, aby upewnić się, że wątki są zamykane ręcznie, gdy zadania są wykonywane w ThreadPoolExecutor. Powinno to być przydatne w zapewnieniu, że połączenia nie są przeciekające, jak dotąd nie zauważyłem żadnego wycieku podczas korzystania z tego kodu.

from functools import wraps
from concurrent.futures import ThreadPoolExecutor
from django.db import connection

class DjangoConnectionThreadPoolExecutor(ThreadPoolExecutor):
    """
    When a function is passed into the ThreadPoolExecutor via either submit() or map(), 
    this will wrap the function, and make sure that close_django_db_connection() is called 
    inside the thread when it's finished so Django doesn't leak DB connections.

    Since map() calls submit(), only submit() needs to be overwritten.
    """
    def close_django_db_connection(self):
        connection.close()

    def generate_thread_closing_wrapper(self, fn):
        @wraps(fn)
        def new_func(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            finally:
                self.close_django_db_connection()
        return new_func

    def submit(*args, **kwargs):
        """
        I took the args filtering/unpacking logic from 
   
        https://github.com/python/cpython/blob/3.7/Lib/concurrent/futures/thread.py 
        
        so I can properly get the function object the same way it was done there.
        """
        if len(args) >= 2:
            self, fn, *args = args
            fn = self.generate_thread_closing_wrapper(fn=fn)
        elif not args:
            raise TypeError("descriptor 'submit' of 'ThreadPoolExecutor' object "
                        "needs an argument")
        elif 'fn' in kwargs:
            fn = self.generate_thread_closing_wrapper(fn=kwargs.pop('fn'))
            self, *args = args
    
        return super(self.__class__, self).submit(fn, *args, **kwargs)

Wtedy możesz po prostu użyć tego:

    with DjangoConnectionThreadPoolExecutor(max_workers=15) as executor:
        results = list(executor.map(func, args_list))

...i miej pewność, że połączenia zostaną zamknięte.




  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Jak zaoszczędzić czas w bazie danych w Go, korzystając z GORM i Postgresql?

  2. Jak korzystać z wyzwalaczy PostgreSQL?

  3. Oblicz liczbę jednoczesnych zdarzeń w SQL

  4. jak obliczyć czas pomiędzy dwoma znacznikami czasu (PostgreSQL)

  5. Czy tablicę PostgreSQL można zoptymalizować pod kątem łączenia?