Implémentation du pattern ZMQ Request/Reply avec un client ASP.NET

Le scénario est le suivant :

  • Côté serveur, un service expose un socket Reply (REP).
  • Côté client, un contrôleur WebApi expose le service en HTTP, via un socket Request (REQ).
  • Pour ajouter un peu de piment, nous proposerons un cluster de plusieurs sockets REP auxquels les requêtes pourront être distribuées.

Mon exemple s’appuie sur un projet ASP.NET MVC (what else !?) mais il n’y a pas de différence majeure en ASP.NET Webforms, au niveau du client du service.

Il s’agit d’un scénario très classique où un service interne n’est pas directement accessible à un client web en AJAX: notre contrôleur WebApi joue le rôle de proxy.

Si ZMQ ne vous est pas déjà familier, je recommande de lire mon premier article d’introduction.

Terminologie client/serveur

J’utilise dans cet article la terminologie client/serveur pour que les rapports entre les composants soient plus parlants. Notez cependant que ZMQ n’a pas de notion client ou serveur et qu’il est parfaitement possible d’inverser les rôles de chaque type de socket. Dans la plupart des cas, je pense que le socket REQ sera côté client et le socket REP côté serveur.

Problématique: sockets non thread-safe

Les sockets ZMQ ne sont pas thread-safe, on ne peut donc pas simplement exposer un socket sous la forme d’un client singleton injecté dans le contrôleur WebApi.

La première solution est de créer un nouveau socket REQ à chaque appel de notre action WebApi. Cela fonctionne sans problème, mais si notre socket utilise le protocole TCP/IP, nous allons créer un nouveau socket à chaque requête web, ce qui est rarement tenable.

Solution: router et dealer

En dehors des sockets utilisés en points de terminaison d’un réseau, ZMQ apporte la notion de device pour tout ce qui est intermediation au sein du réseau. Dans le cas présent, un device Shared Queue peut être créé avec deux sockets particuliers: le Router côté client (REQ) et le Dealer côté serveur (REP). Le Router est en fait similaire à un socket REP, tandis que le Dealer est comparable à un socket REQ. D’où la possibilité de branchements que vous devez déjà visualiser. Si c’est confus, un dessin sera très clair (figure 16 extraite du guide ZMQ) :

Diagramme

L’élégance de cette solution est que nos sockets REQ et REP fonctionnent exactement de la même façon, avec ou sans device entre eux.

Le plus souvent, et comme on peut le voir sur l’illustration ci-dessus, les devices sont des composants de type singleton, tandis que les autres types de sockets sont dupliquables (si architecture distribuée).

Fonctionnement du Router et du Dealer: passer d’un échange synchrone à des échanges asynchrones

La responsibilité de nos sockets Router et Dealer est de transformer notre pattern REQ/REP synchrone, dans lequel un seul échange est possible à la fois, en plusieurs échanges asynchrones.

Pour cela, le Router insère dans le message un identifiant associé au socket REQ d’origine de chaque requête. Le Dealer consommera l’identifiant en début de message, traitera (d’où son nom) avec le socket REP cible, puis retournera sa réponse au Router en insérant de nouveau l’identifiant d’origine. Le Router retournera enfin la réponse au socket REQ (en consommant l’identifiant en début de message). Du point de vue des sockets REQ et REP, le message est inchangé (sans identifiant). Pour plus de détails, voir la section Request-Reply Envelopes du guide ZMQ.

Contrairement au socket REQ, le socket Router peut accepter plusieurs requêtes successives (issues chacune d’un socket REQ), sans avoir à attendre la réponse du socket REP (ou du Dealer) entre chacune d’entre elles.

Concrètement, notre device pourra être installé entre un socket REP IPC (TCP/IP) et un socket REQ in-process.Il permettra de créer efficacement autant de sockets REQ qu’on le souhaite, sans consommer de socket TCP à chaque nouvelle requête ASP.NET. Le device est optionnel: la communication REQ/REP fonctionne avec ou sans.

Implémentation du protocole Req/Rep

La solution complète est disponible sur GitHub.

Partons du principal: le code ci-dessous correspond au serveur. On utilise la librairie NetMQ.


internal class Program
{
    private static void Main(string[] args)
    {
        try
        {
            var address = "tcp://localhost:1040";
            using (new RepServer(address))
            {
                Console.WriteLine("Server ready: {0}", address);

                Console.WriteLine("\r\nPress a key to exit...");
                Console.ReadKey(true);
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
    }
}


public sealed class RepServer : IDisposable
{
    private readonly NetMQContext _ctx;
    private readonly RepSocket _serverSocket;

    public RepServer(string address)
    {
        _ctx = NetMQContext.Create();
        _serverSocket = new RepSocket(address, _ctx);
    }
    
    public void Dispose()
    {
        _serverSocket.Dispose();
        _ctx.Dispose();
    }
}


public sealed class RepSocket : IDisposable
{
    private readonly Task _bgTask;
    private readonly CancellationTokenSource _bgTaskCts;
    private readonly NetMQContext _ctx;
    private readonly string _address;
    
    public RepSocket(string address, NetMQContext context)
    {
        _ctx = context;
        _address = address;
        _bgTaskCts = new CancellationTokenSource();
        _bgTask = new Task(BackgroundTask, _bgTaskCts.Token, _bgTaskCts.Token, TaskCreationOptions.LongRunning);
        _bgTask.Start();
    }
    
    private void BackgroundTask(object state)
    {
        var cancellationToken = (CancellationToken)state;
        ResponseSocket socket = _ctx.CreateResponseSocket();
        try
        {
            socket.Bind(_address);

            byte[] receiveBuffer;
            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    receiveBuffer = socket.Receive();
                    if (receiveBuffer == null)
                        continue; // NetMQ > 3.3.0.11
                }
                catch (AgainException)
                {
                    continue; // NetMQ = 3.3.0.11
                }

                #region Always send a reply...
                
                try
                {
                    Thread.Sleep(500); // simulates processing...
                    socket.Send(string.Format("Reply ({0})", Encoding.UTF8.GetString(receiveBuffer)));
                }
                catch (TerminatingException)
                {
                    try
                    {
                        socket.Send("Exit...");
                    }
                    catch
                    {
                    }
                    throw;
                }
                
                #endregion
            }
        }
        catch (TerminatingException)
        {
        }
        finally
        {
            try
            {
                socket.Dispose();
            }
            catch (NetMQException)
            {
            }
        }
    }
    
    public void Dispose()
    {
        _bgTaskCts.Cancel();
    }
}


Et ci-dessous le code du client :


internal class Program
{
    private static void Main(string[] args)
    {
        try
        {
            var address = "tcp://localhost:1040";
            using (var ctx = NetMQ.NetMQContext.Create())
                using (var client = new ReqSocket(address, ctx))
                {
                    Console.WriteLine("Client connected.");
                    var watch = Stopwatch.StartNew();
                    int i = 0;
                    while (watch.Elapsed < TimeSpan.FromSeconds(2))
                    {
                        string response = client.SendRequest(string.Format("Request #{0}", ++i));
                        Console.WriteLine(response);
                    }
                }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
        finally
        {
            Console.WriteLine("\r\nPress a key to exit...");
            Console.ReadKey(true);
        }
    }
}


public sealed class ReqSocket : IDisposable
{
    private static readonly TimeoutException TimeoutException = new TimeoutException();

    private readonly RequestSocket _reqSocket;

    public ReqSocket(string address, NetMQContext context)
    {
        _reqSocket = context.CreateRequestSocket();
        _reqSocket.Options.ReceiveTimeout = TimeSpan.FromSeconds(1);
        _reqSocket.Options.Linger = TimeSpan.Zero;
        try
        {
            _reqSocket.Connect(address);
        }
        catch
        {
            try
            {
                _reqSocket.Dispose();
            }
            catch
            {
            }
            throw;
        }
    }

    public void Dispose()
    {
        _reqSocket.Dispose();
    }

    public string SendRequest(string request)
    {
        _reqSocket.Send(request);
        byte[] receiveBuffer;
        try
        {
            receiveBuffer = _reqSocket.Receive();
            if (receiveBuffer == null) // NetMQ > 3.3.0.11
                throw TimeoutException;

            return Encoding.UTF8.GetString(receiveBuffer);
        }
        catch (TerminatingException)
        {
            Dispose();
            throw;
        }
        catch (AgainException) // NetMQ = 3.3.0.11
        {
            throw TimeoutException;
        }
    }
}

Le code ci-dessus contient deux programmes console, un serveur et un client. Il faut démarrer le programme serveur, puis le client.

Il est aussi possible de lancer le même exemple dans un simple test unitaire (NUnit) :

[TestFixture]
public class SimpleReqRepTest
{
    [Test]
    public void Test()
    {
        string address = string.Format("tcp://localhost:{0}", Helper.GetAvailablePort());
        using (new RepServer(address))
        using (var context = NetMQ.NetMQContext.Create())
        using (var client = new ReqSocket(address, context))
        {
            Console.WriteLine("Client connected.");
            for (int i = 0; i < 4; i++)
            {
                string response = client.SendRequest(string.Format("Request #{0}", i+1));
                Console.WriteLine(response);
                Assert.AreEqual(string.Format("Reply (Request #{0})", i+1), response);
            }
        }
    }
}

La sortie est la suivante :

Client connected.
Reply (Request #1)
Reply (Request #2)
Reply (Request #3)
Reply (Request #4)

Le client envoie 4 requêtes et le serveur attend 500ms avant de répondre à chacune. Comme il n’y a qu’un seul socket serveur, le test dure un peu plus de 2 secondes.

Pour intégrer ce client dans un contrôleur WebApi, il suffirait de fournir le contexte ZMQ au contrôleur pour qu’il puisse instancier un client à chaque requête ASP.NET, ou, plus propre, fournir une factory qui ferait la même chose. Cependant, cela consommera un socket TCP à chaque requête ASP.NET, ce qui est un peu maladroit.

Device Shared Queue (Router/Dealer)

On réutilise le composant Proxy proposé par ZMQ. Ce dernier est « simplement » un pont entre deux sockets Router et Dealer. Tout ce qui est reçu sur un socket est envoyé vers l’autre et vice-versa.

Voici notre device :


public sealed class RouterDealerQueueDevice
{
    private readonly NetMQContext _ctx;
    private readonly string _frontendAddress, _backendAddress;
    private readonly Thread _bgThread;

    public RouterDealerQueueDevice(string frontEndAddress, string backEndAddress, NetMQContext zmqContext)
    {
        _ctx = zmqContext;
        _frontendAddress = frontEndAddress;
        _backendAddress = backEndAddress;
       
        _bgThread = new Thread(new ThreadStart(ProxyThread));
        _bgThread.IsBackground = true;
        _bgThread.Start();

        if (_frontendAddress.StartsWith("inproc://") &&
            !TrySyncInProcSocket(_frontendAddress, 1000))
        {
            throw new TimeoutException();
        }
    }

    private void ProxyThread()
    {
        RouterSocket router = null;
        DealerSocket dealer = null;
        try
        {
            router = _ctx.CreateRouterSocket();
            dealer = _ctx.CreateDealerSocket();
            router.Bind(_frontendAddress);
            dealer.Connect(_backendAddress);

            router.Options.Linger = TimeSpan.Zero;
            dealer.Options.Linger = TimeSpan.Zero;
            var xproxy = new Proxy(router, dealer, null);
            xproxy.Start();
        }
        catch (TerminatingException)
        {
        }
        finally
        {
            if (router != null)
            {
                try
                {
                    router.Dispose();
                }
                catch (NetMQException)
                {
                }
            }
            if (dealer != null)
            {
                try
                {
                    dealer.Dispose();
                }
                catch (NetMQException)
                {
                }
            }
        }
    }
}

Et le test unitaire correspondant :


[TestFixture]
public sealed class SharedQueueClientTest
{
    [Test]
    public void Test()
    {
        string address = string.Format("tcp://localhost:{0}", Helper.GetAvailablePort());
        using (new RepServer(address))
            using (var clientFactory = new ClientFactory(address))
            {
                List<Task> tasks = new List<Task>();
                for (int i = 0; i < 4; i++)
                    tasks.Add(Task.Factory.StartNew(RequestThread, clientFactory));

                Task.WaitAll(tasks.ToArray());
            }
    }

    private void RequestThread(object clientFactory)
    {
        IReqSocket client = null;
        try
        {
            client = ((ClientFactory)clientFactory).Create();

            Console.WriteLine("Client connected.");
            string response = client.SendRequest(string.Format("Request #{0}", Thread.CurrentThread.ManagedThreadId));
            Console.WriteLine(response);
            Assert.AreEqual(string.Format("Reply (Request #{0})", Thread.CurrentThread.ManagedThreadId), response);
        }
        finally
        {
            if (client != null)
                ((ClientFactory)clientFactory).Release(client);
        }
    }
}

On peut constater que nous n’avons pas changé le code du socket client, ni celui du serveur. Nous avons simplement inséré le device entre les deux. Et nous utilisons une factory pour obtenir un socket client in-process. Il suffit d’injecter cette factory dans le contrôleur WebApi pour lui permettre de créer un socket in-process à chaque requête ASP.NET.

Voici un exemple de WebApi :

public class RequestController : ApiController
{
    private readonly ClientFactory _factory;

    public RequestController(ClientFactory clientFactory)
    {
        _factory = clientFactory;
    }

    public string Get(string id)
    {
        IReqSocket client = _factory.Create();
        try
        {
            return client.SendRequest(id);
        }
        finally
        {
            _factory.Release(client);
        }
    }
}

Deux points notables :

  • Le constructeur du device RouterDealerQueueDevice, avant de retourner, attend que le socket inproc soit prêt. Comme la connexion, l’écoute, l’envoi et la réception sont des opérations asynchrones, et que ZMQ a une limitation sur l’ordre de connexion des sockets in-process, nous sommes obligés de faire cette synchronisation ou d’attendre simplement un délai arbitraire avant de connecter effectivement un client in-process. Cette problématique est décrite dans mes précédents articles sur ZMQ.
  • On définit l’option Linger des sockets Router et Dealer, comme on l’a fait pour le socket Req. Cette option désactive l’attente d’envoi des messages restants en cas d’arrêt du contexte ZMQ. C’est particulièrement important si le serveur n’est pas opérationnel et que les messages ne sont pas délivrés.

Cluster serveur

Le device que nous utilisons côté client pour limiter le nombre de connexions TCP peut être réutilisé côté serveur pour distribuer les requêtes: si le serveur expose un cluster de deux sockets REP, les requêtes seront distribuées alternativement au premier et au second socket. Pour que ce soit possible, il faut bien sûr que le composant qui utilise le socket REP soit state-less: pas de maintien d’un état entre deux requêtes, puisque le composant n’est pas certain de recevoir toutes les requêtes. On pourrait même dire qu’il est certain du contraire si la configuration est constante, avec plus d’un socket. En général, on rendra cette fonctionnalité paramétrable afin de permettre 1 à N sockets côté serveur, en fonction de la charge attendue. On peut même imaginer une montée en charge automatique, mais ce n’est pas l’objet de cet article!

Cette fois, nous devons adapter légèrement le code du socket Rep de sorte qu’il se connecte au socket Dealer qui sera en écoute (bind). Nous parlons ici du device côté serveur. Du point de vue du client, le Dealer se connecte toujours au serveur, qui écoute.

Comme les changements de code sont vraiment mineurs, je n’ai pas inclus le code dans cet article. La solution sur GitHub contient l’exemple complet. Si nous reprenons le schéma présenté en début d’article, la nouvelle topologie consiste donc à dupliquer le device Router/Dealer : une instance côté serveur et une instance côté client.

Le test unitaire ServerClusterTest est identique au test SharedQueueClientTest présenté plus haut, à ceci près que l’on peut définir le nombre de sockets serveur. Le test dure moins d’une secondes avec quatre sockets au lieu des deux grosses secondes pour le test avec un seul socket.

Bon à savoir : quelques généralités…

Version de NetMQ

La version 3.3.0.11 actuellement publiée sur NuGet est relativement ancienne et contient des bugs qui ont été corrigés si vous compilez la librairie à partir du projet source sur GitHub.

Option Linger

La valeur par défaut de l’option Linger fait qu’un socket attend indéfiniment de pouvoir envoyer ses messages. Cela est problématique si le serveur n’est pas opérationnel: la libération du contexte bloquera indéfiniment. N’oubliez donc pas de définir cette options, spécialement côté client.

Bind / Connect

L’idée est d’écouter (bind) du côté le plus stable, le moins dynamique, et de connecter le côté le plus éphémère.

Dans un topologie Req / Rep, le socket Rep sera typiquement en écoute tandis que le socket Req sera connecté.

Avec un device côté serveur, l’élément le plus stable devient le device et non plus le socket Rep. Donc c’est le Dealer qui sera en écoute, tandis que le ou les sockets Rep seront connectés. Le socket Router est logiquement en écoute puisqu’il représente notre socket Rep du point de vue des clients qui vont se connecter.

Avec un device côté client, celui-ci est un singleton au niveau de l’application, tandis que les sockets Req sont éphémères: on va donc mettre en écoute le socket Router et connecter les sockets Req et le socket Dealer.

Load-balancing

Il est possible de distribuer les requêtes comme nous l’avons fait sur un seul processus qui héberge plusieurs threads (un thread par socket Rep), mais nous pourrions également distribuer les requêtes au sein de plusieurs processus (éventuellement sur différentes machines): il suffit de connecter le même socket Dealer du client à plus d’un serveur. Evidemment la topologie des serveurs doit être connue du ou des clients. Pour rendre cela plus dynamique, nous pouvons insérer un Broker qui se chargerait de cette distribution. Cependant, le problème est seulement déplacé du ou des clients vers le broker qui est également un Single Point Of Failure. Pour limiter ce dernier risque, une solution est de mettre en oeuvre deux brokers statiques, connus de tous les clients. Cette solution est intéressante dans une topologie à plusieurs processus clients. Une dernière alternative serait de rendre le réseau « discoverable »en permettant aux processus serveur d’aller et venir en fonction de la charge, et aux clients de détecter les serveurs disponibles. Ce type d’architecture est cependant bien plus complexe à construire de façon efficace. Mais c’est possible et cela vaut la peine d’être noté!

Références

Using DEALER and ROUTER Sockets
Chapitres Trois et Quatre du guide ZMQ

Using NetMQ and ASP.NET (blog de Doron Somech, le principal contributeur du portage NetMQ)