Database
 sql >> Baza danych >  >> RDS >> Database

Konfigurowanie Service Broker do przetwarzania asynchronicznego

W moim ostatnim artykule mówiłem o korzyściach płynących z implementacji przetwarzania asynchronicznego za pomocą Service Broker w SQL Server w porównaniu z innymi metodami, które istnieją do oddzielonego przetwarzania długich zadań. W tym artykule omówimy wszystkie składniki, które należy skonfigurować dla podstawowej konfiguracji Service Broker w jednej bazie danych, oraz ważne kwestie dotyczące zarządzania konwersacją między usługami brokera. Aby rozpocząć, musimy utworzyć bazę danych i włączyć bazę danych do użytku Service Broker:

CREATE DATABASE AsyncProcessingDemo;
GO
 
IF (SELECT is_broker_enabled FROM sys.databases WHERE name = N'AsyncProcessingDemo') = 0
BEGIN
  ALTER DATABASE AsyncProcessingDemo SET ENABLE_BROKER;
END
GO
 
USE AsyncProcessingDemo;
GO

Konfigurowanie komponentów brokera

Podstawowymi obiektami, które należy utworzyć w bazie danych, są typy komunikatów dla komunikatów, kontrakt definiujący sposób przesyłania komunikatów między usługami, kolejka i usługa inicjatora oraz kolejka i usługa docelowa. Wiele przykładów online dla brokera usług pokazuje złożone nazewnictwo obiektów dla typów komunikatów, kontraktów i usług dla brokera usług. Jednak nie ma wymogu, aby nazwy były złożone, a proste nazwy obiektów mogą być używane dla dowolnego obiektu.

W przypadku wiadomości będziemy musieli utworzyć typ wiadomości dla żądania, który będzie się nazywał AsyncRequest i typ wiadomości dla wyniku, który będzie nazywał się AsyncResult . Oba będą używać kodu XML, który zostanie zweryfikowany jako poprawnie utworzony przez usługi brokera do wysyłania i odbierania danych wymaganych przez usługi.

-- Create the message types
CREATE MESSAGE TYPE [AsyncRequest] VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE [AsyncResult]  VALIDATION = WELL_FORMED_XML;

Umowa określa, że ​​AsyncRequest zostanie wysłany przez usługę inicjującą do usługi docelowej i że usługa docelowa zwróci AsyncResult wiadomość z powrotem do usługi inicjującej. Umowa może również określać wiele typów wiadomości dla inicjatora i celu lub że określony typ wiadomości może być wysłany przez dowolną usługę, jeśli wymaga tego określone przetwarzanie.

-- Create the contract
CREATE CONTRACT [AsyncContract] 
(
  [AsyncRequest] SENT BY INITIATOR, 
  [AsyncResult]  SENT BY TARGET
);

Dla każdej usługi należy utworzyć kolejkę, aby zapewnić przechowywanie wiadomości odebranych przez usługę. Usługa docelowa, do której zostanie wysłane żądanie, musi zostać utworzona, określając AsyncContract aby umożliwić wysyłanie wiadomości do usługi. W tym przypadku usługa nazywa się ProcessingService i zostanie utworzony w ProcessingQueue w bazie danych. Usługa inicjowania nie wymaga określenia umowy, co umożliwia otrzymywanie wiadomości tylko w odpowiedzi na rozmowę, która została z niej zainicjowana.

-- Create the processing queue and service - specify the contract to allow sending to the service
CREATE QUEUE ProcessingQueue;
CREATE SERVICE [ProcessingService] ON QUEUE ProcessingQueue ([AsyncContract]);
 
-- Create the request queue and service 
CREATE QUEUE RequestQueue;
CREATE SERVICE [RequestService] ON QUEUE RequestQueue;

Wysyłanie wiadomości do przetworzenia

Jak wyjaśniłem w poprzednim artykule, wolę zaimplementować opakowującą procedurę składowaną do wysyłania nowej wiadomości do usługi brokera, aby w razie potrzeby mogła zostać zmodyfikowana w celu skalowania wydajności. Ta procedura jest prostym opakowaniem wokół tworzenia nowej konwersacji i wysyłania wiadomości do ProcessingService .

-- Create the wrapper procedure for sending messages
CREATE PROCEDURE dbo.SendBrokerMessage 
	@FromService SYSNAME,
	@ToService   SYSNAME,
	@Contract    SYSNAME,
	@MessageType SYSNAME,
	@MessageBody XML
AS
BEGIN
  SET NOCOUNT ON;
 
  DECLARE @conversation_handle UNIQUEIDENTIFIER;
 
  BEGIN TRANSACTION;
 
  BEGIN DIALOG CONVERSATION @conversation_handle
    FROM SERVICE @FromService
    TO SERVICE @ToService
    ON CONTRACT @Contract
    WITH ENCRYPTION = OFF;
 
  SEND ON CONVERSATION @conversation_handle
    MESSAGE TYPE @MessageType(@MessageBody);
 
  COMMIT TRANSACTION;
END
GO

Używając opakowującej procedury składowanej, możemy teraz wysłać wiadomość testową do ProcessingService aby sprawdzić, czy poprawnie skonfigurowaliśmy usługi brokera.

-- Send a request
EXECUTE dbo.SendBrokerMessage
  @FromService = N'RequestService',
  @ToService   = N'ProcessingService',
  @Contract    = N'AsyncContract',
  @MessageType = N'AsyncRequest',
  @MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>';
 
-- Check for message on processing queue
SELECT CAST(message_body AS XML) FROM ProcessingQueue;
GO

Przetwarzanie wiadomości

Chociaż moglibyśmy ręcznie przetwarzać wiadomości z ProcessingQueue , prawdopodobnie będziemy chcieli, aby wiadomości były przetwarzane automatycznie, gdy są wysyłane do ProcessingService . Aby to zrobić, należy utworzyć procedurę składowaną aktywacji, którą przetestujemy, a następnie powiążemy z kolejką, aby zautomatyzować przetwarzanie po aktywacji kolejki. Aby przetworzyć wiadomość, musimy RECEIVE wiadomość z kolejki w ramach transakcji, wraz z typem wiadomości i uchwytem konwersacji dla wiadomości. Typ wiadomości zapewnia zastosowanie odpowiedniej logiki do przetwarzanej wiadomości, a uchwyt konwersacji umożliwia odesłanie odpowiedzi do usługi inicjującej po przetworzeniu wiadomości.

RECEIVE polecenie umożliwia przetwarzanie pojedynczej wiadomości lub wielu wiadomości w ramach tego samego uchwytu konwersacji lub grupy w ramach jednej transakcji. Aby przetworzyć wiele komunikatów, należy użyć zmiennej tabeli, a do przetwarzania pojedynczego komunikatu można użyć zmiennej lokalnej. Poniższa procedura aktywacji pobiera pojedynczą wiadomość z kolejki, sprawdza typ wiadomości, aby określić, czy jest to AsyncRequest komunikat, a następnie wykonuje długotrwały proces na podstawie otrzymanych informacji o komunikacie. Jeśli nie otrzyma komunikatu w pętli, poczeka do 5000 ms lub 5 sekund, aż kolejna wiadomość wejdzie do kolejki przed wyjściem z pętli i zakończeniem jej wykonywania. Po przetworzeniu wiadomości buduje AsyncResult wiadomość i wysyła ją z powrotem do inicjatora w tym samym uchwycie konwersacji, z którego odebrano wiadomość. Procedura sprawdza również typ wiadomości, aby określić, czy EndDialog lub Error odebrano wiadomość, aby posprzątać rozmowę przez jej zakończenie.

-- Create processing procedure for processing queue
CREATE PROCEDURE dbo.ProcessingQueueActivation
AS
BEGIN
  SET NOCOUNT ON;
 
  DECLARE @conversation_handle UNIQUEIDENTIFIER;
  DECLARE @message_body XML;
  DECLARE @message_type_name sysname;
 
  WHILE (1=1)
  BEGIN
    BEGIN TRANSACTION;
 
    WAITFOR
    (
      RECEIVE TOP (1)
        @conversation_handle = conversation_handle,
        @message_body = CAST(message_body AS XML),
        @message_type_name = message_type_name
      FROM ProcessingQueue
    ), TIMEOUT 5000;
 
    IF (@@ROWCOUNT = 0)
    BEGIN
      ROLLBACK TRANSACTION;
      BREAK;
    END
 
    IF @message_type_name = N'AsyncRequest'
    BEGIN
      -- Handle complex long processing here
      -- For demonstration we'll pull the account number and send a reply back only
 
      DECLARE @AccountNumber INT = @message_body.value('(AsyncRequest/AccountNumber)[1]', 'INT');
 
      -- Build reply message and send back
      DECLARE @reply_message_body XML = N'
        ' + CAST(@AccountNumber AS NVARCHAR(11)) + '
      ';
 
      SEND ON CONVERSATION @conversation_handle
        MESSAGE TYPE [AsyncResult] (@reply_message_body);
    END
 
    -- If end dialog message, end the dialog
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    BEGIN
      END CONVERSATION @conversation_handle;
    END
 
    -- If error message, log and end conversation
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
    BEGIN
      -- Log the error code and perform any required handling here
      -- End the conversation for the error
      END CONVERSATION @conversation_handle;
    END
 
    COMMIT TRANSACTION;
  END
END
GO

RequestQueue będzie również musiał przetwarzać wiadomości, które są do niego wysyłane, więc dodatkowa procedura przetwarzania AsyncResult komunikaty zwracane przez procedurę ProcessingQueueActivation muszą zostać utworzone. Ponieważ wiemy, że wiadomość AsnycResult oznacza, że ​​wszystkie prace związane z przetwarzaniem zostały zakończone, konwersację można zakończyć po przetworzeniu tej wiadomości, która wyśle ​​wiadomość EndDialog do ProcessingService, która zostanie następnie przetworzona przez procedurę aktywacji w celu zakończenia rozmowa, sprzątanie wszystkiego i unikanie pożaru oraz zapominanie o problemach, które pojawiają się, gdy rozmowa kończy się prawidłowo.

-- Create procedure for processing replies to the request queue
CREATE PROCEDURE dbo.RequestQueueActivation
AS
BEGIN
  SET NOCOUNT ON;
 
  DECLARE @conversation_handle UNIQUEIDENTIFIER;
  DECLARE @message_body XML;
  DECLARE @message_type_name sysname;
 
  WHILE (1=1)
  BEGIN
    BEGIN TRANSACTION;
 
    WAITFOR
    (
      RECEIVE TOP (1)
        @conversation_handle = conversation_handle,
        @message_body = CAST(message_body AS XML),
        @message_type_name = message_type_name
      FROM RequestQueue
    ), TIMEOUT 5000;
 
    IF (@@ROWCOUNT = 0)
    BEGIN
      ROLLBACK TRANSACTION;
      BREAK;
    END
 
    IF @message_type_name = N'AsyncResult'
    BEGIN
      -- If necessary handle the reply message here
      DECLARE @AccountNumber INT = @message_body.value('(AsyncResult/AccountNumber)[1]', 'INT');
 
      -- Since this is all the work being done, end the conversation to send the EndDialog message
      END CONVERSATION @conversation_handle;
    END
 
    -- If end dialog message, end the dialog
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    BEGIN
       END CONVERSATION @conversation_handle;
    END
 
    -- If error message, log and end conversation
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
    BEGIN
       END CONVERSATION @conversation_handle;
    END
 
    COMMIT TRANSACTION;
  END
END
GO

Testowanie procedur

Przed zautomatyzowaniem przetwarzania kolejek dla naszych usług ważne jest, aby przetestować procedury aktywacji, aby upewnić się, że przetwarzają one komunikaty w odpowiedni sposób, oraz aby zapobiec wyłączeniu kolejki w przypadku wystąpienia błędu, który nie jest prawidłowo obsługiwany. Ponieważ istnieje już wiadomość w ProcessingQueue ProcessingQueueActivation Procedura może zostać wykonana w celu przetworzenia tej wiadomości. Pamiętaj, że WAITFOR spowoduje, że procedura zakończy się po 5 sekundach, nawet jeśli wiadomość jest przetwarzana bezpośrednio z kolejki. Po przetworzeniu wiadomości możemy sprawdzić, czy procedura zadziałała poprawnie, odpytując RequestQueue aby sprawdzić, czy AsyncResult wiadomość istnieje, a następnie możemy sprawdzić, czy RequestQueueActivation procedura działa poprawnie po jej wykonaniu.

-- Process the message from the processing queue
EXECUTE dbo.ProcessingQueueActivation;
GO
 
-- Check for reply message on request queue
SELECT CAST(message_body AS XML) FROM RequestQueue;
GO
 
-- Process the message from the request queue
EXECUTE dbo.RequestQueueActivation;
GO

Automatyzacja przetwarzania

W tym momencie wszystkie komponenty są gotowe, aby w pełni zautomatyzować nasze przetwarzanie. Pozostaje tylko powiązać procedury aktywacji z ich odpowiednimi kolejkami, a następnie wysłać kolejną wiadomość testową, aby sprawdzić, czy została przetworzona i nic nie pozostanie w kolejkach.

-- Alter the processing queue to specify internal activation
ALTER QUEUE ProcessingQueue
    WITH ACTIVATION
    ( 
      STATUS = ON,
      PROCEDURE_NAME = dbo.ProcessingQueueActivation,
      MAX_QUEUE_READERS = 10,
      EXECUTE AS SELF
    );
GO
 
-- Alter the request queue to specify internal activation
ALTER QUEUE RequestQueue
    WITH ACTIVATION
    ( 
      STATUS = ON,
      PROCEDURE_NAME = dbo.RequestQueueActivation,
      MAX_QUEUE_READERS = 10,
      EXECUTE AS SELF
    );
GO
 
-- Test automated activation
-- Send a request
 
EXECUTE dbo.SendBrokerMessage
	@FromService = N'RequestService',
	@ToService   = N'ProcessingService',
	@Contract    = N'AsyncContract',
	@MessageType = N'AsyncRequest',
	@MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>';
 
-- Check for message on processing queue 
-- nothing is there because it was automatically processed
SELECT CAST(message_body AS XML) FROM ProcessingQueue;
GO
 
-- Check for reply message on request queue 
-- nothing is there because it was automatically processed
SELECT CAST(message_body AS XML) FROM RequestQueue;
GO

Podsumowanie

Podstawowe składniki do automatycznego przetwarzania asynchronicznego w programie SQL Server Service Broker można skonfigurować w konfiguracji pojedynczej bazy danych, aby umożliwić oddzielne przetwarzanie długotrwałych zadań. Może to być potężne narzędzie do poprawy wydajności aplikacji, na podstawie doświadczenia użytkownika końcowego, poprzez oddzielenie przetwarzania od interakcji użytkownika końcowego z aplikacją.


  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Jak uzyskać wczorajszą datę w T-SQL

  2. Dowiedz się więcej o konkatenacji w SQL z przykładami

  3. Co może powiedzieć plan zapytań?

  4. Poziomy zgodności i podstawa szacowania kardynalności

  5. Praca z Salesforce.com w Alpha Anywhere