Mysql
 sql >> Baza danych >  >> RDS >> Mysql

Używanie Pythona i MySQL w procesie ETL:Używanie Pythona i SQLAlchemy

W poprzednich dwóch artykułach z tej serii omówiliśmy, jak używać Pythona i SQLAlchemy do wykonywania procesu ETL. Dzisiaj zrobimy to samo, ale tym razem używając Pythona i SQL Alchemy bez poleceń SQL w formacie tekstowym. Umożliwi nam to korzystanie z SQLAlchemy niezależnie od silnika bazy danych, z którym jesteśmy połączeni. Więc zacznijmy.

Dzisiaj omówimy, jak wykonać proces ETL za pomocą Pythona i SQLAlchemy. Stworzymy skrypt do wydobywania codziennych danych z naszej operacyjnej bazy danych, przekształcania ich, a następnie ładowania do naszej hurtowni danych.

To trzeci artykuł z tej serii. Jeśli nie przeczytałeś dwóch pierwszych artykułów (Korzystanie z Pythona i MySQL w procesie ETL i SQLAlchemy), gorąco zachęcam do zrobienia tego przed kontynuowaniem.

Cała ta seria jest kontynuacją naszej serii hurtowni danych:

  • Tworzenie DWH, część pierwsza:model danych biznesowych subskrypcji
  • Tworzenie DWH, część druga:model danych biznesowych subskrypcji
  • Tworzenie hurtowni danych, część 3:model danych biznesowych subskrypcji

Dobra, teraz zacznijmy od dzisiejszego tematu. Najpierw spójrzmy na modele danych.

Modele danych



Operacyjny (na żywo) model danych bazy danych




Model danych DWH


Oto dwa modele danych, których będziemy używać. Więcej informacji o hurtowniach danych (DWH) znajdziesz w tych artykułach:

  • Schemat gwiazdy
  • Schemat płatka śniegu
  • Schemat gwiazdy kontra schemat płatka śniegu

Dlaczego SQLAlchemy?

Cała idea SQLAlchemy polega na tym, że po zaimportowaniu baz danych nie potrzebujemy kodu SQL, który jest specyficzny dla powiązanego silnika bazy danych. Zamiast tego możemy importować obiekty do SQLAlchemy i używać składni SQLAlchemy dla instrukcji. To pozwoli nam używać tego samego języka bez względu na silnik bazy danych, z którym jesteśmy połączeni. Główną zaletą jest to, że programista nie musi dbać o różnice między różnymi silnikami baz danych. Twój program SQLAlchemy będzie działał dokładnie tak samo (z niewielkimi zmianami) po migracji do innego silnika bazy danych.

Zdecydowałem się używać tylko poleceń SQLAlchemy i list Pythona do komunikacji z tymczasowym magazynem i między różnymi bazami danych. Głównym powodem tej decyzji jest to, że 1) listy Pythona są dobrze znane i 2) kod byłby czytelny dla osób bez umiejętności Pythona.

Nie oznacza to, że SQLAlchemia jest idealna. Ma pewne ograniczenia, które omówimy później. Na razie spójrzmy tylko na poniższy kod:

Uruchamianie skryptu i wyniku

To jest polecenie Pythona używane do wywołania naszego skryptu. Skrypt sprawdza dane w operacyjnej bazie danych, porównuje wartości z DWH i importuje nowe wartości. W tym przykładzie aktualizujemy wartości w dwóch tabelach wymiarów i jednej tabeli faktów; skrypt zwraca odpowiednie dane wyjściowe. Cały skrypt jest napisany tak, abyś mógł go uruchamiać wiele razy dziennie. Usunie „stare” dane z tego dnia i zastąpi je nowymi.

Przeanalizujmy cały skrypt, zaczynając od góry.

Importowanie SQLAlchemy

Pierwszą rzeczą, którą musimy zrobić, to zaimportować moduły, których użyjemy w skrypcie. Zazwyczaj będziesz importować swoje moduły podczas pisania skryptu. W większości przypadków na początku nie będziesz wiedzieć dokładnie, jakich modułów będziesz potrzebować.

from datetime import date

# import SQLAlchemy
from sqlalchemy import create_engine, select, MetaData, Table, and_, func, case

Zaimportowaliśmy datetime Pythona moduł, który dostarcza nam klas, które pracują z datami.

Następnie mamy sqlalchemy moduł. Nie zaimportujemy całego modułu, tylko to, czego potrzebujemy – niektóre specyficzne dla SQLAlchemy (create_engine , MetaData , Table ), niektóre części instrukcji SQL (select , and_ , case ) i func , który umożliwia nam korzystanie z funkcji takich jak count() i sum() .

Łączenie z bazami danych

Będziemy musieli połączyć się z dwiema bazami danych na naszym serwerze. W razie potrzeby moglibyśmy połączyć się z większą liczbą baz danych (MySQL, SQL Server lub dowolną inną) z różnych serwerów. W tym przypadku obie bazy danych są bazami danych MySQL i są przechowywane na moim lokalnym komputerze.

# connect to databases
engine_live = sqlalchemy.create_engine('mysql+pymysql://:@localhost:3306/subscription_live')
connection_live = engine_live.connect()
engine_dwh = sqlalchemy.create_engine('mysql+pymysql://:@localhost:3306/subscription_dwh')
connection_dwh = engine_dwh.connect()

metadata = MetaData(bind=None)

Stworzyliśmy dwa silniki i dwa połączenia. Nie będę tu wchodzić w szczegóły, ponieważ wyjaśniliśmy to już w poprzednim artykule.

Aktualizacja dim_time Wymiar

Cel:Wstaw wczorajszą datę, jeśli nie jest jeszcze w tabeli.

W naszym skrypcie zaktualizujemy dwie tabele wymiarów o nowe wartości. Pozostałe postępują według tego samego wzoru, więc omówimy to tylko raz; nie musimy jeszcze kilka razy zapisywać prawie identycznego kodu.

Pomysł jest bardzo prosty. Zawsze uruchomimy skrypt, aby wstawić nowe dane na wczoraj. Dlatego musimy sprawdzić, czy ta data została wstawiona do tabeli wymiarów. Jeśli już tam jest, nic nie zrobimy; jeśli nie, dodamy to. Rzućmy okiem na kod, aby zaktualizować dim_time tabela.

Najpierw sprawdzimy, czy data istnieje. Jeśli nie istnieje, dodamy go. Zaczynamy od zapisania wczorajszej daty w zmiennej. W Pythonie robisz to w ten sposób:

yesterday = date.fromordinal(date.today().toordinal()-1)
yesterday_str = str(yesterday)

Pierwszy wiersz przyjmuje bieżącą datę, konwertuje ją na wartość liczbową, odejmuje 1 od tej wartości i konwertuje tę wartość liczbową z powrotem na datę (wczoraj =dzisiaj – 1 ). Druga linia przechowuje datę w formacie tekstowym.

Następnie sprawdzimy, czy data jest już w bazie danych:

table_dim_time = Table('dim_time', metadata, autoload = True, autoload_with = engine_dwh)
stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday_str)
result = connection_dwh.execute(stmt).fetchall()
date_exists = len(result)

Po załadowaniu tabeli uruchomimy zapytanie, które powinno zwrócić wszystkie wiersze z tabeli wymiarów, w których wartość czasu/daty jest równa wczoraj. Wynik może mieć 0 (brak takiej daty w tabeli) lub 1 wiersz (data jest już w tabeli).

Jeśli data nie jest jeszcze w tabeli, użyjemy polecenia insert(), aby ją dodać:

if date_exists == 0:
  print("New value added.")
  stmt = table_dim_time.insert().values(time_date=yesterday, time_year=yesterday.year, time_month=yesterday.month, time_week=yesterday.isocalendar()[1], time_weekday=yesterday.weekday())
  connection_dwh.execute(stmt)
else:
  print("No new values.")

Jedną z nowych rzeczy, na którą chciałbym zwrócić uwagę, jest użycie. .year , .month , .isocalendar()[1] i .weekday aby uzyskać daty.

Aktualizacja dim_city Wymiar

Cel:Wstaw nowe miasta, jeśli istnieją (np. porównaj listę miast w aktywnej bazie danych z listą miast w DWH i dodaj brakujące).

Aktualizacja dim_time wymiar był dość prosty. Po prostu sprawdziliśmy, czy data jest w tabeli i wstawiliśmy ją, jeśli jeszcze jej tam nie było. Aby przetestować wartość w bazie danych DWH, użyliśmy zmiennej Pythona (wczoraj ). Użyjemy tego procesu ponownie, ale tym razem z listami.

Ponieważ nie ma łatwego sposobu łączenia tabel z różnych baz danych w jednym zapytaniu SQLAlchemy, nie możemy zastosować podejścia opisanego w części 1 tej serii. Dlatego będziemy potrzebować obiektu do przechowywania wartości potrzebnych do komunikacji między tymi dwiema bazami danych. Zdecydowałem się użyć list, ponieważ są one powszechne i wykonują swoją pracę.

Najpierw załadujemy country i city tabele z aktywnej bazy danych do odpowiednich obiektów.

# dim_city
print("\nUpdating... dim_city")
table_city = Table('city', metadata, autoload = True, autoload_with = engine_live)
table_country = Table('country', metadata, autoload = True, autoload_with = engine_live)
table_dim_city = Table('dim_city', metadata, autoload = True, autoload_with = engine_dwh)

Następnie załadujemy dim_city tabeli z DWH na listę:

# load whole dwh table in the list
stmt = select([table_dim_city]);
table_dim_city_list = connection_dwh.execute(stmt).fetchall()

Wtedy zrobimy to samo dla wartości z aktywnej bazy danych. Dołączymy do stołów country i city więc mamy wszystkie potrzebne dane na tej liście:

# load all live values in the list
stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name])\
	.select_from(table_city\
	.join(table_country))
table_city_list = connection_live.execute(stmt).fetchall()

Teraz przejdziemy przez listę zawierającą dane z aktywnej bazy danych. Dla każdego rekordu porównamy wartości (city_name , postal_code i country_name ). Jeśli nie znajdziemy takich wartości, dodamy nowy rekord do dim_city tabela.

# loop through live_db table
# for each record test if it is missing in the dwh table
new_values_added = 0
for city in table_city_list:
	id = -1;
	for dim_city in table_dim_city_list:
		if city[0] == dim_city[1] and city[1] == dim_city[2] and city[2] == dim_city[3]:
			id = dim_city[0]
	if id == -1:
		stmt = table_dim_city.insert().values(city_name=city[0], postal_code=city[1], country_name=city[2])
		connection_dwh.execute(stmt)
		new_values_added = 1
if new_values_added == 0:
	print("No new values.")
else:
	print("New value(s) added.")

Aby ustalić, czy wartość jest już w DWH, przetestowaliśmy kombinację atrybutów, które powinny być unikalne. (Klucz podstawowy z aktywnej bazy danych niewiele nam tutaj pomaga.) Możemy użyć podobnego kodu do aktualizacji innych słowników. Nie jest to najładniejsze rozwiązanie, ale nadal jest całkiem eleganckie. I zrobi dokładnie to, czego potrzebujemy.

Aktualizacja fact_customer_subscribed Tabela

Cel:Jeśli mamy stare dane z wczorajszej daty, najpierw je usuń. Dodaj wczorajsze dane do DWH – niezależnie od tego, czy usunęliśmy coś w poprzednim kroku, czy nie.

Po zaktualizowaniu wszystkich tabel wymiarów powinniśmy zaktualizować tabele faktów. W naszym skrypcie zaktualizujemy tylko jedną tabelę faktów. Rozumowanie jest takie samo jak w poprzedniej sekcji:aktualizacja innych tabel przebiegałaby według tego samego wzorca, więc w większości powtarzalibyśmy kod.

Przed wstawieniem wartości do tabeli faktów, musimy znać wartości powiązanych kluczy z tabel wymiarów. Aby to zrobić, ponownie załadujemy wymiary do list i porównamy je z wartościami z aktywnej bazy danych.

Pierwszą rzeczą, którą zrobimy, jest załadowanie klienta i fact_customer_subscribed tabele w obiekty:

# fact_customer_subscribed
print("\nUpdating... fact_customer_subscribed")

table_customer = Table('customer', metadata, autoload = True, autoload_with = engine_live)
table_fact_customer_subscribed = Table('fact_customer_subscribed', metadata, autoload = True, autoload_with = engine_dwh)

Teraz musimy znaleźć klucze do powiązanego wymiaru czasu. Ponieważ zawsze wstawiamy dane z wczoraj, wyszukamy tę datę w dim_time tabeli i użyj jej identyfikatora. Zapytanie zwraca 1 wiersz, a identyfikator znajduje się na pierwszej pozycji (indeks zaczyna się od 0, więc jest to result[0][0] ):

# find key for the dim_time dimension
stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday)
result = connection_dwh.execute(stmt).fetchall()
dim_time_id = result[0][0]

Na ten czas usuniemy wszystkie powiązane rekordy z tabeli faktów:

# delete any existing data in the fact table for that time dimension value
stmt = table_fact_customer_subscribed.delete().where(table_fact_customer_subscribed.columns.dim_time_id == dim_time_id)
connection_dwh.execute(stmt)

Dobrze, teraz mamy identyfikator wymiaru czasu przechowywany w dim_time_id zmienny. Było to łatwe, ponieważ możemy mieć tylko jedną wartość wymiaru czasowego. Inaczej będzie wyglądała historia w wymiarze miasta. Najpierw załadujemy wszystkie potrzebne nam wartości – wartości, które jednoznacznie opisują miasto (nie ID) oraz wartości zagregowane:

# prepare data for insert
stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name, func.sum(case([(table_customer.columns.active == 1, 1)], else_=0)).label('total_active'), func.sum(case([(table_customer.columns.active == 0, 1)], else_=0)).label('total_inactive'), func.sum(case([(and_(table_customer.columns.active == 1, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_new'), func.sum(case([(and_(table_customer.columns.active == 0, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_canceled')])\
	.select_from(table_customer\
	.join(table_city)\
	.join(table_country))\
	.group_by(table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name)

Chciałbym podkreślić kilka rzeczy w powyższym zapytaniu:

  • func.sum(...) to SUMA(...) ze „standardowego SQL”.
  • Przypadek case(...) składnia używa and_ przed warunkami, a nie między nimi.
  • .label(...) działa jak alias SQL AS.
  • Korzystamy z \ aby przejść do następnej linii i zwiększyć czytelność zapytania. (Zaufaj mi, bez ukośnika jest prawie nieczytelny – próbowałem :) )
  • .group_by(...) pełni rolę GROUP BY SQL.

Następnie przejdziemy w pętli przez każdy rekord zwrócony przy użyciu poprzedniego zapytania. Dla każdego rekordu porównamy wartości, które jednoznacznie definiują miasto (city_name , postal_code , country_name ) z wartościami przechowywanymi na liście utworzonej z DWH dim_city stół. Jeśli wszystkie trzy wartości są zgodne, przechowamy identyfikator z listy i użyjemy go podczas wstawiania nowych danych. W ten sposób dla każdego rekordu będziemy mieć identyfikatory dla obu wymiarów:

# loop through all new records
# use time dimension
# for each record find key for city dimension
# insert row
new_values = connection_live.execute(stmt).fetchall()
for new_value in new_values:
	dim_city_id = -1;
	for dim_city in table_dim_city_list:
		if new_value[0] == dim_city[1] and new_value[1] == dim_city[2] and new_value[2] == dim_city[3]:
			dim_city_id = dim_city[0]
	if dim_city_id > 0:	
		stmt_insert = table_fact_customer_subscribed.insert().values(dim_city_id=dim_city_id, dim_time_id=dim_time_id, total_active=new_value[3], total_inactive=new_value[4], daily_new=new_value[5], daily_canceled=new_value[6])
		connection_dwh.execute(stmt_insert)
		dim_city_id = -1
print("Completed.")

I to wszystko. Zaktualizowaliśmy nasze DWH. Skrypt byłby znacznie dłuższy, gdybyśmy zaktualizowali wszystkie tabele wymiarów i faktów. Złożoność byłaby również większa, gdyby tabela faktów była powiązana z większą liczbą tabel wymiarów. W takim przypadku potrzebowalibyśmy dla pętla dla każdej tabeli wymiarów.

To nie działa!

Byłem bardzo rozczarowany, kiedy napisałem ten skrypt, a potem dowiedziałem się, że coś takiego nie zadziała:

stmt = select([table_city.columns.city_name])\
	.select_from(table_city\
	.outerjoin(table_dim_city, table_city.columns.city_name == table_dim_city.columns.city_name))\
	.where(table_dim_city.columns.id.is_(None))

W tym przykładzie próbuję użyć tabel z dwóch różnych baz danych. Jeśli nawiążemy dwa oddzielne połączenia, pierwsze połączenie nie „widzi” tabel z innego połączenia. Jeśli połączymy się bezpośrednio z serwerem, a nie z bazą danych, nie będziemy mogli załadować tabel.

Dopóki to się nie zmieni (mam nadzieję, że wkrótce), będziesz musiał użyć jakiejś struktury (np. tego, co zrobiliśmy dzisiaj), aby komunikować się między dwiema bazami danych. To komplikuje kod, ponieważ musisz zastąpić pojedyncze zapytanie dwoma listami i zagnieżdżonym for pętle.

Podziel się przemyśleniami na temat SQLAlchemy i Pythona

To był ostatni artykuł z tej serii. Ale kto wie? Może spróbujemy innego podejścia w nadchodzących artykułach, więc bądź na bieżąco. W międzyczasie podzielcie się swoimi przemyśleniami na temat SQLAlchemy i Pythona w połączeniu z bazami danych. Jak myślisz, czego nam brakuje w tym artykule? Co byś dodał? Powiedz nam w komentarzach poniżej.

Tutaj możesz pobrać kompletny skrypt, którego użyliśmy w tym artykule.

Specjalne podziękowania należą się Dirkowi J Bosmanowi (@dirkjobosman), który polecił tę serię artykułów.


  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Jak uzyskać liczbę bitów w ciągu w MySQL — BIT_LENGTH()

  2. Skuteczne monitorowanie MySQL za pomocą pulpitów nawigacyjnych SCUMM:część 3

  3. Ustaw strefę czasową w PHP i MySQL

  4. Optymalne ustawienia MySQL dla zapytań dostarczających duże ilości danych?

  5. Różnica w MySQL JOIN vs LEFT JOIN