ZMQ: Création d’un Service Bus IPC avec 0mq

J’ai présenté la librairie de messaging ZMQ dans mon article une intro à 0mq que je vous recommande de lire avant celui-ci.

Cette fois, j’aborde un exemple de mise en oeuvre de ZMQ pour créer un Service Bus inter-processus, permettant de faire communiquer différentes applications par événements.

Les idées suivantes seront abordées :

  • Publication et réception des événements avec le pattern publisher/ subscriber.
  • Mise en oeuvre d’un event proxy avec des sockets xsubscriber et xpublisher.
  • Atténuation du « slow joiner problem ».
  • Sérialisation des échanges avec protobuf.

Mise à jour du 30 janvier 2016:
Entre aujourd’hui et le jour de rédaction de cet article, le code de NetMQ (ZMQ sur .NET) a subi beaucoup d’évolutions, sa communauté open source ayant été très active. Le code fourni dans cet article est devenu obsolète. Cependant les notions clés sont toujours les mêmes.

L’implémentation

Comme d’habitude, le code complet est sur GitHub:
http://github.com/eric-b/Samples/tree/master/ZmqServiceBus.

Cette fois, pas de programme console de démonstration mais une DLL de tests destinée à NUnit.

Le challenge

Design du Service Bus

Le bus sera exposé sous la forme des interfaces suivantes :

interface IServiceBus : IDisposable
{
    ISubscriber CreateSubscriber(params long[] subscribeToEventCodes);
    IPublisher CreatePublisher();
    void Release(ISubscriber instance);
    void Release(IPublisher instance);
}

interface ISubscriber
{
	event EventHandler<MessageEventArgs> OnMessage;
}

interface IPublisher
{
	void Publish<T>(long eventCode, T message);
}

Les composants ISubscriber et IPublisher encapsulent chacun un socket ZMQ respectivement de type subscriber et publisher.

Comme un socket ZMQ, les composants ISubscriber et IPublisher ne sont pas thread-safes. Il appartient au code utilisateur du bus de créer autant de sockets que nécessaires pour mettre en oeuvre une communication qui ne requiert pas le partage d’un socket entre plusieurs threads.

Il devrait paraître déjà évident que l’interface IServiceBus est basiquement une factory de IPublishers et ISubscribers. Cela est lié au fait que ZMQ requiert une gestion stricte du cycle de vie de ses sockets (ce point est abordée dans mon article précédent).

Rappels sur le pattern publisher / subscriber (dans le contexte de ZMQ)

Ce pattern est le plus classique pour la mise en place d’un Service Bus. Chaque composant applicatif peut publier un événement sur le bus via un socket Publisher, ou s’abonner à un événement du bus, via un socket Subscriber. Ainsi, un composant applicatif est en mesure d’émettre et de recevoir sur le bus via respectivement un publisher et un subscriber.

Dans le monde ZMQ, plusieurs particularités sont à prendre en considération :

  • Tout est asynchrone.
  • Les échanges suivant le pattern pub/sub sont comparables à une diffusion radio (broadcast).

Connexion et envois asynchrones

La mise en oeuvre d’un socket ZMQ passe par deux phases :

  • Connexion côté client / instable (ou binding côté serveur / stable)
  • Envoi de messages

Bien entendu, la réception de messages est asynchrone, mais cela est intuitif et ne pose pas de difficulté particulière.

Ce dont il faut bien avoir conscience est que la méthode socket.Connect() (ou socket.Bind()) retourne sans que le socket sous-jacent soit effectivement connecté. C’est pour cela que l’on voit souvent dans les exemples de code basés sur ZMQ des instructions Thread.Sleep().

Broadcasting

Lorsqu’un message est envoyé par un socket publisher, il est envoyé aux subscribers qui se sont auparavant abonnés au publisher. Si un subscriber s’abonne après la publication d’un message, ce message ne sera pas reçu par le subscriber. En cela, l’exemple d’une station de radio est bien choisi par le guide ZMQ: si la radio n’est pas allumée sur le bon canal, on perd ce qui est diffusé dessus.

Slow joiner problem

Les deux caractéristiques décrites ci-dessus aident à comprendre ce problème typique, propre au design de ZMQ. Le symptome observé est que malgré avoir connecté et abonné un subscriber à un publisher, aucun message publié n’est reçu par le subscriber, ou encore les premiers messages publiés sont manqués.

Voici la séquence d’événements :

  1. pub.Bind(...): Binding du socket publisher.
  2. sub.Subscribe(...): Abonnement du subscriber.
  3. sub.Connect(): Connexion d’un socket subscriber (la souscription définie précédemment est envoyée à ce moment là).
  4. pub.Send(...): Publication d’un message par le publisher.
  5. sub.Receive(...): Réception du message par le subscriber.

Le problème est que l’étape 3 est asynchrone: lorsque le publisher envoie effectivement un message, rien ne garantie que le subscriber ait eu le temps de se connecter physiquement.

La solution de facilité est de forcer un délai après la connexion ou le binding d’un socket (par exemple avec Thread.Sleep()). Ce n’est cependant pas une solution élégante, en plus d’être fragile. Voici la séquence d’événements que cela donnerait :

  1. pub.Bind(...): Binding du socket publisher.
  2. Thread.Sleep(150) (ce délai n’est pas strictement requis dans tous les scénarios).
  3. sub.Subscribe(...): Abonnement du subscriber.
  4. sub.Connect(): Connexion d’un socket subscriber (la souscription définie précédemment est envoyée à ce moment là).
  5. Thread.Sleep(150)
  6. pub.Send(...): Publication d’un message par le publisher.
  7. sub.Receive(...): Réception du message par le subscriber.

Une solution plus élégante est de mettre en place un mécanisme de synchronisation. Il y a plusieurs stratégies possibles selon les scénarios d’utilisation. Par exemple, dans le cas de notre Service Bus, nous souhaitons nous assurer qu’avant de mettre à disposition un ISubscriber auprès d’un composant applicatif, ce Subscriber soit immédiatement en mesure de recevoir des messages qui seraient publiés par un Publisher.

La technique utilisée dans l’implémentation proposée consiste à créer un Publisher pour l’occasion, publier plusieurs messages de synchronisation jusqu’à ce que le Subscriber en reçoive un, indiquant qu’il est prêt à recevoir les messages qu’il attend. Le message de synchronisation doit contenir un code unique permettant de garantir que le message est issue du bon publisher et reçu par le bon subscriber. Cette technique garantie simplement que lorsque le Subscriber est utilisé, il est bien physiquement connecté (puisqu’un message témoin a été reçu).

Certaines applications ont d’autres besoins, par exemple s’assurer de ne pas publier d’événements avant que tous les subscribers soient prêts. Cet article ne s’intéresse pas à ce problème (en gros, il faut que le publisher connaisse à l’avance le nombre de subscribers attendus et mettre en place le même type d’échanges tout en comptant le nombre de subscribers ayant acquitté réception du message témoin, via un second canal).

L’Event Proxy: un hub pour les contrôler tous (xpub / xsub)

(le terme « contrôler » n’est pas approprié mais ça sonnait bien et l’idée est là, j’espère)

Le rôle d’un Service Bus est de servir de vecteur de communication à différents composants qui ne se connaissent pas. L’objectif est de découpler ces composants.

Afin que tous les composants du système puissent communiquer sans pour autant « se connaître », une solution est d’exposer le Service Bus au travers d’une « adresse bien connue ».

ZMQ propose pour cela un composant nommé Proxy qui permet de relier nos deux sockets publisher et subscriber. Plutôt que de connecter un Subscriber final à un Publisher final, on connecte ceux-ci respectivement à un XPublisher et un XSubscriber. Ces deux derniers étant connectés entre eux au sein du Proxy.
Les sockets XPublisher et XSubscriber sont une variante des sockets Publisher et Subscriber, nécessaires pour router les abonnements sous la forme de messages spéciaux. Pour l’essentiel, le XPublisher transmet au XSubscriber les demandes d’abonnement envoyées par les Subscribers qui se connectent au proxy. Les demandes sont ensuites transmises du XSubscriber aux différents Publishers connectés au proxy.
Un message d’abonnement est simplement constitué du préfixe des messages à recevoir (le topic de l’abonnement) précédé de l’octet 1. Un message de désabonnement est identique, sauf que le premier octet est 0. Le désabonnement est automatique lorsque le socket Subscriber est libéré proprement. Par exemple, la méthode subSocket.Subscribe("foo") (pour recevoir les messages qui commencent par « foo ») transmettra un message hexadécimal « 01666f6f » lors de la connexion du socket, et un message « 00666f6f » lors de la libération du socket.

Les figures 12 et 13 du guide ZMQ illustrent très bien cette idée:

Figure 12:

Figure 13:

Du point de vue du Service Bus, le composant réalisé est un Event Proxy. C’est l’approche broker: s’il n’est plus opérationnel, les communications ne passent plus. Dans la terminologie ZMQ, le composant ainsi créé est un device de type Forwarder.

L’Event Proxy est typiquement mis en oeuvre dans un processus dédié, permettant à d’autres processus de communiquer entre eux sans se connaître, mais en connaissant simplement l’adresse de l’Event Proxy.

On peut voir l’Event Proxy comme la partie physique, réseau, de notre Service Bus qui est, lui, une abstraction utilisée par tous les composants de tous nos processus. D’un point de vue du code, il y a un seul Event Proxy instancié, et plusieurs (références au) Service Bus; conceptuellement, chaque processus partage le même Service Bus. L’Event Proxy est également un Single Point Of Failure et le design de votre système doit prendre ce fait en compte si vous faites ce choix d’architecture.

Par comparaison, Twitter avec ses #hashtags joue conceptuellement le rôle d’Event Proxy social. Les hashtags sont les topics que l’on peut suivre (subscribers). Vous n’avez pas à connaître le compte de toutes les personnes qui publient un message contenant un hashtag. Une personne peut même changer de compte et continuer à diffuser des messages avec le même hashtag, vous continuerez à le voir, sans même savoir qu’il s’agit de la même personne ou qu’elle a changé de compte. Grâce au Service Bus (et techniquement à l’Event Proxy), les composants d’un système peuvent être fortement découplés. Cette comparaison est conceptuelle plus que technique: Twitter ne permet pas formellement de s’abonner à un hashtag, mais des services tiers le permettent.

MISE A JOUR 18 Mars 2015 : Bug dans la version NetMQ 3.3.0.11
La version actuelle NetMQ 3.3.0.11 publiée sur Nuget contient un bug affectant le XSubscriber du proxy: Dans le cas d’un publisher qui se connecte pour la première fois au XSubscriber, le XSubscriber lui transmettra une demande d’abonnement (s’il y en a effectivement de la part de Subscribers connectés au XPublisher). Si le publisher se déconnecte puis se reconnecte au XSubscriber, ce dernier ne répétera pas les demandes d’abonnement déjà transmises. Par conséquent, le publisher ne transmettra pas ses messages. Ce bug affectera typiquement un Event Proxy IPC (autrement dit non in-process).
Ce bug est corrigé dans la version actuelle de NetMQ mais il faut la compiler à partir du code source de GitHub.

Event Proxy et slow joiner problem

Malgré mes efforts, ce fait n’est sûrement pas intuitif : bien que le slow joiner problem concerne – comme son nom l’indique – les subscribers, lorsque les messages transitent par un Event Proxy, ce problème peut également affecter les publishers retournés par le Service Bus.

De plus, un problème comparable affecte l’Event Proxy lui-même: comme le binding des sockets est asynchrone, rien ne garantie que l’Event Proxy soit prêt à router les messages lorsque l’on commence à émettre des messages sur le bus. Cela ne pose généralement pas de problème car l’Event Proxy est typiquement mis en oeuvre dans un processus séparé. Ce processus doit bien entendu être démarré avant que les autres processus puissent communiquer entre eux via le bus. Cependant ce démarrage asynchrone de l’Event Proxy peut poser des difficultés pour des tests unitaires qui mettraient en oeuvre cet Event Proxy de façon locale, pour les tests eux-mêmes. Dans ce cas on souhaite attendre que le proxy soit opérationnel avant de commencer à publier des événements sur le bus.

La solution décrite précédemment appliquée lors de la création d’un subscriber peut être réutilisée également lors de la création d’un publisher et lors de la mise en oeuvre de l’Event Proxy.

Pour aller plus loin…

Bien que parfaitement fonctionnel et de bonne qualité (de mon point de vue très subjectif et imparfaitement impartial), ce service bus n’est pas optimal à plus égards.

Tout d’abord, le nombre de sockets TCP consommés par processus peut être important, en fonction du nombre de composants de vos applications qui utilisent le Serivce Bus. Chaque création d’un Publisher ou d’un Subscriber revient à consommer un nouveau socket TCP (c’est même une vision un peu simplifiée, je vous invite à jeter un oeil avec un outil comme TCPView). Cela fonctionne très bien mais ce n’est pas gratuit. Une optimisation serait de créer un réseau interne dans le bus afin de fournir des publishers et subscribers non pas IPC (TCP) mais InProc. Ceux-ci seraient connectés à un seul canal TCP. Ainsi, chaque processus du système serait en mesure d’utiliser le bus IPC en ne consommant que deux sockets TCP.

Un autre point est que le composant Publisher n’est pas thread-safe. Autant ce fait est naturel pour les Subscribers, autant il est assez courant d’avoir besoin d’un Publisher thread-safe. La meilleure solution est de faire des choix qui ne requierent pas cela. Malgré tout, si ce scénario est nécessaire, une possibilité est d’utiliser LMAX Disruptor pour gérer un publisher dans un EventHandler et d’exposer l’ajout d’événements dans le RingBuffer au travers d’un IPublisher qui serait donc thread-safe. J’ai abordé LMAX Disruptor dans un précédent article.

L’implémentation proposée n’est pas adaptée au transport InProc (c’est-à-dire un Service Bus interne au processus). Dans ce scénario, l’Event Proxy doit être hébergé par le processus (éventuellement au sein du Service Bus, bien que conceptuellement c’est un composant distinct). ZMQ impose pour le transport InProc que le binding d’un socket (celui de l’Event Proxy) soit mis en oeuvre avant la connexion de l’autre point de terminaison. Comme la connexion et le binding d’un socket sont asynchrones (je me répète), une solution est de fiabiliser la phase de connexion des sockets (publishers et subscribers) en essayant plusieurs fois pendant un court laps de temps jusqu’à réussite de la connexion.

Les composants de cette implémentation sont très peu configurables. Les options HighWaterMarks devraient typiquement être adaptées aux besoins du système. Il s’agit du nombre maximum de messages pouvant être mis en file d’attente dans un socket ZMQ. Dans le code fourni, une constante de 100K messages est utilisée. Si vous constatez que vous recevez des messages, puis que vous en perdez au bout d’un laps de temps, vérifiez ce paramètre.La solution la moins élégante est de désactiver cette limite (avec la valeur 0). Notez cependant que votre processus risque de crasher et de surcharger le système en cas de dépassement de capacité. Une solution plus robuste est de mettre en place un mécanisme de contrôle de flux du côté des publishers: un subscriber qui constate qu’il reçoit les événements plus vite qu’il ne peut les traiter peut avertir le composant qui les publie pour qu’il ralentisse la cadence. Bien sûr, cela dépend du système. Le fait de « perdre » des messages est une option tout à fait envisageable dans certains cas.

Enfin, le plus souvent les solutions génériques ne sont pas les plus optimales. Étudiez les contraintes techniques de votre design et faîtes vos choix en conséquence. Cet article a pour but d’illustrer une façon de faire qui est tout à fait viable sur certains designs, et qui l’est moins sur d’autres. Je pense que si l’on choisit ZMQ pour une solution, c’est avant tout pour ses performances hors normes. Et toute solution recherchant les performances doit être façonnée sur-mesure en fonction des choix d’architecture qui ont été faits sur le système cible.