W poprzednich dwóch artykułach z tej serii omówiliśmy, jak używać Pythona i SQLAlchemy do wykonywania procesu ETL. Dzisiaj zrobimy to samo, ale tym razem używając Pythona i SQL Alchemy bez poleceń SQL w formacie tekstowym. Umożliwi nam to korzystanie z SQLAlchemy niezależnie od silnika bazy danych, z którym jesteśmy połączeni. Więc zacznijmy.
Dzisiaj omówimy, jak wykonać proces ETL za pomocą Pythona i SQLAlchemy. Stworzymy skrypt do wydobywania codziennych danych z naszej operacyjnej bazy danych, przekształcania ich, a następnie ładowania do naszej hurtowni danych.
To trzeci artykuł z tej serii. Jeśli nie przeczytałeś dwóch pierwszych artykułów (Korzystanie z Pythona i MySQL w procesie ETL i SQLAlchemy), gorąco zachęcam do zrobienia tego przed kontynuowaniem.
Cała ta seria jest kontynuacją naszej serii hurtowni danych:
- Tworzenie DWH, część pierwsza:model danych biznesowych subskrypcji
- Tworzenie DWH, część druga:model danych biznesowych subskrypcji
- Tworzenie hurtowni danych, część 3:model danych biznesowych subskrypcji
Dobra, teraz zacznijmy od dzisiejszego tematu. Najpierw spójrzmy na modele danych.
Modele danych
Operacyjny (na żywo) model danych bazy danych
Model danych DWH
Oto dwa modele danych, których będziemy używać. Więcej informacji o hurtowniach danych (DWH) znajdziesz w tych artykułach:
- Schemat gwiazdy
- Schemat płatka śniegu
- Schemat gwiazdy kontra schemat płatka śniegu
Dlaczego SQLAlchemy?
Cała idea SQLAlchemy polega na tym, że po zaimportowaniu baz danych nie potrzebujemy kodu SQL, który jest specyficzny dla powiązanego silnika bazy danych. Zamiast tego możemy importować obiekty do SQLAlchemy i używać składni SQLAlchemy dla instrukcji. To pozwoli nam używać tego samego języka bez względu na silnik bazy danych, z którym jesteśmy połączeni. Główną zaletą jest to, że programista nie musi dbać o różnice między różnymi silnikami baz danych. Twój program SQLAlchemy będzie działał dokładnie tak samo (z niewielkimi zmianami) po migracji do innego silnika bazy danych.
Zdecydowałem się używać tylko poleceń SQLAlchemy i list Pythona do komunikacji z tymczasowym magazynem i między różnymi bazami danych. Głównym powodem tej decyzji jest to, że 1) listy Pythona są dobrze znane i 2) kod byłby czytelny dla osób bez umiejętności Pythona.
Nie oznacza to, że SQLAlchemia jest idealna. Ma pewne ograniczenia, które omówimy później. Na razie spójrzmy tylko na poniższy kod:
Uruchamianie skryptu i wyniku
To jest polecenie Pythona używane do wywołania naszego skryptu. Skrypt sprawdza dane w operacyjnej bazie danych, porównuje wartości z DWH i importuje nowe wartości. W tym przykładzie aktualizujemy wartości w dwóch tabelach wymiarów i jednej tabeli faktów; skrypt zwraca odpowiednie dane wyjściowe. Cały skrypt jest napisany tak, abyś mógł go uruchamiać wiele razy dziennie. Usunie „stare” dane z tego dnia i zastąpi je nowymi.
Przeanalizujmy cały skrypt, zaczynając od góry.
Importowanie SQLAlchemy
Pierwszą rzeczą, którą musimy zrobić, to zaimportować moduły, których użyjemy w skrypcie. Zazwyczaj będziesz importować swoje moduły podczas pisania skryptu. W większości przypadków na początku nie będziesz wiedzieć dokładnie, jakich modułów będziesz potrzebować.
from datetime import date # import SQLAlchemy from sqlalchemy import create_engine, select, MetaData, Table, and_, func, case
Zaimportowaliśmy datetime
Pythona moduł, który dostarcza nam klas, które pracują z datami.
Następnie mamy sqlalchemy
moduł. Nie zaimportujemy całego modułu, tylko to, czego potrzebujemy – niektóre specyficzne dla SQLAlchemy (create_engine
, MetaData
, Table
), niektóre części instrukcji SQL (select
, and_
, case
) i func
, który umożliwia nam korzystanie z funkcji takich jak count() i sum() .
Łączenie z bazami danych
Będziemy musieli połączyć się z dwiema bazami danych na naszym serwerze. W razie potrzeby moglibyśmy połączyć się z większą liczbą baz danych (MySQL, SQL Server lub dowolną inną) z różnych serwerów. W tym przypadku obie bazy danych są bazami danych MySQL i są przechowywane na moim lokalnym komputerze.
# connect to databases engine_live = sqlalchemy.create_engine('mysql+pymysql://: @localhost:3306/subscription_live') connection_live = engine_live.connect() engine_dwh = sqlalchemy.create_engine('mysql+pymysql:// : @localhost:3306/subscription_dwh') connection_dwh = engine_dwh.connect() metadata = MetaData(bind=None)
Stworzyliśmy dwa silniki i dwa połączenia. Nie będę tu wchodzić w szczegóły, ponieważ wyjaśniliśmy to już w poprzednim artykule.
Aktualizacja dim_time Wymiar
Cel:Wstaw wczorajszą datę, jeśli nie jest jeszcze w tabeli.
W naszym skrypcie zaktualizujemy dwie tabele wymiarów o nowe wartości. Pozostałe postępują według tego samego wzoru, więc omówimy to tylko raz; nie musimy jeszcze kilka razy zapisywać prawie identycznego kodu.
Pomysł jest bardzo prosty. Zawsze uruchomimy skrypt, aby wstawić nowe dane na wczoraj. Dlatego musimy sprawdzić, czy ta data została wstawiona do tabeli wymiarów. Jeśli już tam jest, nic nie zrobimy; jeśli nie, dodamy to. Rzućmy okiem na kod, aby zaktualizować dim_time
tabela.
Najpierw sprawdzimy, czy data istnieje. Jeśli nie istnieje, dodamy go. Zaczynamy od zapisania wczorajszej daty w zmiennej. W Pythonie robisz to w ten sposób:
yesterday = date.fromordinal(date.today().toordinal()-1) yesterday_str = str(yesterday)
Pierwszy wiersz przyjmuje bieżącą datę, konwertuje ją na wartość liczbową, odejmuje 1 od tej wartości i konwertuje tę wartość liczbową z powrotem na datę (wczoraj =dzisiaj – 1 ). Druga linia przechowuje datę w formacie tekstowym.
Następnie sprawdzimy, czy data jest już w bazie danych:
table_dim_time = Table('dim_time', metadata, autoload = True, autoload_with = engine_dwh) stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday_str) result = connection_dwh.execute(stmt).fetchall() date_exists = len(result)
Po załadowaniu tabeli uruchomimy zapytanie, które powinno zwrócić wszystkie wiersze z tabeli wymiarów, w których wartość czasu/daty jest równa wczoraj. Wynik może mieć 0 (brak takiej daty w tabeli) lub 1 wiersz (data jest już w tabeli).
Jeśli data nie jest jeszcze w tabeli, użyjemy polecenia insert(), aby ją dodać:
if date_exists == 0: print("New value added.") stmt = table_dim_time.insert().values(time_date=yesterday, time_year=yesterday.year, time_month=yesterday.month, time_week=yesterday.isocalendar()[1], time_weekday=yesterday.weekday()) connection_dwh.execute(stmt) else: print("No new values.")
Jedną z nowych rzeczy, na którą chciałbym zwrócić uwagę, jest użycie. .year
, .month
, .isocalendar()[1]
i .weekday
aby uzyskać daty.
Aktualizacja dim_city Wymiar
Cel:Wstaw nowe miasta, jeśli istnieją (np. porównaj listę miast w aktywnej bazie danych z listą miast w DWH i dodaj brakujące).
Aktualizacja dim_time
wymiar był dość prosty. Po prostu sprawdziliśmy, czy data jest w tabeli i wstawiliśmy ją, jeśli jeszcze jej tam nie było. Aby przetestować wartość w bazie danych DWH, użyliśmy zmiennej Pythona (wczoraj ). Użyjemy tego procesu ponownie, ale tym razem z listami.
Ponieważ nie ma łatwego sposobu łączenia tabel z różnych baz danych w jednym zapytaniu SQLAlchemy, nie możemy zastosować podejścia opisanego w części 1 tej serii. Dlatego będziemy potrzebować obiektu do przechowywania wartości potrzebnych do komunikacji między tymi dwiema bazami danych. Zdecydowałem się użyć list, ponieważ są one powszechne i wykonują swoją pracę.
Najpierw załadujemy country
i city
tabele z aktywnej bazy danych do odpowiednich obiektów.
# dim_city print("\nUpdating... dim_city") table_city = Table('city', metadata, autoload = True, autoload_with = engine_live) table_country = Table('country', metadata, autoload = True, autoload_with = engine_live) table_dim_city = Table('dim_city', metadata, autoload = True, autoload_with = engine_dwh)
Następnie załadujemy dim_city
tabeli z DWH na listę:
# load whole dwh table in the list stmt = select([table_dim_city]); table_dim_city_list = connection_dwh.execute(stmt).fetchall()
Wtedy zrobimy to samo dla wartości z aktywnej bazy danych. Dołączymy do stołów country
i city
więc mamy wszystkie potrzebne dane na tej liście:
# load all live values in the list stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name])\ .select_from(table_city\ .join(table_country)) table_city_list = connection_live.execute(stmt).fetchall()
Teraz przejdziemy przez listę zawierającą dane z aktywnej bazy danych. Dla każdego rekordu porównamy wartości (city_name
, postal_code
i country_name
). Jeśli nie znajdziemy takich wartości, dodamy nowy rekord do dim_city
tabela.
# loop through live_db table # for each record test if it is missing in the dwh table new_values_added = 0 for city in table_city_list: id = -1; for dim_city in table_dim_city_list: if city[0] == dim_city[1] and city[1] == dim_city[2] and city[2] == dim_city[3]: id = dim_city[0] if id == -1: stmt = table_dim_city.insert().values(city_name=city[0], postal_code=city[1], country_name=city[2]) connection_dwh.execute(stmt) new_values_added = 1 if new_values_added == 0: print("No new values.") else: print("New value(s) added.")
Aby ustalić, czy wartość jest już w DWH, przetestowaliśmy kombinację atrybutów, które powinny być unikalne. (Klucz podstawowy z aktywnej bazy danych niewiele nam tutaj pomaga.) Możemy użyć podobnego kodu do aktualizacji innych słowników. Nie jest to najładniejsze rozwiązanie, ale nadal jest całkiem eleganckie. I zrobi dokładnie to, czego potrzebujemy.
Aktualizacja fact_customer_subscribed Tabela
Cel:Jeśli mamy stare dane z wczorajszej daty, najpierw je usuń. Dodaj wczorajsze dane do DWH – niezależnie od tego, czy usunęliśmy coś w poprzednim kroku, czy nie.
Po zaktualizowaniu wszystkich tabel wymiarów powinniśmy zaktualizować tabele faktów. W naszym skrypcie zaktualizujemy tylko jedną tabelę faktów. Rozumowanie jest takie samo jak w poprzedniej sekcji:aktualizacja innych tabel przebiegałaby według tego samego wzorca, więc w większości powtarzalibyśmy kod.
Przed wstawieniem wartości do tabeli faktów, musimy znać wartości powiązanych kluczy z tabel wymiarów. Aby to zrobić, ponownie załadujemy wymiary do list i porównamy je z wartościami z aktywnej bazy danych.
Pierwszą rzeczą, którą zrobimy, jest załadowanie klienta i fact_customer_subscribed
tabele w obiekty:
# fact_customer_subscribed print("\nUpdating... fact_customer_subscribed") table_customer = Table('customer', metadata, autoload = True, autoload_with = engine_live) table_fact_customer_subscribed = Table('fact_customer_subscribed', metadata, autoload = True, autoload_with = engine_dwh)
Teraz musimy znaleźć klucze do powiązanego wymiaru czasu. Ponieważ zawsze wstawiamy dane z wczoraj, wyszukamy tę datę w dim_time
tabeli i użyj jej identyfikatora. Zapytanie zwraca 1 wiersz, a identyfikator znajduje się na pierwszej pozycji (indeks zaczyna się od 0, więc jest to result[0][0]
):
# find key for the dim_time dimension stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday) result = connection_dwh.execute(stmt).fetchall() dim_time_id = result[0][0]
Na ten czas usuniemy wszystkie powiązane rekordy z tabeli faktów:
# delete any existing data in the fact table for that time dimension value stmt = table_fact_customer_subscribed.delete().where(table_fact_customer_subscribed.columns.dim_time_id == dim_time_id) connection_dwh.execute(stmt)
Dobrze, teraz mamy identyfikator wymiaru czasu przechowywany w dim_time_id
zmienny. Było to łatwe, ponieważ możemy mieć tylko jedną wartość wymiaru czasowego. Inaczej będzie wyglądała historia w wymiarze miasta. Najpierw załadujemy wszystkie potrzebne nam wartości – wartości, które jednoznacznie opisują miasto (nie ID) oraz wartości zagregowane:
# prepare data for insert stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name, func.sum(case([(table_customer.columns.active == 1, 1)], else_=0)).label('total_active'), func.sum(case([(table_customer.columns.active == 0, 1)], else_=0)).label('total_inactive'), func.sum(case([(and_(table_customer.columns.active == 1, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_new'), func.sum(case([(and_(table_customer.columns.active == 0, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_canceled')])\ .select_from(table_customer\ .join(table_city)\ .join(table_country))\ .group_by(table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name)
Chciałbym podkreślić kilka rzeczy w powyższym zapytaniu:
func.sum(...)
to SUMA(...) ze „standardowego SQL”.- Przypadek
case(...)
składnia używaand_
przed warunkami, a nie między nimi. .label(...)
działa jak alias SQL AS.- Korzystamy z
\
aby przejść do następnej linii i zwiększyć czytelność zapytania. (Zaufaj mi, bez ukośnika jest prawie nieczytelny – próbowałem :) ) .group_by(...)
pełni rolę GROUP BY SQL.
Następnie przejdziemy w pętli przez każdy rekord zwrócony przy użyciu poprzedniego zapytania. Dla każdego rekordu porównamy wartości, które jednoznacznie definiują miasto (city_name
, postal_code
, country_name
) z wartościami przechowywanymi na liście utworzonej z DWH dim_city
stół. Jeśli wszystkie trzy wartości są zgodne, przechowamy identyfikator z listy i użyjemy go podczas wstawiania nowych danych. W ten sposób dla każdego rekordu będziemy mieć identyfikatory dla obu wymiarów:
# loop through all new records # use time dimension # for each record find key for city dimension # insert row new_values = connection_live.execute(stmt).fetchall() for new_value in new_values: dim_city_id = -1; for dim_city in table_dim_city_list: if new_value[0] == dim_city[1] and new_value[1] == dim_city[2] and new_value[2] == dim_city[3]: dim_city_id = dim_city[0] if dim_city_id > 0: stmt_insert = table_fact_customer_subscribed.insert().values(dim_city_id=dim_city_id, dim_time_id=dim_time_id, total_active=new_value[3], total_inactive=new_value[4], daily_new=new_value[5], daily_canceled=new_value[6]) connection_dwh.execute(stmt_insert) dim_city_id = -1 print("Completed.")
I to wszystko. Zaktualizowaliśmy nasze DWH. Skrypt byłby znacznie dłuższy, gdybyśmy zaktualizowali wszystkie tabele wymiarów i faktów. Złożoność byłaby również większa, gdyby tabela faktów była powiązana z większą liczbą tabel wymiarów. W takim przypadku potrzebowalibyśmy dla pętla dla każdej tabeli wymiarów.
To nie działa!
Byłem bardzo rozczarowany, kiedy napisałem ten skrypt, a potem dowiedziałem się, że coś takiego nie zadziała:
stmt = select([table_city.columns.city_name])\ .select_from(table_city\ .outerjoin(table_dim_city, table_city.columns.city_name == table_dim_city.columns.city_name))\ .where(table_dim_city.columns.id.is_(None))
W tym przykładzie próbuję użyć tabel z dwóch różnych baz danych. Jeśli nawiążemy dwa oddzielne połączenia, pierwsze połączenie nie „widzi” tabel z innego połączenia. Jeśli połączymy się bezpośrednio z serwerem, a nie z bazą danych, nie będziemy mogli załadować tabel.
Dopóki to się nie zmieni (mam nadzieję, że wkrótce), będziesz musiał użyć jakiejś struktury (np. tego, co zrobiliśmy dzisiaj), aby komunikować się między dwiema bazami danych. To komplikuje kod, ponieważ musisz zastąpić pojedyncze zapytanie dwoma listami i zagnieżdżonym for pętle.
Podziel się przemyśleniami na temat SQLAlchemy i Pythona
To był ostatni artykuł z tej serii. Ale kto wie? Może spróbujemy innego podejścia w nadchodzących artykułach, więc bądź na bieżąco. W międzyczasie podzielcie się swoimi przemyśleniami na temat SQLAlchemy i Pythona w połączeniu z bazami danych. Jak myślisz, czego nam brakuje w tym artykule? Co byś dodał? Powiedz nam w komentarzach poniżej.
Tutaj możesz pobrać kompletny skrypt, którego użyliśmy w tym artykule.
Specjalne podziękowania należą się Dirkowi J Bosmanowi (@dirkjobosman), który polecił tę serię artykułów.