ZeroMQ: une intro à 0MQ (.NET)

ZMQ est une API de files d’attentes (aka Messaging) basée sur des sockets et conçue spécialement pour des applications à hautes performances et faible latence.

Sa prise en main sur des applications « real-world » ne m’a pas semblé aisée, aussi j’espère que cet article pourra aider à saisir les premiers principes nécessaires pour utiliser cette API.

Avant tout, je tiens à préciser un point de vue personnel qui, je pense, est déjà partagé par un certain nombre de personnes: ZMQ n’est pas « facile » à utiliser. Comparé à MSMQ, par exemple, ce dernier est un framework « clé en main ». ZMQ demande plus de code et surtout plus de préparation. C’est une API légère et surtout un grand nombre de recommandations sur la façon de concevoir votre application. En contrepartie, le gain accru en performances est bien au rendez-vous, voyez par vous-même le graphique ci-dessous. Enfin, tout développeur ZMQ devrait lire entièrement le guide ZMQ (livre de 500 pages disponible gratuitement en ligne). L’API est structurée sous la forme de « patterns » d’utilisation. Le but de cet article est de présenter dans les grandes lignes comment utiliser ZMQ « en vrai », sur des scénarios simples (qui sont aussi les principaux scénarios d’après mon expérience personnelle): Push/Pull, Request/Reply et Publisher/Subscriber. Si vous êtes convaincu, je vous recommande vivement de lire le guide avant d’implémenter ZMQ sur des projets professionnels.

Benchmark graph
(source du graphique: http://mikehadlow.blogspot.co.uk/2011/04/message-queue-shootout.html)

Mise à jour du 30 janvier 2016:
Entre aujourd’hui et le jour de rédaction de cet article, le code de NetMQ (ZMQ sur .NET) a subi beaucoup d’évolutions, sa communauté open source ayant été très active. Le code fourni dans cet article est devenu obsolète. Cependant les notions clés sont toujours les mêmes.

0MQ, les origines…

0MQ, ZeroMQ, ZMQ, NetMQ… : Le nom original est 0MQ mais j’utilise dans le reste de cet article le terme ZMQ, pour la simple raison que c’est le terme qui retourne le plus de résultats dans une recherche Google.

Le projet aurait commencé en 2007. La première version date de 2009 ou 2010, et je dirai que ZMQ est arrivé à maturité fin 2011 avec la version 3.1.

De façon très basique et un peu réductrice, ZMQ est une API open-source de files d’attentes basée sur des sockets TCP. Autrement dit, une API de Messaging IPC (inter-process) et distribuée (sans broker).

L’objectif de leurs deux principaux auteurs, Martin Sústrik et Pieter Hintjens, était de concevoir une API de Messaging conçue pour des applications de micro-trading haute fréquence. Open source, le projet s’est rapidement diffusé de façon plus générale aux applications de n’importe quel autre domaine, même non financier.

Les principaux traits recherchés par ZMQ sont donc ses performances et sa robustesse. Un gros avantage également est qu’il s’agit d’une librairie: il n’y a aucun service particulier à installer.

L’API originale est codée en C++, tandis qu’il existe des bindings dans d’autres langages (qui utilisent la DLL native) ainsi que des réécritures indépendantes.

Quelle API utiliser pour .NET ?

Il existe plusieurs portages. L’API actuellement maintenue pour .NET est NetMQ. Il s’agit d’une réécriture complète de ZMQ en C#. Le binding clrzmq à partir de la bibliothèque native (C++) n’est plus maintenu. La liste des bindings pour ZMQ est maintenue sur cette page.

Ce qui n’est pas inclus dans ZMQ

Basiquement, ZMQ n’est « qu’une » API de communication. Ne sont pas inclus notamment :

  • La sérialisation : c’est à vous d’intégrer la sérialisation des messages avec par exemple :
    • protobuf ou thrift pour les prudents avides de performances,
    • Json, pour les mainstream users,
    • Avro pour les early adopters,
    • ou tout simplement XmlSerializer ou BinaryFormatter pour les old school!
    • […] ou tout autre protocole métier selon le domaine de votre entreprise.
  • La sécurité (à vous de chiffrer vos échanges si cela est souhaité).
  • La persistence des messages.

Enfin, ZMQ n’est pas fiable (aka reliable) au sens donné dans la terminologie des protocoles de messagerie (cf. paragraphe dédié en fin d’article): entre autres, les messages ne sont pas persistants, ils sont uniquement en mémoire. Si l’application s’arrête, les messages non transmis sont perdus.
ZMQ ne gère que l’échange de messages (et il excelle à cette tâche).

Principes de base: sockets, messages, patterns, contexte

Sockets

Que ce soit au sein d’un même programme ou entre différents programmes (éventuellement distribués sur plusieurs machines), la communication dans ZMQ se fait au travers de composants nommés Sockets. Ces composants sont une extension des sockets « standards » (aka sockets Berkeley). On en déduira rapidement que l’API est relativement bas niveau comparée à d’autres comme MSMQ. L’avantage est une API de taille très réduite et des performances hors normes. L’inconvénient évident est qu’il y a davantage de travail nécessaire pour l’intégrer dans une application.

Une clé de ZMQ est de ne pas utiliser de mécanisme de synchronisation: les sockets ZMQ ne sont pas thread-safe. Cela peut surprendre au premier abord.

Si deux threads souhaitent communiquer (au sein du même processus ou de processus différents), chaque thread doit utiliser un socket ZMQ. Les sockets n’étant pas thread-safe, un même socket ne doit pas être utilisé par deux threads différents et il est déconseillé également de transférer un socket d’un thread à un autre (autrement dit, un thread qui désire communiquer doit créer lui-même son socket). Si deux threads doivent envoyer un message à un troisième thread, encore une fois, chaque thread doit utiliser un socket ZMQ dédié, par exemple deux threads détiennent chacun un socket Push et un troisième reçoit les messages avec un socket Pull. Par voie de conséquence, nous n’utiliserons jamais un client ZMQ sous la forme d’un singleton. Nous aurons plus probablement une factory capable de construire un client. La factory pourra être singleton tandis que chaque client construit sera dédié au thread qui le consomme.

Messages

Les échanges entre deux sockets se font par messages. Un message est soit une chaîne, soit un tableau d’octets.

Patterns

Chaque socket ZMQ est d’un type donné, en fonction du pattern de communication. Par exemple, dans le premier cas présenté plus bas, le socket qui envoie les messages est un socket Push, tandis que le socket qui reçoit les messages est un socket Pull.

Les échanges sont asynchrones. La séquence d’initialisation des sockets n’a (sauf exceptions décrites dans le guide ZMQ) pas d’importance: on peut envoyer un message vers un serveur qui n’est pas encore démarré. ZMQ gère la mise en file d’attente des messages. En cas de coupure (crash de processus, micro-coupure réseau…), ZMQ gère les nouvelles tentatives d’envoi jusqu’à réussir la transmission des messages.

Bind / Connect

Un socket peut indifféremment écouter une adresse (*bind*) et se connecter à une adresse (*connect*). L'écoute correspond généralement à la partie "serveur", la plus stable, et la connexion à la partie "client", la plus éphémère. 
Par exemple, un socket *Push* peut indifféremment se connecter à ou écouter une adresse; idem pour un socket *Pull* (et n'importe quel autre type de socket).

Par contre, le pattern n'est pas indifférent à la topologie des connexions entre sockets. Dans le pattern *Push/Pull*, il est indispensable que la connexion/écoute respecte cette combinaison. Par exemple, un socket *Push* doit se connecter à l'adresse écoutée par un socket *Pull* ou un socket *Pull* doit se connecter à l'adresse écoutée par un socket *Push*.

Le respect du pattern n'est donc pas magique. Par exemple, considérons le cas suivant:

- 1 socket *Push A* qui écoute l'adresse X.
- 1 socket *Push B* qui se connecte à l'adresse X.
- 1 socket *Pull Z* qui se connecte à l'adresse X.

Cette topologie ne fonctionnera pas comme attendu: ZMQ ne va pas « magiquement » faire communiquer le socket Push B avec le socket Pull Z. Le socket Push B sera connecté au socket Push A. Cette combinaison ne fonctionne pas. Par conséquent, seuls les messages du socket Push A pourront être reçus par le socket Pull Z (mais il n’y aura pas d’erreur au runtime lors du branchement des deux sockets push).

En revanche, cette configuration fonctionnerait :

- 1 socket *Push A* connecté à l'adresse X.
- 1 socket *Push B* connecté à l'adresse X.
- 1 socket *Pull Z* qui écoute l'adresse X.

Contexte

Le cycle de vie des sockets ZMQ est géré par un contexte. C’est le seul composant ZMQ qui soit thread-safe. Le contexte ZMQ permet de créer des sockets et de les terminer proprement. Le guide ZMQ recommande explicitement de créer un unique contexte au sein de l’application (un contexte par processus). D’après mon expérience sur des applications .NET, je ne recommande pas cela. Je m’expliquerai un peu plus tard, lorsqu’il s’agira de terminer les sockets. Il est intéressant de noter une forme de contradiction sur ce point, entre le guide de ZMQ et le livre, plus récent, The Architecture of Open Source Applications (chapitre ZeroMQ de Martin Sústrik). L’auteur précise que chaque librairie qui utilise ZMQ au sein d’une application devrait détenir son propre contexte ZMQ.

Mise à jour du 30 janvier 2016:
Martin Sústrik a changé son opinion plus tard (cf. Getting Rid of ZeroMQ-style Contexts). NetMQ a également évolué depuis la rédaction de cet article: le contexte existe toujours mais il est maintenant possible de créer les sockets à partir de leur constructeur, sans contexte. En arrière-plan, NetMQ gère la création et la libération du contexte singleton, grâce à un compteur de références (destruction lorsque le dernier socket est libéré).

Séquence de terminaison (libération du contexte)

C’est probablement l’aspect le plus compliqué à gérer, du point de vue de l’utilisation de ZMQ (et peut-être de son implémentation interne).

Le problème est le suivant: ZMQ tente de respecter l’envoi et la délivrance des messages transmis via les sockets du contexte. Comme tout est asynchrone – la connexion des sockets et les échanges de messages -, la séquence de libération des sockets (et de leurs worker threads associés – que l’on ne gère pas mais qui existent) est relativement compliquée. L’utilisateur demande à libérer le contexte. Le contexte émet un signal de terminaison aux sockets. Les sockets qui reçoivent ce signal ont la responsabilité de s’auto-libérer. Lorsque tous les sockets du contexte ont été libérés, le contexte peut se terminer et la séquence de libération a réussie. Si au moins un socket ne se termine pas, la libération du contexte bloque indéfiniment. La séquence interne est documentée en détails dans le whitepaper 0MQ Termination.

Une règle de base: un socket doit être créé et libéré par le thread qui l’utilise.

Exemples

Les sections suivantes présentent les principaux patterns. Il en existe d’autres (voir le guide ZMQ).

Le code complet des exemples est disponible sur GitHub.

Push / Pull

C’est le pattern le plus classique pour une file d’attente, aka fire & forget. Il peut y avoir plusieurs sockets Push, mais un seul socket Pull.

Voici un exemple schématique :

var context = NetMQContext.Create();

// Client :
var pushSocket = context.CreatePushSocket();
pushSocket.Connect("tcp://localhost:8080");
pushSocket.Send("test 1");

// Server :
var pullSocket = context.CreatePullSocket();
pullSocket.Bind("tcp://*:8080");
	
var buffer = pullSocket.Receive();
string receivedString = Encoding.Utf8.GetString(buffer);

Évidemment, cet exemple ne ressemble en rien à un cas réel d’utilisation. Je vous conseille de jeter un oeil à la solution de l’exemple sur GitHub qui correspond plus à du code utilisable (DemoPushPull).

La première chose à prendre en compte est que chaque méthode de l’API ZMQ est susceptible de propager une exception. Il faut donc traiter certains cas bien connus. Par exemple, une TerminatingException est propagée si le contexte est en cours de libération pendant un appel de méthode sur un socket (par exemple pour débloquer un appel – bloquant – à Receive). Ou encore, une AgainException si un timeout a lieu pendant la réception sur un socket.

La classe PushClient du projet d’exemple est la plus simple. Elle gère TerminationException lors d’un envoi, auquel cas la classe s’auto-libère (libère le socket sous-jacent). La méthode IDisposable.Dispose gère la libération en tenant compte des éventuelles exceptions propagées par ZMQ. L’exemple utilise une méthode d’extension CaptureMqExceptions() pour rendre le code plus lisible: cela masque un bloc try {} catch {} sur chaque appel de méthode. On notera aussi l’affectation Linger = 0 lors de la libération du socket, cela afin de débloquer une éventuelle tentative de connexion (qui a lieu en arrière-plan).

MISE A JOUR 14 Mars 2015 : Option Linger de ZMQ
Cette dernière affirmation est correcte, mais vague car ZMQ a étendu la notion de cette option Linger dans ses propres sockets.
Cette question sur Stackoverflow peut aider à comprendre l’option Linger sur un socket TCP conventionnel.
Dans le cas de ZMQ, l’option Linger sert à définir un timeout pour l’envoi des messages en attente dans la file du socket, lorsque celui-ci doit être fermé. C’est donc le délai laissé au socket ZMQ après un appel à sa méthode Close(), pour traiter les messages qu’il lui reste.
Par voie de conséquence, cette option affecte la séquence de terminaison du contexte ZMQ. La valeur par défaut de l’option Linger étant -1 (l’infini), si le socket n’est pas en mesure d’envoyer ses messages, il est susceptible de bloquer indéfiniment la libération du contexte et donc très probablement l’arrêt de l’application. Par exemple, si un socket PUSH ou REQ doit envoyer un message, mais que le socket serveur PULL ou REP n’est pas opérationnel, la libération du contexte bloquera indéfiniment si l’option Linger ne définit pas un délai fini.
Il est donc plus que judicieux de modifier cette valeur par défaut avec :

  • soit la valeur 0, qui indique que le socket doit être terminé sans délai même s’il lui reste des messages à envoyer,
  • soit une valeur supérieure à 0 pour définir une durée d’attente maximum.

L’option Linger d’un socket ZMQ est l’une des rares à pouvoir être définie après un appel à la méthode Connect() ou Bind() du socket. Cependant, une exception sera propagée si le contexte est déjà en phase de terminaison lorsqu’on tente de modifier cette option.

La classe PullReceiver correspond au serveur. Son implémentation est moins triviale que le client. Cela est dû au fait que l’écoute du socket est gérée dans une boucle au sein d’un background thread. C’est généralement ce dont on a besoin pour implémenter le traitement de la file des messages reçus. La règle déjà donnée plus haut est respectée par cette classe: le socket est créé et libéré par le thread qui l’utilise. La boucle surveille donc un CancellationToken en plus de gérer TerminationException pour mettre fin à l’écoute et libérer le socket. A contrario, la boucle continue l’écoute en cas de AgainException, typiquement propagée par la méthode Receive en cas de timeout.

Send / Reply

C’est le pattern requête-réponse. Rappelons que les patterns sont des contrats : un socket Send qui a envoyé un message a l’obligation de recevoir une réponse avant de pouvoir envoyer un nouveau message. De même, le socket qui reçoit la requête doit retourner une réponse avant de recevoir la requête suivante.

Il s’agit d’une légère variante du pattern Push/Pull dans laquelle ce qui serait le PullSocket retourne un message à ce qui serait le PushSocket:

var context = NetMQContext.Create();
	
// Client :
var reqSocket = context.CreateRequestSocket();
reqSocket.Connect("tcp://localhost:8080");
reqSocket.Send("Question");
byte[] responseBuffer = reqSocket.Receive();
string response = Encoding.Utf8.GetString(responseBuffer);

// Server :
var repSocket = context.CreateResponseSocket();
repSocket.Bind("tcp://*:8080");
	
var requestBuffer = repSocket.Receive();
string request = Encoding.Utf8.GetString(requestBuffer);
repSocket.Send("Answer");

Encore une fois, ce code n’est qu’un schéma qui ne ressemble guère à un cas réel. Jetez un oeil à la solution en exemple sur GitHub (DemoRequestReply). Le code est très similaire au pattern précédemment décrit.

Publish / Subscribe

C’est l’inverse du pattern Push/Pull, également de type fire & forget mais aussi broadcast: il y a typiquement un producteur de message (le socket Publish) et plusieurs consommateurs (sockets Subscribe).

Ce pattern a un certain nombre d’impacts sur l’architecture de l’application que je ne peux décrire dans cet article (ces contraintes sont les mêmes avec d’autres API de messaging). La contrepartie est que c’est, avec le pattern Push/Pull, le schéma d’échanges le plus performant car il n’y a pas de synchronisation requise entre un envoi (une requête) et un retour (une réponse).

Voici quelques particularités de ce pattern, en comparaison des autres patterns de ZMQ :

  • Les messages sont publiés uniquement aux abonnés effectivement connectés au moment de la publication: le guide ZMQ fait le parallèle avec une station radio: si la radio n’est pas allumée au moment de la diffusion d’un message, le message n’est pas reçu.
  • Les abonnés peuvent « filtrer » les messages auxquels ils s’abonnent.
  • L’implémentation du client (abonné) ressemble plus à l’implémentation d’un serveur comparé aux autres patterns.

La principale difficulté rencontrée lorsqu’on débute avec ce pattern est liée au fait que la connexion entre les deux sockets est asynchrone: il n’y aucune garantie que la publication d’un message soit effectivement transmise après réussite de la connexion avec un abonné. Là encore, c’est un sujet compliqué qui n’est pas l’objet de cet article. La méthode la plus simple (mais pas la plus jolie) est de retarder la publication des premiers messages d’un certain délai pour laisser le temps aux abonnés de se connecter. Le guide ZMQ fournit plusieurs pistes pour répondre à cette problématique et en réalité, la solution dépend beaucoup de l’architecture du système (connait-on à l’avance le nombre d’abonnés ? doit-on attendre que tous les abonnés soient connectés avant de publier un message ? etc.).

Un abonné peut souscrire à certains messages, en fonction d’un préfixe (filtre). Pour désactiver le filtrage, il faut s’abonner explicitement à une chaîne vide. Si un message ne correspond pas au filtre de l’abonné, il n’est pas émis vers cet abonné.

	var context = NetMQContext.Create();
	
	// Client :
	var subSocket = context.CreateSubscriberSocket();
	subSocket.Subscribe("T1:");
	subSocket.Connect("tcp://localhost:8080");
	var buffer = subSocket.Receive();
	string receivedString = Encoding.Utf8.GetString(buffer);

	// Server :
	var pubSocket = context.CreatePublisherSocket();
	pubSocket.Bind("tcp://*:8080");
	pubSocket.Send("T1:test T1");
	pubSocket.Send("T2:test T2");

Comme dans les exemples précédents, celui-ci n’a rien à voir avec un cas réel. Jetez un oeil à la solution en exemple sur GitHub (DemoPublishSubscribe).

Gérer plusieurs clients à partir d’un même contexte ZMQ

Les exemples décrits jusqu’à maintenant créent un contexte par client. Bien que cela fonctionne, si vous souhaitez communiquer avec un même serveur à partir de différents threads, il sera plus efficace (en terme d’utilisation mémoire) d’instancier les différents clients à partir d’un même contexte. Chaque client doit être créé et libéré par le thread qui l’utilise. Pour cela, le plus simple est d’implémenter le pattern Factory, dont le rôle est de gérer le contexte ZMQ. Ainsi, chaque thread partage la même factory (qui doit être thread-safe) afin de créer un socket client par thread.

La solution d’exemple sur GitHub propose une telle implémentation (DemoRequestReplyWithSingleClientContext).

Il s’agit du dernier exemple de cet article.

ZMQ n’est pas fiable

Comme mentionné plus haut, ZMQ n’est pas fiable (aka reliable) au sens donné dans la terminologie des protocoles de messagerie. Un pilier de la fiabilité d’un service de Messaging est le moyen de stocker les messages pour permettre leur renvoi en cas d’incident. Les messages transmis à ZMQ sont stockés uniquement en mémoire. Par conséquent, si la fiabilité est recherchée (terme fort vague, dont la définition varie d’un protocole de messagerie à un autre), c’est à l’application de gérer cela, par dessus ZMQ. L’alternative est de concevoir l’application de telle sorte que la fiabilité du service de Messaging n’est pas un pré-requis à la fiabilité de l’application. Un article également intéressant à lire à ce sujet se trouve sur InfoQ: Nobody needs Reliable Messaging: la solution la plus commune est l’idempotence des messages reçus par chacune des parties et le renvoi automatique lorsqu’on suspecte la non prise en compte d’un message. Les besoins des applications sont généralement assez uniques et différents à ce niveau pour qu’il soit préférable de gérer leur « fiabilité » au niveau de l’application et non du protocole de communication. Par exemple, pour une communication in-process, il n’y a pas véritablement de sens à avoir une communication fiable. La fiabilité est une contrainte qui n’est pertinente que dans la communication entre différents processus. À ce propos, ZMQ gère la communication par pipes et TCP, entre autres. Notons que si l’on sait d’avance et de façon constante que l’on fait communiquer des threads au sein d’un même processus, il existe d’autres choix comme LMAX Disruptor (voir aussi mon prochain article).

Pour plus de détails sur le thème de la fiabilité, les chapitres 4 et 5 du guide ZMQ en fournissent une définition particulière et proposent des exemples de mise en oeuvre.

Notons également l’existence d’un outil, PZQ, pour gérer la persistence des messages ZMQ. Il s’agit d’un exécutable indépendant de type man-in-the-middle.

J’ai tendance à penser que si l’on recherche un service de Messaging fiable, il ne faut pas s’orienter vers ZMQ. MSMQ est un très bon choix (avec un compromis sur les performances). En revanche, si l’on accepte de ne pas dépendre de certaines contraintes (par exemple la fiabilité, l’ordre, etc.), « se rapprocher du métal » peut changer la donne entre deux designs. Encore une fois, utiliser un service de Messaging non fiable n’implique pas nécessairement de développer une application non fiable.

Conclusion

Si vous êtes arrivé jusqu’ici, vous êtes probablement assez courageux pour tester ZMQ si ce n’est déjà fait. ZMQ est assez différent des autres frameworks de messaging et on ne peut pas facilement le substituer dans une application existante. Le fait est qu’il faut étudier son intégration lorsqu’on définit l’architecture de l’application. Si ce choix est applicable, ses performances sont sans commune mesure avec les frameworks plus connus. Cela a bien entendu un prix.

Notons que Martin Sústrik a démarré un nouveau projet nanomsg fortement inspiré de son retour d’expérience sur ZMQ. Une différence notable de mon point de vue est la disparition de la notion de contexte (ce que je vois comme un avantage). Il existe un binding NNanomsg pour .NET, mais ce projet n’est pas encore mature pour de la production.

Références

ØMQ: Mission Accomplished, Martin Sústrik, 2011-11-10
0MQ: A new approach to messaging, Martin Sustrik & Martin Lucina, 2010-01-20
nanmsg: Differences between nanomsg and ZeroMQ
The Architecture of Open Source Applications, vol. 2 (chapitre ZeroMQ), 2012-09-21
0MQ Termination, Mike Pearce, 2011-06-7
0MQ: Broker vs. Brokerless, 2008-12-12
Message Queue Shootout!, Mike Hadlow, 2011-04-10
Et aussi: 0MQ: Whitepapers