CouchDB
 sql >> Baza danych >  >> NoSQL >> CouchDB

Synchronizacja w stylu CouchDB i rozwiązywanie konfliktów w Postgres z Hasura

Mówiliśmy o trybie offline, najpierw z Hasura i RxDB (zasadniczo Postgres i PouchDB poniżej).

Ten post nadal zagłębia się w temat. Jest to dyskusja i przewodnik po implementacji rozwiązywania konfliktów w stylu CouchDB za pomocą Postgres (centralna baza danych backendu) i PouchDB (aplikacja frontendowa użytkownik bazy danych).

Oto, o czym będziemy rozmawiać:

  • Co to jest rozwiązywanie konfliktów?
  • Czy moja aplikacja wymaga rozwiązania konfliktów?
  • Objaśnienie rozwiązywania konfliktów z PouchDB
  • Zapewnienie łatwej replikacji i zarządzania konfliktami w pouchdb (frontend) i Postgres (backend) dzięki RxDB i Hasura
    • Konfigurowanie Hasury
    • Konfiguracja po stronie klienta
    • Wdrażanie rozwiązywania konfliktów
    • Korzystanie z widoków
    • Korzystanie z wyzwalaczy postgres
  • Niestandardowe strategie rozwiązywania konfliktów z Hasura
    • Niestandardowe rozwiązywanie konfliktów na serwerze
    • Niestandardowe rozwiązywanie konfliktów na kliencie
  • Wniosek

Co to jest rozwiązywanie konfliktów?

Weźmy jako przykład tablicę Trello. Załóżmy, że zmieniłeś osobę przypisaną na karcie Trello w trybie offline. W międzyczasie kolega edytuje opis tej samej karty. Kiedy wrócisz online, chciałbyś zobaczyć obie zmiany. Załóżmy teraz, że oboje zmieniliście opis w tym samym czasie, co powinno się stać w tym przypadku? Jedną z opcji jest po prostu wzięcie ostatniego zapisu - czyli zastąpienie wcześniejszej zmiany nową. Innym jest powiadomienie użytkownika i pozwolenie mu na zaktualizowanie karty za pomocą scalonego pola (takiego jak git!).

Ten aspekt przyjmowania wielu jednoczesnych zmian (które mogą być sprzeczne) i łączenia ich w jedną zmianę nazywa się rozwiązywaniem konfliktów.

Jakiego rodzaju aplikacje możesz tworzyć, gdy masz dobre możliwości replikacji i rozwiązywania konfliktów?

Infrastruktura replikacji i rozwiązywania konfliktów jest trudna do wbudowania we frontend i backend aplikacji. Ale po skonfigurowaniu niektóre ważne przypadki użycia stają się wykonalne! W rzeczywistości, w przypadku niektórych rodzajów aplikacji replikacja (a tym samym rozwiązywanie konfliktów) ma kluczowe znaczenie dla funkcjonalności aplikacji!

  1. W czasie rzeczywistym:zmiany wprowadzane przez użytkowników na różnych urządzeniach są ze sobą synchronizowane
  2. Współpraca:różni użytkownicy pracują jednocześnie na tych samych danych
  3. Najpierw offline:ten sam użytkownik może pracować ze swoimi danymi, nawet gdy aplikacja nie jest połączona z centralną bazą danych

Przykłady:Trello, klienty poczty e-mail, takie jak Gmail, Superhuman, dokumenty Google, Facebook, Twitter itp.

Hasura bardzo ułatwia dodawanie wydajnych, bezpiecznych funkcji czasu rzeczywistego do istniejącej aplikacji opartej na Postgres. Nie ma potrzeby wdrażania dodatkowej infrastruktury zaplecza do obsługi tych przypadków użycia! W kilku następnych sekcjach dowiemy się, jak używać PouchDB/RxDB na interfejsie użytkownika i sparować go z Hasura, aby tworzyć potężne aplikacje z doskonałym doświadczeniem użytkownika.

Wyjaśnienie rozwiązywania konfliktów z PouchDB

Zarządzanie wersjami za pomocą PouchDB

PouchDB - którego RxDB używa pod spodem - jest wyposażony w potężny mechanizm wersjonowania i zarządzania konfliktami. Każdy dokument w PouchDB ma skojarzone z nim pole wersji. Pola wersji mają postać <depth>-<object-hash> na przykład 2-c1592ce7b31cc26e91d2f2029c57e621 . Tutaj głębokość wskazuje głębokość w drzewie wersji. Hash obiektu to losowo generowany ciąg.

Zajrzyj do wersji PouchDB

PouchDB udostępnia interfejsy API do pobierania historii zmian dokumentu. Możemy w ten sposób zapytać o historię zmian:

todos.pouch.get(todo.id, {
    revs: true
})

To zwróci dokument zawierający _revisions pole:

{
  "id": "559da26d-ad0f-42bc-a172-1821641bf2bb",
  "_rev": "4-95162faab173d1e748952179e0db1a53",
  "_revisions": {
    "ids": [
      "95162faab173d1e748952179e0db1a53",
      "94162faab173d1e748952179e0db1a53",
      "9055e63d99db056a95b61936f0185c8c",
      "de71900ec14567088bed5914b2439896"
    ],
    "start": 4
  }
}

Tutaj ids zawiera hierarchię wersji wersji (w tym bieżącą) i start zawiera „numer przedrostka” dla bieżącej wersji. Za każdym razem, gdy dodawana jest nowa wersja start jest zwiększany i dodawany jest nowy skrót na początku ids tablica.

Gdy dokument jest synchronizowany ze zdalnym serwerem, _revisions i _rev pola muszą być uwzględnione. W ten sposób wszyscy klienci mają ostatecznie pełną historię wersji. Dzieje się to automatycznie, gdy PouchDB jest skonfigurowany do synchronizacji z CouchDB. Powyższe żądanie ściągnięcia umożliwia to również podczas synchronizacji przez GraphQL.

Pamiętaj, że wszyscy klienci niekoniecznie mają wszystkie wersje, ale ostatecznie wszyscy będą mieli najnowsze wersje i historię identyfikatorów wersji dla tych wersji.

Rozwiązywanie konfliktów

Konflikt zostanie wykryty, jeśli dwie wersje mają tego samego rodzica lub prościej, jeśli dwie wersje mają tę samą głębokość. Po wykryciu konfliktu CouchDB i PouchDB użyją tego samego algorytmu, aby automatycznie wybrać zwycięzcę:

  1. Wybierz wersje z polem o największej głębokości, które nie są oznaczone jako usunięte
  2. Jeśli jest tylko 1 takie pole, traktuj je jako zwycięskie
  3. Jeśli jest więcej niż 1, posortuj pola wersji w kolejności malejącej i wybierz pierwsze.

Uwaga na temat usunięcia: PouchDB i CouchDB nigdy nie usuwają wersji ani dokumentów, zamiast tego tworzona jest nowa wersja z flagą _deleted ustawioną na true. Tak więc w kroku 1 powyższego algorytmu wszystkie łańcuchy, które kończą się wersją oznaczoną jako usunięta, są ignorowane.

Jedną z fajnych cech tego algorytmu jest to, że nie jest wymagana koordynacja między klientami lub klientem a serwerem w celu rozwiązania konfliktu. Nie ma też dodatkowego znacznika wymaganego do oznaczenia wersji jako wygrywającej. Każdy klient i serwer niezależnie wybierają zwycięzcę. Ale zwycięzcą będzie ta sama wersja, ponieważ używają tego samego algorytmu deterministycznego. Nawet jeśli jednemu z klientów brakuje niektórych wersji, ostatecznie, gdy te wersje zostaną zsynchronizowane, ta sama wersja zostanie wybrana jako zwycięska.

Wdrażanie niestandardowych strategii rozwiązywania konfliktów

Ale co, jeśli chcemy alternatywnej strategii rozwiązywania konfliktów? Na przykład „scal według pól” — jeśli dwie sprzeczne wersje zmodyfikowały różne klucze obiektu, chcemy automatycznie scalić, tworząc wersję z obydwoma kluczami. Zalecanym sposobem wykonania tego w PouchDB jest:

  1. Utwórz tę nową wersję w dowolnym łańcuchu
  2. Dodaj wersję z _deleted ustawionym na true do każdego z pozostałych łańcuchów

Scalona wersja będzie teraz automatycznie zwycięską wersją zgodnie z powyższym algorytmem. Możemy wykonać niestandardową rozdzielczość na serwerze lub kliencie. Gdy wersje zostaną zsynchronizowane, wszyscy klienci i serwer zobaczą scaloną wersję jako wersję zwycięską.

Rozwiązywanie konfliktów z Hasura i RxDB

Aby wdrożyć powyższą strategię rozwiązywania konfliktów, będziemy potrzebować Hasury do przechowywania historii wersji, a RxDB do synchronizacji wersji podczas replikacji za pomocą GraphQL.

Konfigurowanie Hasury

Kontynuując przykład aplikacji Todo z poprzedniego postu. Będziemy musieli zaktualizować schemat tabeli Todos w następujący sposób:

todo (
  id: text primary key,
  userId: text,
  text: text, <br/>
  createdAt: timestamp,
  isCompleted: boolean,
  deleted: boolean,
  updatedAt: boolean,
  _revisions: jsonb,
  _rev: text primary key,
  _parent_rev: text,
  _depth: integer,
)

Zwróć uwagę na dodatkowe pola:

  • _rev reprezentuje rewizję rekordu.
  • _parent_rev reprezentuje nadrzędną wersję rekordu
  • _depth to głębokość rekordu w drzewie wersji
  • _revisions zawiera pełną historię zmian rekordu.

Kluczem podstawowym tabeli jest (id , _rev ).

Ściśle mówiąc, potrzebujemy tylko _revisions pola, ponieważ można z niego uzyskać inne informacje. Ale łatwość dostępu do innych pól ułatwia wykrywanie i rozwiązywanie konfliktów.

Konfiguracja po stronie klienta

Musimy ustawić syncRevisions do true podczas konfigurowania replikacji


    async setupGraphQLReplication(auth) {
        const replicationState = this.db.todos.syncGraphQL({
            url: syncURL,
            headers: {
                'Authorization': `Bearer ${auth.idToken}`
            },
            push: {
                batchSize,
                queryBuilder: pushQueryBuilder
            },
            pull: {
                queryBuilder: pullQueryBuilder(auth.userId)
            },

            live: true,

            liveInterval: 1000 * 60 * 10,
            deletedFlag: 'deleted',
            syncRevisions: true,
        });

       ...
    }

Musimy również dodać pole tekstowe last_pulled_rev do schematu RxDB. To pole jest używane wewnętrznie przez wtyczkę, aby uniknąć wypychania wersji pobranych z serwera z powrotem na serwer.

const todoSchema = {
    ...
    'properties': {
        ...
        'last_pulled_rev': {
            'type': 'string'
        }
    },
    ...
};

Na koniec musimy zmienić kreatory zapytań typu „pull i push”, aby zsynchronizować informacje związane z wersją

Wyciągnij Konstruktor zapytań

const pullQueryBuilder = (userId) => {
    return (doc) => {
        if (!doc) {
            doc = {
                id: '',
                updatedAt: new Date(0).toUTCString()
            };
        }

        const query = `{
            todos(
                where: {
                    _or: [
                        {updatedAt: {_gt: "${doc.updatedAt}"}},
                        {
                            updatedAt: {_eq: "${doc.updatedAt}"},
                            id: {_gt: "${doc.id}"}
                        }
                    ],
                    userId: {_eq: "${userId}"} 
                },
                limit: ${batchSize},
                order_by: [{updatedAt: asc}, {id: asc}]
            ) {
                id
                text
                isCompleted
                deleted
                createdAt
                updatedAt
                userId
                _rev
                _revisions
            }
        }`;
        return {
            query,
            variables: {}
        };
    };
};

Teraz pobieramy pola _rev i _revisions. Zaktualizowana wtyczka użyje tych pól do tworzenia lokalnych wersji PouchDB.

Narzędzie do tworzenia zapytań wypychanych


const pushQueryBuilder = doc => {
    const query = `
        mutation InsertTodo($todo: [todos_insert_input!]!) {
            insert_todos(objects: $todo){
                returning {
                  id
                }
            }
       }
    `;

    const depth = doc._revisions.start;
    const parent_rev = depth == 1 ? null : `${depth - 1}-${doc._revisions.ids[1]}`

    const todo = Object.assign({}, doc, {
        _depth: depth,
        _parent_rev: parent_rev
    })

    delete todo['updatedAt']

    const variables = {
        todo: todo
    };

    return {
        query,
        variables
    };
};

W zaktualizowanej wtyczce parametr wejściowy doc teraz zawiera _rev i _revisions pola. Przechodzimy do Hasury w zapytaniu GraphQL. Dodajemy pola _depth , _parent_rev do doc zanim to zrobisz.

Wcześniej używaliśmy upsert do wstawiania lub aktualizowania todo rekord na Hasurze. Teraz, ponieważ każda wersja staje się nowym rekordem, zamiast tego używamy zwykłej starej mutacji insertu.

Wdrażanie rozwiązywania konfliktów

Jeśli dwóch różnych klientów wprowadzi teraz sprzeczne zmiany, obie wersje zostaną zsynchronizowane i będą obecne w Hasura. Obaj klienci ostatecznie otrzymają również drugą wersję. Ponieważ strategia rozwiązywania konfliktów PouchDB jest deterministyczna, obaj klienci wybiorą tę samą wersję jako „zwycięską wersję”.

Jak możemy znaleźć tę zwycięską wersję na serwerze? Będziemy musieli zaimplementować ten sam algorytm w SQL.

Implementacja algorytmu rozwiązywania konfliktów CouchDB w Postgresie

Krok 1:znajdowanie węzłów liści nieoznaczonych jako usunięte

Aby to zrobić, musimy zignorować wszystkie wersje, które mają wersję podrzędną i wszystkie wersje, które są oznaczone jako usunięte:

    SELECT
        id,
        _rev,
        _depth
    FROM
        todos
    WHERE
        NOT EXISTS (
            SELECT
                id
            FROM
                todos AS t
            WHERE
                todos.id = t.id
                AND t._parent_rev = todos._rev)
            AND deleted = FALSE

Krok 2:Znajdź łańcuch o maksymalnej głębokości

Zakładając, że mamy wyniki z powyższego zapytania w tabeli (lub widoku lub klauzuli with) zwanej liśćmi, możemy znaleźć łańcuch o maksymalnej głębokości jest prosty:

    SELECT
        id,
        MAX(_depth) AS max_depth
    FROM
        leaves
    GROUP BY
        id

Krok 3:znajdowanie zwycięskich wersji wśród wersji o równej maksymalnej głębokości

Ponownie zakładając, że wyniki z powyższego zapytania znajdują się w tabeli (lub widoku lub klauzuli with) o nazwie max_depth, możemy znaleźć zwycięską wersję w następujący sposób:

    SELECT
        leaves.id,
        MAX(leaves._rev) AS _rev
    FROM
        leaves
        JOIN max_depths ON leaves.id = max_depths.id
            AND leaves._depth = max_depths.max_depth
    GROUP BY
        leaves.id

Tworzenie widoku ze zwycięskimi wersjami

Łącząc powyższe trzy zapytania, możemy stworzyć widok, który pokazuje nam zwycięskie wersje w następujący sposób:

CREATE OR REPLACE VIEW todos_current_revisions AS
WITH leaves AS (
    SELECT
        id,
        _rev,
        _depth
    FROM
        todos
    WHERE
        NOT EXISTS (
            SELECT
                id
            FROM
                todos AS t
            WHERE
                todos.id = t.id
                AND t._parent_rev = todos._rev)
            AND deleted = FALSE
),
max_depths AS (
    SELECT
        id,
        MAX(_depth) AS max_depth
    FROM
        leaves
    GROUP BY
        id
),
winning_revisions AS (
    SELECT
        leaves.id,
        MAX(leaves._rev) AS _rev
    FROM
        leaves
        JOIN max_depths ON leaves.id = max_depths.id
            AND leaves._depth = max_depths.max_depth
    GROUP BY
        (leaves.id))
SELECT
    todos.*
FROM
    todos
    JOIN winning_revisions ON todos._rev = winning_revisions._rev;

Ponieważ Hasura może śledzić widoki i umożliwia wysyłanie zapytań za pośrednictwem GraphQL, zwycięskie wersje mogą być teraz udostępniane innym klientom i usługom.

Za każdym razem, gdy wysyłasz zapytanie do widoku, Postgres po prostu zastępuje widok zapytaniem w definicji widoku i uruchamia wynikowe zapytanie. Jeśli często wysyłasz zapytania do widoku, może to prowadzić do wielu zmarnowanych cykli procesora. Możemy to zoptymalizować, używając wyzwalaczy Postgres i przechowując zwycięskie wersje w innej tabeli.

Używanie wyzwalaczy Postgres do obliczania zwycięskich wersji

Krok 1:utwórz nową tabelę todos_current_revisions

Schemat będzie taki sam jak w todos stół. Kluczem podstawowym będzie jednak id kolumna zamiast (id, _rev)

Krok 2:Utwórz wyzwalacz Postgres

Możemy napisać zapytanie dla wyzwalacza, zaczynając od zapytania widoku. Ponieważ funkcja wyzwalacza będzie działać dla jednego wiersza na raz, możemy uprościć zapytanie:

CREATE OR REPLACE FUNCTION calculate_winning_revision ()
    RETURNS TRIGGER
    AS $BODY$
BEGIN
    INSERT INTO todos_current_revisions WITH leaves AS (
        SELECT
            id,
            _rev,
            _depth
        FROM
            todos
        WHERE
            NOT EXISTS (
                SELECT
                    id
                FROM
                    todos AS t
                WHERE
                    t.id = NEW.id
                    AND t._parent_rev = todos._rev)
                AND deleted = FALSE
                AND id = NEW.id
        ),
        max_depths AS (
            SELECT
                MAX(_depth) AS max_depth
            FROM
                leaves
        ),
        winning_revisions AS (
            SELECT
                MAX(leaves._rev) AS _rev
            FROM
                leaves
                JOIN max_depths ON leaves._depth = max_depths.max_depth
        )
        SELECT
            todos.*
        FROM
            todos
            JOIN winning_revisions ON todos._rev = winning_revisions._rev
    ON CONFLICT ON CONSTRAINT todos_winning_revisions_pkey
        DO UPDATE SET
            _rev = EXCLUDED._rev,
            _revisions = EXCLUDED._revisions,
            _parent_rev = EXCLUDED._parent_rev,
            _depth = EXCLUDED._depth,
            text = EXCLUDED.text,
            "updatedAt" = EXCLUDED."updatedAt",
            deleted = EXCLUDED.deleted,
            "userId" = EXCLUDED."userId",
            "createdAt" = EXCLUDED."createdAt",
            "isCompleted" = EXCLUDED."isCompleted";
    RETURN NEW;
END;
$BODY$
LANGUAGE plpgsql;

CREATE TRIGGER trigger_insert_todos
    AFTER INSERT ON todos
    FOR EACH ROW
    EXECUTE PROCEDURE calculate_winning_revision ()

Otóż ​​to! Możemy teraz wysyłać zapytania do zwycięskich wersji zarówno na serwerze, jak i na kliencie.

Niestandardowe rozwiązywanie konfliktów

Przyjrzyjmy się teraz implementacji niestandardowego rozwiązywania konfliktów za pomocą Hasura i RxDB.

Niestandardowe rozwiązywanie konfliktów po stronie serwera

Powiedzmy, że chcemy scalić listę rzeczy do zrobienia według pól. Jak mamy to zrobić? Poniższy opis pokazuje nam to:

Ten SQL wygląda na dużo, ale jedyną częścią, która zajmuje się rzeczywistą strategią łączenia, jest to:

CREATE OR REPLACE FUNCTION merge_revisions (item1 jsonb, item2 jsonb)
    RETURNS jsonb
    AS $$
BEGIN
    IF NOT item1 ? 'id' THEN
        RETURN item2;
    ELSE
        RETURN item1 || (item2 -> 'diff');
    END IF;
END;
$$
LANGUAGE plpgsql;

CREATE OR REPLACE AGGREGATE agg_merge_revisions (jsonb) (
    INITCOND = '{}',
    STYPE = jsonb,
    SFUNC = merge_revisions
);

Tutaj deklarujemy niestandardową funkcję agregującą Postgres agg_merge_revisions do łączenia elementów. Sposób, w jaki to działa, jest podobny do funkcji „reduce”:Postgres zainicjuje zagregowaną wartość do '{}' , a następnie uruchom merge_revisions funkcji z bieżącym agregatem i kolejnym elementem do scalenia. Więc gdybyśmy mieli 3 sprzeczne wersje do połączenia, wynik byłby następujący:

merge_revisions(merge_revisions(merge_revisions('{}', v1), v2), v3)

Jeśli chcemy wdrożyć inną strategię, będziemy musieli zmienić merge_revisions funkcjonować. Na przykład, jeśli chcemy wdrożyć strategię „ostatni zapis wygrywa”:

CREATE OR REPLACE FUNCTION merge_revisions (item1 jsonb, item2 jsonb)
    RETURNS jsonb
    AS $$
BEGIN
    IF NOT (item1 ? 'id') THEN
        RETURN item2;
    ELSE
        IF (item2 -> 'updatedAt') > (item1 -> 'updatedAt') THEN
            RETURN item2
        ELSE
            RETURN item1
        END IF;
    END IF;
END;
$$
LANGUAGE plpgsql;

Zapytanie wstawiania w powyższym opisie może zostać uruchomione w wyzwalaczu wstawiania po wstawieniu, aby automatycznie scalać konflikty, gdy tylko wystąpią.

Uwaga: Powyżej użyliśmy SQL do implementacji niestandardowego rozwiązywania konfliktów. Alternatywnym podejściem jest użycie zapisu akcji:

  1. Utwórz niestandardową mutację do obsługi insertu zamiast domyślnej automatycznie generowanej mutacji insertu.
  2. W module obsługi akcji utwórz nową wersję rekordu. Możemy do tego użyć mutacji insertowej Hasura.
  3. Pobierz wszystkie wersje obiektu za pomocą zapytania listy
  4. Wykryj wszelkie konflikty, przeglądając drzewo wersji.
  5. Zapisz scaloną wersję.

To podejście przypadnie Ci do gustu, jeśli wolisz napisać tę logikę w języku innym niż SQL. Innym podejściem jest utworzenie widoku SQL, aby pokazać sprzeczne wersje i zaimplementować pozostałą logikę w procedurze obsługi akcji. Uprości to krok 4. powyżej, ponieważ teraz możemy po prostu zapytać o widok w celu wykrycia konfliktów.

Niestandardowe rozwiązywanie konfliktów po stronie klienta

Istnieją scenariusze, w których potrzebna jest interwencja użytkownika, aby móc rozwiązać konflikt. Na przykład, gdybyśmy tworzyli coś takiego jak aplikacja Trello, a dwóch użytkowników zmodyfikowało opis tego samego zadania, możesz chcieć pokazać użytkownikowi obie wersje i pozwolić mu utworzyć wersję scaloną. W tych scenariuszach będziemy musieli rozwiązać konflikt po stronie klienta.

Rozwiązywanie konfliktów po stronie klienta jest prostsze do wdrożenia, ponieważ PouchDB już udostępnia API do zapytań o sprzeczne wersje. Jeśli spojrzymy na todos Kolekcja RxDB z poprzedniego postu, oto jak możemy pobrać sprzeczne wersje:

todos.pouch.get(todo.id, {
    conflicts: true
})

Powyższe zapytanie wypełniłoby sprzeczne wersje w _conflicts pole w wyniku. Możemy następnie przedstawić je użytkownikowi do rozwiązania.

Wniosek

PouchDB jest dostarczany z elastyczną i wydajną konstrukcją do wersjonowania i rozwiązania do zarządzania konfliktami. Ten post pokazał nam, jak używać tych konstrukcji z Hasura/Postgres. W tym poście skupiliśmy się na robieniu tego za pomocą plpgsql. Zrobimy kolejny post pokazujący, jak to zrobić za pomocą Actions, abyś mógł używać wybranego języka na zapleczu!

Podobał Ci się ten artykuł? Dołącz do nas na Discordzie, aby uzyskać więcej dyskusji na temat Hasura i GraphQL!

Zapisz się do naszego newslettera, aby wiedzieć, kiedy publikujemy nowe artykuły.


  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. Instalowanie Apache CouchDB na CentOS 7

  2. Jak zainstalować Apache CouchDB 2.3.0 w systemie Linux?

  3. Jak zainstalować CouchDB na Debianie 10?

  4. Instalowanie Apache CouchDB na Debianie 9

  5. Zainstaluj CouchDB na Debianie 9