Veuillez activer Javascript pour voir le contenu

[Azure] Pattern Request/Response avec Azure Service Bus

· ☕ 6 min de lecture

Une des façons les plus simples et courantes d’utiliser les services de messagerie tels qu’Azure Service Bus est pour effectuer une communication unidirectionnel.

request
source: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-overview

Exemple: Bob en tant qu’envoyeur de message et Eve en tant que réceptionniste.
      1. Eve écoute la file d’attente Q1
      2. Bob envoie un message sur Q1
      3. Eve reçoit le message de Bob à partir Q1

Mais comment Bob peut-il savoir que le traitement de Eve est terminé ?
Ou plus généralement comment ces 2 services peuvent-ils communiquer de manière bidirectionnelle?

C’est là que le pattern Request/Response (ou Request/Reply) intervient.

Le pattern Request/Response

Ce pattern utilise un routage de message afin d’obtenir une communication bidirectionnel entre deux services.
Fonctionnellement c’est très simple, l’envoyeur envoie un message sur une file d’attente et attend une réponse sur une autre file d’attente.

request/response simple
source: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReplyJmsExample.html

Il existe 4 types de routage principaux qui sont très bien détaillés ici : https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messages-payloads
Dans la version “la plus simple” (Request/Response simple), il n’existe aucune garantie que Bob reçoit son message de réponse. En effet si un nouvel utilisateur nommé Alice écoute la même file d’attente, elle pourrait intercepter le message destiné à Bob.

request/response simple
Concurrence entre Bob et Alice:

Naturellement une idée pourrait venir en tête assez rapidement : Bob créé une file d’attente à lui et attend la réponse dessus. Une fois la réponse obtenue il supprime la file d’attente.
C’est une logique correcte, mais qui n’est pas à faire, car:

  • elle implique une gestion des services morts (que faire si le processus crash sans avoir supprimé son service ? Nous devons développer un cleaner en externe ? Doit-on supprimer le service avec les lettres mortes associées qui fournissent de précisieuses informations pour régler un potentiel bug ?…)
  • elle n’est évidemment pas performante (il faudra à chaque fois créer et supprimer la queue, dans des fréquences parfois élevées)
  • elle introduit une pollution du Service Bus Namespace qui le rend difficile/impossible à analyser
  • les Service Bus Namespace ont une limite sur la quantité de file d’attente/topic pouvant être créé
  • …etc

En bref beaucoup de problèmes pour une solution personnalisée, et le standard est toujours à privilégier. N’en existe-t-il pas un ?
Justement si, grace au protocole AMQP qui apporte une notion de groupe, retranscrite chez Microsoft sous le nom session qui résout très simplement et rapidement cette problématique.

Les sessions (groupe)

Pour garantir que Bob soit le seul à récupérer le message de réponse qui lui est destiné, nous allons utiliser les sessions (ou plus précisement les groupes du protocole AMQP), qui pour faire simple apporte une multipléxage et ainsi plusieurs utilisateurs peuvent écouter la même file tout en ayant chacun leur propre groupe de message.
Bob écoute et verrouille une session spécifique sur la file d’attente, ainsi lui seul pourra écouter ses messages (à l’image d’une sous file d’attente réservée à Bob).
Bob devra fournir des informations supplémentaires à son message afin que le réceptionneur (Eve) puisse envoyer correctement le message de réponse.
Techniquement cela se traduit par deux propriétés définies par le protocole AMQP 1.0 :

le nom des champs du protocole AMQP ne sont pas forcement les mêmes que ceux de l’API (ex: group-id dans le protocole = SessionId dans l’API)
  • ReplyTo (reply-to): chemin d’accès où Bob attend la réponse. Eve y enverra son message de réponse.
  • ReplyToSessionId (reply-to-group-id): l’id de la session écoutée par Bob. Eve assignera cette valeur à la propriété SessionId (group-id) de son message de réponse.

Exemple

L’exemple sera réalisé avec Microsoft.Azure.ServiceBus et de deux files d’attente.
Techniquement il est indispensable que la file d’attente de réponse n’accepte que les sessions.
Cela peut se faire via le portail Azure.

activate session
source: https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions

Ou via le code avec la propriété RequiresSession.

1
2
3
4
5
var managementClient = new ManagementClient(connectionString);
await managementClient.CreateQueueAsync(new QueueDescription(queueName)
{
    RequiresSession = true
});

Creation des files d’attente

Dans un premier temps nous allons créer pour la démonstration 2 files d’attente:

  • sample.request: sans session, utilisée pour envoyer les messages de Bob à traiter par Eve.
  • sample.retry: avec session, utilisée pour envoyer la réponse à Bob.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static async Task Main()
{
  var connectionString = "<your_connection_string>";
  var requestQueueName = "sample.request";
  var replyQueueName = "sample.reply";

  // create queues
  await Task.WhenAll(CreateQueueAsync(connectionString, requestQueueName, false), CreateQueueAsync(connectionString, replyQueueName, true));
}

// /!\ IT'S JUST FOR DEMO
static async Task CreateQueueAsync(string connectionString, string queueName, bool requiresSession)
{
  var managementClient = new ManagementClient(connectionString);
  if (await managementClient.QueueExistsAsync(queueName))
  {
      await managementClient.DeleteQueueAsync(queueName);
  }
  await managementClient.CreateQueueAsync(new QueueDescription(queueName)
  {
      RequiresSession = requiresSession
  });
}

Pour la démonstration chaque service sera simulé par un thread.

Thread de l’envoyeur (Bob)

Bob envoie un message avec la propriété ReplyTo égal au chemin d’accès de la file d’attente de réponse et ReplyToSessionId égal à l’identifiant de sa session.
Une fois le message envoyé Bob écoute sa session.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

public static class SampleThreadFactory
{
...
  public static Thread CreateRequestor(string threadId, string connectionString, string requestQueueName, string replyQueueName)
  {
    return new Thread(async () =>
    {
      /*** send message ***/
      var messageSender = new MessageSender(connectionString, requestQueueName);
      var sessionId = "session-" + threadId;

      var message = new Message
      {
          MessageId = threadId,
          ReplyTo = new ServiceBusConnectionStringBuilder(connectionString) { EntityPath = replyQueueName }.ToString(),
          ReplyToSessionId = sessionId,
          TimeToLive = TimeSpan.FromMinutes(2)
      };

      await messageSender.SendAsync(message);
      await messageSender.CloseAsync();
      /*** send message ***/

      /*** wait response ***/
      SessionClient sessionClient = new SessionClient(connectionString, replyQueueName);
      var session = await sessionClient.AcceptMessageSessionAsync(sessionId);

      Message sessionMessage = await session.ReceiveAsync(TimeSpan.FromMinutes(2));

      await session.CompleteAsync(sessionMessage.SystemProperties.LockToken);
      await session.CloseAsync();
      await sessionClient.CloseAsync();
      /*** wait response ***/
    });
  }
}

Thread du répondeur (Eve)

Eve écoute la file d’attente sur laquelle Bob envoie ses messages.
Une fois qu’un message est intercepté, elle envoie un message de réponse sur le chemin d’accès défini par la propriété ReplyTo.
Le message de réponse envoyé doit contenir la propriété SessionId définie à partir la propriété ReplyToSessionId du message intercepté (et pour le suivi ainsi qu’une standardisation, la propriété CorrelationId est égal à l’id (MessageId) du message réceptionné).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static class SampleThreadFactory
{
  public static Thread CreateReplier(string threadId, string connectionString, string requestQueueName)
  {
    return new Thread(() =>
    {
      var messageReceiver = new MessageReceiver(connectionString, requestQueueName);
      messageReceiver.RegisterMessageHandler(
        async (message, cancellationToken) =>
        {
          var connectionStringBuilder = new ServiceBusConnectionStringBuilder(message.ReplyTo);
          var replyToQueue = new MessageSender(connectionStringBuilder);
          var replyMessage = new Message(Encoding.UTF8.GetBytes($"processed by {threadId}"))
          {
              CorrelationId = message.MessageId,
              SessionId = message.ReplyToSessionId,
              TimeToLive = TimeSpan.FromMinutes(1)
          };

          /****  Simulate an action  *****/
          await Task.Delay(new Random().Next(1000, 2000), cancellationToken);
          /*******************************/

          await replyToQueue.SendAsync(replyMessage);
        },
        new MessageHandlerOptions(args => throw args.Exception)
        {
            MaxConcurrentCalls = 10
        });
    });
  }

Initialisation du contexte de test

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
static async Task Main()
{
  var connectionString = "<your_connection_string>";
  var requestQueueName = "sample.request";
  var replyQueueName = "sample.reply";
  // create queues
  await Task.WhenAll(CreateQueueAsync(connectionString, requestQueueName, false), CreateQueueAsync(connectionString, replyQueueName, true));
  // start all
  Parallel.ForEach(new List<Thread>
  {
    SampleThreadFactory.CreateRequestor("REQUESTOR-BOB", connectionString, requestQueueName, replyQueueName),
    SampleThreadFactory.CreateReplier("REPLIER-EVE", connectionString, requestQueueName)
  }, (thread, state) => thread.Start());
  Console.Read();
}

Résulat

result-single

Et aucun problème avec plusieurs envoyeurs, chacun reçoit le message de réponse qui lui est destiné.

result-multiple

Voilà!

Sources

Repository

Documentation

Partager sur

Jérémy Landon
ÉCRIT PAR
Jérémy Landon
Freelance / Author / Speaker / Open source contributor

Contenu de cette page