Ten artykuł będzie interesujący dla tych, którzy często mają do czynienia z integracją danych.
Wprowadzenie
Załóżmy, że istnieje baza danych, w której użytkownicy zawsze modyfikują dane (aktualizują lub usuwają). Być może z tej bazy korzysta duża aplikacja, która nie pozwala na modyfikowanie struktury tabeli. Zadanie polega na tym, aby od czasu do czasu ładować dane z tej bazy do innej bazy danych na innym serwerze. Najprostszym sposobem rozwiązania tego problemu jest załadowanie nowych danych ze źródłowej bazy danych do docelowej bazy danych ze wstępnym wyczyszczeniem docelowej bazy danych. Możesz użyć tej metody, o ile czas ładowania danych jest akceptowalny i nie przekracza ustalonych terminów. Co się stanie, jeśli załadowanie danych zajmie kilka dni? Ponadto niestabilne kanały komunikacji prowadzą do sytuacji, w których ładowanie danych zostaje zatrzymane i ponownie uruchomione. Jeśli napotkasz te przeszkody, sugeruję rozważenie jednego z algorytmów „przeładowywania danych”. Oznacza to, że od momentu załadowania ostatniego wczytania nastąpiły tylko modyfikacje danych.
CDC
W SQL Server 2008 firma Microsoft wprowadziła mechanizm śledzenia danych o nazwie Change Data Capture (CDC). Ogólnie rzecz biorąc, celem tego mechanizmu jest to, że włączenie CDC dla dowolnej tabeli bazy danych spowoduje utworzenie tabeli systemowej w tej samej bazie danych o podobnej nazwie, jak oryginalna tabela (schemat będzie następujący:„cdc” jako prefiks plus znak stara nazwa schematu plus „_” i koniec „_CT”. Na przykład oryginalna tabela to dbo.Example, wtedy tabela systemowa będzie miała nazwę cdc.dbo_Example_CT). Będzie przechowywać wszystkie dane, które zostały zmodyfikowane.
Właściwie, aby zagłębić się w CDC, rozważ przykład. Ale najpierw upewnij się, że agent SQL, który używa CDC, działa na instancji testowej SQL Server.
Ponadto rozważymy skrypt, który tworzy bazę danych i tabelę testową, wypełnia tę tabelę danymi i włącza CDC dla tej tabeli.
Aby zrozumieć i uprościć zadanie, użyjemy jednej instancji SQL Server bez dystrybucji źródłowej i docelowej bazy danych na różne serwery.
użyj mastergo-- utwórz źródłową bazę danych, jeśli nie istnieje (wybierz * z sys.databases, gdzie nazwa ='db_src_cdc') utwórz bazę danych db_src_cdcgouse db_src_cdcgo-- włącz CDC, jeśli jest wyłączone, jeśli nie istnieje (wybierz * z sys.databases, gdzie nazwa =db_name() i is_cdc_enabled=1) exec sys.sp_cdc_enable_dbgo-- utwórz rolę dla tabel z CDCif nie istnieje (wybierz * z sys.sysusers gdzie name ='CDC_Reader' i issqlrole=1) utwórz rolę CDC_Readergo-- utwórz tabelę object_id('dbo.Example','U') ma wartość null utwórz tabelę dbo.Example (identyfikator int klucz podstawowy PK_Example, tytuł varchar(200) nie null )go-- wypełnij tabelę dbo.Example (tytuł) wartości ( 'Jeden'),('Dwa'),('Trzy'),('Cztery'),('Pięć');go-- włącz CDC dla tabeli, jeśli nie istnieje (wybierz * z sys.tables, gdzie is_tracked_by_cdc =1 and name ='Przykład') exec sys.sp_cdc_enable_table @source_schema ='dbo', @source_name ='Przykład', @role_name ='CDC_Reader'go-- wypełnij tabelę pewnymi danymi. Zmienimy lub usuniemy coś zaktualizuj dbo.Przykładowy Tytuł =reverse(Tytuł)gdzie ID w (2,3,4);usuń z dbo.Przykład gdzie ID w (1,2);ustaw tożsamość_wstaw dbo.Przykład na;wstaw dbo. Przykład (ID, Tytuł) wartości(1,'Jeden'),(6,'Sześć');set identity_insert dbo.Przykład off;go
Teraz spójrzmy, co mamy po wykonaniu tego skryptu w tabelach dbo.Example i cdc.dbo_Example_CT (należy zauważyć, że CDC jest asynchroniczny. Dane są umieszczane w tabelach, w których przechowywane jest śledzenie zmian po określonym czasie ).
wybierz * z dbo.Przykład;
Tytuł ID---- ---------------------- 1 Jeden 3 eerhT 4 ruoF 5 Pięć 6 Sześć
wybierz row_number() over ( partycja według identyfikatora kolejność według __$start_lsn desc, __$seqval desc ) jako __$rn, *z cdc.dbo_Example_CT;
__$rn __$start_lsn __$end_lsn __$seqval __$operacja __$update_mask ID Tytuł------ ---------- - ----------- ---------------------- ------------ ---- ------------ --- ----------- 1 0x0000003A000000580005 NULL 0x0000003A000000580003 2 0x03 1 Jeden 2 0x0000003A000000560006 NULL 0x0000003A000000560002 1 0x03 1 Jeden 1 0x0000003A000000560006 NULL 0x0000003A000000560005 1 0x03 2 owT 2 0x0000003A000000540005 NULL 0x0000003A000000540002 3 0x02 2 Dwa 3 0x0000003A000000540005 NULL 0x0000003A000000540002 4 0x02 2 owT 1 0x0000003A000000540005 NULL 0x0000003A000000540003 3 0x02 3 Trzy 2 0x0000003A0000030000540000A000040x0000003A0000030000540000A NULL 0 3 0x02 4 Cztery 2 0x0000003A000000540005 NULL 0x0000003A000000540004 4 0x02 4 ruoF 1 0x0000003A000000580005 NULL 0x0000003A000000580004 2 0x03
Rozważ szczegółowo strukturę tabeli, w której przechowywane jest śledzenie zmian. Pola __ $start_lsn i __ $seqval to odpowiednio LSN (numer kolejny dziennika w bazie danych) i numer transakcji w ramach transakcji. W tych polach istnieje ważna właściwość, a mianowicie możemy być pewni, że rekord z wyższym LSN zostanie wykonany później. Dzięki tej właściwości możemy łatwo uzyskać najnowszy stan każdego rekordu w zapytaniu, filtrując nasz wybór według warunku – gdzie __ $ rn =1.
Pole __$operacja zawiera kod transakcji:
- 1 – rekord został usunięty
- 2 – rekord został wstawiony
- 3, 4 – rekord jest aktualizowany. Stare dane przed aktualizacją to 3, nowe dane to 4.
Oprócz pól serwisowych z prefiksem «__$», pola oryginalnej tabeli są całkowicie zduplikowane. Ta informacja wystarczy nam, aby przejść do ładowania przyrostowego.
Konfigurowanie bazy danych do ładowania danych
Utwórz tabelę w naszej testowej docelowej bazie danych, do której będą ładowane dane, a także dodatkową tabelę do przechowywania danych o dzienniku ładowania.
użyj mastergo-- utwórz docelową bazę danych, jeśli nie istnieje (wybierz * z sys.databases, gdzie nazwa ='db_dst_cdc') utwórz bazę danych db_dst_cdcgouse db_dst_cdcgo-- utwórz tabelę, jeśli object_id('dbo.Example','U') jest null utwórz tabelę dbo.Przykład (Identyfikator int ograniczenie PK_Example klucz podstawowy, Tytuł varchar(200) nie null )go-- utwórz tabelę do przechowywania logów obciążenia object_id('dbo.log_cdc','U') jest null stwórz tabelę dbo Zaloguj sięChciałbym zwrócić uwagę na pola tabeli LOG_CDC:
- TABLE_NAME przechowuje informacje o tym, jaka tabela została załadowana (możliwe jest załadowanie kilku tabel w przyszłości, z różnych baz danych lub nawet z różnych serwerów; format tabeli to „NAZWA_SERWERA.NAZWA_DB_NAZWA_SCHEMATU.NAZWA_TABELI”
- DT to pole daty i godziny ładowania, które jest opcjonalne w przypadku obciążenia przyrostowego. Będzie to jednak przydatne do kontroli ładowania.
- LSN – po załadowaniu tabeli musimy przechowywać informacje o miejscu rozpoczęcia kolejnego ładowania, jeśli jest to wymagane. W związku z tym po każdym ładowaniu dodajemy do tej kolumny najnowsze (maksymalne) __ $ start_lsn.
Algorytm ładowania danych
Jak opisano powyżej, korzystając z zapytania, możemy uzyskać najnowszy stan tabeli za pomocą funkcji okna. Jeśli znamy numer LSN najnowszego obciążenia, przy następnym ładowaniu możemy odfiltrować ze źródła wszystkie dane, których zmiany są większe niż zapisany numer LSN, o ile było przynajmniej jedno pełne poprzednie obciążenie:
z incr_Example as( select row_number() over ( partycja według ID order by __$start_lsn desc, __$seqval desc ) as __$rn, * from db_src_cdc.cdc.dbo_Example_CT gdzie __$operacja <> 3 i __$ start_lsn> @lsn)wybierz * z incr_Example
Następnie możemy uzyskać wszystkie rekordy dla pełnego obciążenia, jeśli ładunek LSN nie jest przechowywany:
z incr_Example as( select row_number() over ( partycja według ID order by __$start_lsn desc, __$seqval desc ) as __$rn, * from db_src_cdc.cdc.dbo_Example_CT gdzie __$operacja <> 3 i __$ start_lsn> @lsn), full_Example as( select * from db_src_cdc.dbo.Example gdzie @lsn ma wartość null)select ID, Title, __$operationfrom incr_Examplegdzie __$rn =1union allselect ID, Title, 2 as __$operationfrom full_Example
Zatem w zależności od wartości @LSN zapytanie to wyświetli albo wszystkie ostatnie zmiany (z pominięciem tymczasowych) ze statusem Usunięty lub nie, albo wszystkie dane z oryginalnej tabeli, dodając status 2 (nowy rekord) – to pole służy tylko do ujednolicenia dwóch selekcji. Za pomocą tego zapytania możemy łatwo zaimplementować pełne ładowanie lub przeładowanie za pomocą polecenia MERGE (począwszy od wersji SQL 2008).
Aby uniknąć wąskich gardeł, które mogą tworzyć alternatywne procesy i ładować dopasowane dane z różnych tabel (w przyszłości będziemy ładować kilka tabel i ewentualnie mogą istnieć relacje relacyjne między nimi), sugeruję użycie migawki DB na źródłowej bazie danych ( kolejna funkcja SQL 2008).
Pełny tekst obciążenia jest następujący:
[rozwiń tytuł=”Kod”]
/* Algorytm ładowania danych*/-- utwórz migawkę bazy danych, jeśli istnieje (wybierz * z sys.databases gdzie name ='db_src_cdc_ss' ) upuść bazę danych db_src_cdc_ss;declare @query nvarchar(max);select @query =N' utwórz bazę danych db_src_cdc_ss on ( name =N'''+name+ ''', filename =N'''+[filename]+'.ss'' ) jako migawkę db_src_cdc'z db_src_cdc.sys.sysfiles gdzie groupid =1; exec ( @query );-- odczytaj LSN z poprzedniego loaddeclare @lsn binary(10) =(wybierz max(lsn) z db_dst_cdc.dbo.log_cdc gdzie nazwa_tabeli ='localhost.db_src_cdc.dbo.Example');-- wyczyść tabela przed pełnym loadif @lsn jest null obcinanie tabeli db_dst_cdc.dbo.Example;-- załaduj proces z incr_Example as( select row_number() over (kolejność partycji według identyfikatora według __$start_lsn desc, __$seqval desc ) as __$rn , * z db_src_cdc_ss.cdc.dbo_Example_CT gdzie __$operacja <> 3 i __$start_lsn> @lsn), full_Example as( wybierz * z db_src_cdc_ss.dbo.Przykład gdzie @lsn jest puste), cte_,przykład as( wybierz ID Tytuł, __$operacja z incr_Example, gdzie __$rn =1 union all select ID, Title, 2 jako __$operacja z full_Example)merge db_dst_cdc.dbo.Przykład jako trg przy użyciu cte_Example jako src na trg.ID=src.IDpo dopasowaniu i __$operacja =1, a następnie usuń, gdy dopasowane i __$operacja <> 1, a następnie zaktualizuj zestaw trg.Title =src.Titlewhen nie dopasowane przez cel i __$operacja <> 1, a następnie wstaw wartości (ID, Tytuł) (src.ID, src .Title);-- oznacza koniec procesu ładowania i najnowsze wartości LSNinsert db_dst_cdc.dbo.log_cdc (nazwa_tabeli, lsn) ('localhost.db_src_cdc.dbo.Example', isnull((wybierz max(__$start_lsn) z db_src_cdc_ss.cdc.dbo_Example_CT),0))-- usuń zrzut bazy danych, jeśli istnieje (wybierz * z sys.databases, gdzie name ='db_src_cdc_ss' ) usuń bazę danych db_src_cdc_ss
[/rozwiń]