W moim ostatnim artykule mówiłem o korzyściach płynących z implementacji przetwarzania asynchronicznego za pomocą Service Broker w SQL Server w porównaniu z innymi metodami, które istnieją do oddzielonego przetwarzania długich zadań. W tym artykule omówimy wszystkie składniki, które należy skonfigurować dla podstawowej konfiguracji Service Broker w jednej bazie danych, oraz ważne kwestie dotyczące zarządzania konwersacją między usługami brokera. Aby rozpocząć, musimy utworzyć bazę danych i włączyć bazę danych do użytku Service Broker:
CREATE DATABASE AsyncProcessingDemo; GO IF (SELECT is_broker_enabled FROM sys.databases WHERE name = N'AsyncProcessingDemo') = 0 BEGIN ALTER DATABASE AsyncProcessingDemo SET ENABLE_BROKER; END GO USE AsyncProcessingDemo; GO
Konfigurowanie komponentów brokera
Podstawowymi obiektami, które należy utworzyć w bazie danych, są typy komunikatów dla komunikatów, kontrakt definiujący sposób przesyłania komunikatów między usługami, kolejka i usługa inicjatora oraz kolejka i usługa docelowa. Wiele przykładów online dla brokera usług pokazuje złożone nazewnictwo obiektów dla typów komunikatów, kontraktów i usług dla brokera usług. Jednak nie ma wymogu, aby nazwy były złożone, a proste nazwy obiektów mogą być używane dla dowolnego obiektu.
W przypadku wiadomości będziemy musieli utworzyć typ wiadomości dla żądania, który będzie się nazywał AsyncRequest
i typ wiadomości dla wyniku, który będzie nazywał się AsyncResult
. Oba będą używać kodu XML, który zostanie zweryfikowany jako poprawnie utworzony przez usługi brokera do wysyłania i odbierania danych wymaganych przez usługi.
-- Create the message types CREATE MESSAGE TYPE [AsyncRequest] VALIDATION = WELL_FORMED_XML; CREATE MESSAGE TYPE [AsyncResult] VALIDATION = WELL_FORMED_XML;
Umowa określa, że AsyncRequest
zostanie wysłany przez usługę inicjującą do usługi docelowej i że usługa docelowa zwróci AsyncResult
wiadomość z powrotem do usługi inicjującej. Umowa może również określać wiele typów wiadomości dla inicjatora i celu lub że określony typ wiadomości może być wysłany przez dowolną usługę, jeśli wymaga tego określone przetwarzanie.
-- Create the contract CREATE CONTRACT [AsyncContract] ( [AsyncRequest] SENT BY INITIATOR, [AsyncResult] SENT BY TARGET );
Dla każdej usługi należy utworzyć kolejkę, aby zapewnić przechowywanie wiadomości odebranych przez usługę. Usługa docelowa, do której zostanie wysłane żądanie, musi zostać utworzona, określając AsyncContract
aby umożliwić wysyłanie wiadomości do usługi. W tym przypadku usługa nazywa się ProcessingService
i zostanie utworzony w ProcessingQueue
w bazie danych. Usługa inicjowania nie wymaga określenia umowy, co umożliwia otrzymywanie wiadomości tylko w odpowiedzi na rozmowę, która została z niej zainicjowana.
-- Create the processing queue and service - specify the contract to allow sending to the service CREATE QUEUE ProcessingQueue; CREATE SERVICE [ProcessingService] ON QUEUE ProcessingQueue ([AsyncContract]); -- Create the request queue and service CREATE QUEUE RequestQueue; CREATE SERVICE [RequestService] ON QUEUE RequestQueue;
Wysyłanie wiadomości do przetworzenia
Jak wyjaśniłem w poprzednim artykule, wolę zaimplementować opakowującą procedurę składowaną do wysyłania nowej wiadomości do usługi brokera, aby w razie potrzeby mogła zostać zmodyfikowana w celu skalowania wydajności. Ta procedura jest prostym opakowaniem wokół tworzenia nowej konwersacji i wysyłania wiadomości do ProcessingService
.
-- Create the wrapper procedure for sending messages CREATE PROCEDURE dbo.SendBrokerMessage @FromService SYSNAME, @ToService SYSNAME, @Contract SYSNAME, @MessageType SYSNAME, @MessageBody XML AS BEGIN SET NOCOUNT ON; DECLARE @conversation_handle UNIQUEIDENTIFIER; BEGIN TRANSACTION; BEGIN DIALOG CONVERSATION @conversation_handle FROM SERVICE @FromService TO SERVICE @ToService ON CONTRACT @Contract WITH ENCRYPTION = OFF; SEND ON CONVERSATION @conversation_handle MESSAGE TYPE @MessageType(@MessageBody); COMMIT TRANSACTION; END GO
Używając opakowującej procedury składowanej, możemy teraz wysłać wiadomość testową do ProcessingService
aby sprawdzić, czy poprawnie skonfigurowaliśmy usługi brokera.
-- Send a request EXECUTE dbo.SendBrokerMessage @FromService = N'RequestService', @ToService = N'ProcessingService', @Contract = N'AsyncContract', @MessageType = N'AsyncRequest', @MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>'; -- Check for message on processing queue SELECT CAST(message_body AS XML) FROM ProcessingQueue; GO
Przetwarzanie wiadomości
Chociaż moglibyśmy ręcznie przetwarzać wiadomości z ProcessingQueue
, prawdopodobnie będziemy chcieli, aby wiadomości były przetwarzane automatycznie, gdy są wysyłane do ProcessingService
. Aby to zrobić, należy utworzyć procedurę składowaną aktywacji, którą przetestujemy, a następnie powiążemy z kolejką, aby zautomatyzować przetwarzanie po aktywacji kolejki. Aby przetworzyć wiadomość, musimy RECEIVE
wiadomość z kolejki w ramach transakcji, wraz z typem wiadomości i uchwytem konwersacji dla wiadomości. Typ wiadomości zapewnia zastosowanie odpowiedniej logiki do przetwarzanej wiadomości, a uchwyt konwersacji umożliwia odesłanie odpowiedzi do usługi inicjującej po przetworzeniu wiadomości.
RECEIVE
polecenie umożliwia przetwarzanie pojedynczej wiadomości lub wielu wiadomości w ramach tego samego uchwytu konwersacji lub grupy w ramach jednej transakcji. Aby przetworzyć wiele komunikatów, należy użyć zmiennej tabeli, a do przetwarzania pojedynczego komunikatu można użyć zmiennej lokalnej. Poniższa procedura aktywacji pobiera pojedynczą wiadomość z kolejki, sprawdza typ wiadomości, aby określić, czy jest to AsyncRequest
komunikat, a następnie wykonuje długotrwały proces na podstawie otrzymanych informacji o komunikacie. Jeśli nie otrzyma komunikatu w pętli, poczeka do 5000 ms lub 5 sekund, aż kolejna wiadomość wejdzie do kolejki przed wyjściem z pętli i zakończeniem jej wykonywania. Po przetworzeniu wiadomości buduje AsyncResult
wiadomość i wysyła ją z powrotem do inicjatora w tym samym uchwycie konwersacji, z którego odebrano wiadomość. Procedura sprawdza również typ wiadomości, aby określić, czy EndDialog
lub Error
odebrano wiadomość, aby posprzątać rozmowę przez jej zakończenie.
-- Create processing procedure for processing queue CREATE PROCEDURE dbo.ProcessingQueueActivation AS BEGIN SET NOCOUNT ON; DECLARE @conversation_handle UNIQUEIDENTIFIER; DECLARE @message_body XML; DECLARE @message_type_name sysname; WHILE (1=1) BEGIN BEGIN TRANSACTION; WAITFOR ( RECEIVE TOP (1) @conversation_handle = conversation_handle, @message_body = CAST(message_body AS XML), @message_type_name = message_type_name FROM ProcessingQueue ), TIMEOUT 5000; IF (@@ROWCOUNT = 0) BEGIN ROLLBACK TRANSACTION; BREAK; END IF @message_type_name = N'AsyncRequest' BEGIN -- Handle complex long processing here -- For demonstration we'll pull the account number and send a reply back only DECLARE @AccountNumber INT = @message_body.value('(AsyncRequest/AccountNumber)[1]', 'INT'); -- Build reply message and send back DECLARE @reply_message_body XML = N' ' + CAST(@AccountNumber AS NVARCHAR(11)) + ' '; SEND ON CONVERSATION @conversation_handle MESSAGE TYPE [AsyncResult] (@reply_message_body); END -- If end dialog message, end the dialog ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @conversation_handle; END -- If error message, log and end conversation ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BEGIN -- Log the error code and perform any required handling here -- End the conversation for the error END CONVERSATION @conversation_handle; END COMMIT TRANSACTION; END END GO
RequestQueue
będzie również musiał przetwarzać wiadomości, które są do niego wysyłane, więc dodatkowa procedura przetwarzania AsyncResult
komunikaty zwracane przez procedurę ProcessingQueueActivation muszą zostać utworzone. Ponieważ wiemy, że wiadomość AsnycResult oznacza, że wszystkie prace związane z przetwarzaniem zostały zakończone, konwersację można zakończyć po przetworzeniu tej wiadomości, która wyśle wiadomość EndDialog do ProcessingService, która zostanie następnie przetworzona przez procedurę aktywacji w celu zakończenia rozmowa, sprzątanie wszystkiego i unikanie pożaru oraz zapominanie o problemach, które pojawiają się, gdy rozmowa kończy się prawidłowo.
-- Create procedure for processing replies to the request queue CREATE PROCEDURE dbo.RequestQueueActivation AS BEGIN SET NOCOUNT ON; DECLARE @conversation_handle UNIQUEIDENTIFIER; DECLARE @message_body XML; DECLARE @message_type_name sysname; WHILE (1=1) BEGIN BEGIN TRANSACTION; WAITFOR ( RECEIVE TOP (1) @conversation_handle = conversation_handle, @message_body = CAST(message_body AS XML), @message_type_name = message_type_name FROM RequestQueue ), TIMEOUT 5000; IF (@@ROWCOUNT = 0) BEGIN ROLLBACK TRANSACTION; BREAK; END IF @message_type_name = N'AsyncResult' BEGIN -- If necessary handle the reply message here DECLARE @AccountNumber INT = @message_body.value('(AsyncResult/AccountNumber)[1]', 'INT'); -- Since this is all the work being done, end the conversation to send the EndDialog message END CONVERSATION @conversation_handle; END -- If end dialog message, end the dialog ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @conversation_handle; END -- If error message, log and end conversation ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BEGIN END CONVERSATION @conversation_handle; END COMMIT TRANSACTION; END END GO
Testowanie procedur
Przed zautomatyzowaniem przetwarzania kolejek dla naszych usług ważne jest, aby przetestować procedury aktywacji, aby upewnić się, że przetwarzają one komunikaty w odpowiedni sposób, oraz aby zapobiec wyłączeniu kolejki w przypadku wystąpienia błędu, który nie jest prawidłowo obsługiwany. Ponieważ istnieje już wiadomość w ProcessingQueue
ProcessingQueueActivation
Procedura może zostać wykonana w celu przetworzenia tej wiadomości. Pamiętaj, że WAITFOR
spowoduje, że procedura zakończy się po 5 sekundach, nawet jeśli wiadomość jest przetwarzana bezpośrednio z kolejki. Po przetworzeniu wiadomości możemy sprawdzić, czy procedura zadziałała poprawnie, odpytując RequestQueue
aby sprawdzić, czy AsyncResult
wiadomość istnieje, a następnie możemy sprawdzić, czy RequestQueueActivation
procedura działa poprawnie po jej wykonaniu.
-- Process the message from the processing queue EXECUTE dbo.ProcessingQueueActivation; GO -- Check for reply message on request queue SELECT CAST(message_body AS XML) FROM RequestQueue; GO -- Process the message from the request queue EXECUTE dbo.RequestQueueActivation; GO
Automatyzacja przetwarzania
W tym momencie wszystkie komponenty są gotowe, aby w pełni zautomatyzować nasze przetwarzanie. Pozostaje tylko powiązać procedury aktywacji z ich odpowiednimi kolejkami, a następnie wysłać kolejną wiadomość testową, aby sprawdzić, czy została przetworzona i nic nie pozostanie w kolejkach.
-- Alter the processing queue to specify internal activation ALTER QUEUE ProcessingQueue WITH ACTIVATION ( STATUS = ON, PROCEDURE_NAME = dbo.ProcessingQueueActivation, MAX_QUEUE_READERS = 10, EXECUTE AS SELF ); GO -- Alter the request queue to specify internal activation ALTER QUEUE RequestQueue WITH ACTIVATION ( STATUS = ON, PROCEDURE_NAME = dbo.RequestQueueActivation, MAX_QUEUE_READERS = 10, EXECUTE AS SELF ); GO -- Test automated activation -- Send a request EXECUTE dbo.SendBrokerMessage @FromService = N'RequestService', @ToService = N'ProcessingService', @Contract = N'AsyncContract', @MessageType = N'AsyncRequest', @MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>'; -- Check for message on processing queue -- nothing is there because it was automatically processed SELECT CAST(message_body AS XML) FROM ProcessingQueue; GO -- Check for reply message on request queue -- nothing is there because it was automatically processed SELECT CAST(message_body AS XML) FROM RequestQueue; GO
Podsumowanie
Podstawowe składniki do automatycznego przetwarzania asynchronicznego w programie SQL Server Service Broker można skonfigurować w konfiguracji pojedynczej bazy danych, aby umożliwić oddzielne przetwarzanie długotrwałych zadań. Może to być potężne narzędzie do poprawy wydajności aplikacji, na podstawie doświadczenia użytkownika końcowego, poprzez oddzielenie przetwarzania od interakcji użytkownika końcowego z aplikacją.