LMAX Disruptor pattern: une file non bloquante à ultra basse latence

LMAX Disruptor est un pattern et une implémentation conçue par la société LMAX pour des applications de trading haute fréquence (basse latence et haut débit). C’est un projet open-source Java pour lequel il existe une réécriture .NET.

Introduction à LMAX Disruptor

L’implémentation proposée par LMAX est d’une rare efficacité : il y a en fait très peu de composants à connaître et ceux-ci exposent peu d’opérations. Apprendre à l’utiliser est donc très rapide et il y a peu de risque de « mal » l’utiliser (comparé à ZMQ par exemple). C’est pour moi un modèle en terme d’implémentation d’API.

Il y a évidemment quelques concepts à appréhender et la manière de penser son application est un peu différente car vous ne pourrez pas transposer directement LMAX Disruptor à une file d’attente classique (comme la classe Queue(T)). Il y a aussi différentes façons, plus ou moins avancées, de l’utiliser. Les exemples présentés dans cet article se veulent simples. Il y a en fait beaucoup de façons de « tuner » son processeur, ce ne sera pas le sujet de cet article introductif. Il y a notamment un « wizard » qui facilite l’implémentation d’une file en masquant la présence de composants plus bas-niveau.

LMAX a également produit beaucoup de documentation sur le sujet, à la fois académique et technique sur le fonctionnement interne de leur API (voir les références en bas de page).

Voici, pour moi, les deux principales idées exposées par l’équipe LMAX

1) Synchronisation non bloquante

La principale affirmation, partagée aujourd’hui par d’autres librairies comme ZMQ, est que l’utilisation de mécanismes de synchronisation bloquants est un anti-pattern contre-productif: il est plus rapide d’exécuter une pile de traitements avec un seul thread que de lancer plusieurs threads qui devront se synchroniser (bloquer un thread pour en attendre un autre).

Lorsque l’on parle de synchronisation « bloquante », il s’agit des primitives telles que les verrous (lock et Monitor, Mutex, Semaphore, WaitHandle, Barrier…). On parle habituellement de mécanismes non bloquants (lock free) lorsque l’on utilise Interlocked . Ce dernier n’est cependant pas gratuit: il est beaucoup plus performant qu’un lock car il tire directement partie de fonctionnalités matérielles du processeur alors que les locks sont une surcouche au niveau de l’OS. Mais il a toujours un coût, de l’ordre de dizaines à centaines de fois le coût d’une instruction normale, d’après un article MSDN Magazine de Vance Morrison (à comparer à quelques milliers de fois pour un lock). Il n’est pas toujours clair qu’une librairie lock free utilise ou non des instructions de synchronisation atomiques. J’aime remplacer cette expression par Context switch free. Cela n’indique pas qu’un thread n’attendra pas dans une boucle (spinlocks).

Dans le cas de Disruptor, les opérations atomiques d’Interlocked sont bien utilisées, là où la plupart des implémentation utilisent un verrou (type Monitor). Consultez le guide de démarrage rapide de Disruptor pour vous assurer de ne pas utiliser de locks (Optionally Lock-free et Alternative Wait Strategies).

2) Réutilisation d’objets

La seconde affirmation est que la création d’objets pendant le cycle normal de l’application nuit à ses performances: en effet, la création d’objet, que ce soit en .NET ou en Java, est ce qui déclenche le Garbage Collector, susceptible d’arrêter toute exécution de l’application (sauf cas particulier, comme l’instanciation de types valeurs qui ne dépendent pas de types références). Bien que cette pause soit de très courte durée, cela fausse le déterminisme d’une mesure de performances sur une application devant exécuter très rapidement un très grand nombre de traitements.

Caractéristiques principales

  • Communication inter-thread (intra-process).
  • Non bloquant (aka lock free).
  • Non persistent : toutes les ressources sont en mémoire. Pour de la persistence, il suffit d’implémenter un Event Handler responsable de l’enregistrement des messages.
  • Object pool : aucune création d’objet lors de la mise en file d’attente.
  • Multiples producteurs.
  • Multiples consommateurs (un thread par consommateur).

Noter que ces caractéristiques s’appliquent au chemin critique de l’application: il faut bien sûr créer des objets en phase d’initialisation (typiquement au démarrage de l’application) et il y a bien des mécanismes de synchronisation bloquante dans certains cas, mais pas pendant la phase normale de fonctionnement de l’application.

Concepts clés

Ring Buffer


Source de l’illustration:Dissecting the Disruptor (Trisha Gee)

Le Ring Buffer est le principal composant de l’API. Comme son nom l’indique, c’est un buffer circulaire, techniquement sans fin (en pratique, il a bien une limite mais il faudrait 300 ans pour l’atteindre à raison de 1 milliard de messages par seconde).

C’est lui qui permet la publication de messages sans bloquer les threads producteurs, grâce à une technique intelligente de transaction basée sur un CAS atomique (Compare & Swap). De plus, le Ring Buffer est également un pool d’objets (du type des messages à publier). La publication de chaque message ne donne donc pas lieu à la création d’un nouvel objet (si cette fonctionnalité est correctement utilisée). Lors de l’initialisation du Ring Buffer, on lui spécifie une capacité (une puissance de deux). Autant d’instances de messages seront créées au démarrage du buffer (pour être exact: la puissance de deux immédiatement supérieure à la capacité spécifiée).

Concrètement, un producteur doit publier son message en deux opérations, entre les deux, il renseigne le message:

  • Demande d’un slot libre dans le Ring Buffer.
  • … définition du message dans le slot obtenu.
  • Publication du slot dans le Ring Buffer.

La non allocation d’objet repose sur votre responsabilité de recopier les valeurs du message à publier. Par exemple :

class Event
{
	public bool Flag1 { get; set; }
	public int Number1 { get; set; }
}

long seqNo = ringBuffer.Next(); // step 1
Event msg = ringBuffer[seqNo];
msg.Flag1 = true;
msg.Number1 = 10;
ringBuffer.Publish(seqNo); // step 2

Dans l’exemple ci-dessus, il n’y a aucune allocation d’objet car msg est extrait du pool d’objets géré par le Ring Buffer et nous recopions des types valeurs. Si en revanche, nous avions recopié des types références, il y a de fortes chances pour que les références copiées aient donné lieu, précédemment, à la création d’un objet (à moins que vous ne gériez votre propre pool).

Event handlers

L’event est le message publié par un producteur et consommé par un consommateur. Un event handler est simplement un consommateur. Celui-ci doit implémenter une interface constituée d’une seule méthode destinée à recevoir le message.

Autres composants

Il existe d’autres composants que je ne décrirai pas dans cet article et qui permettent un assemblage plus fin de votre file. Ici, j’utilise le « wizard » Disruptor qui encapsule ces composants et n’expose que les deux principaux cités plus haut: le Ring Buffer et les Event Handlers.

Exemples

Premier exemple : une simple file (un unique consommateur)

Le code source complet est disponible sur GitHub (SimpleQueue).

Cet exemple se résume en une seule classe qui représente notre processeur et qui inclus la gestion de la file et son traitement:

public sealed class SimpleQueueProcessor : IEventHandler<Event>, IDisposable
{
    private readonly Disruptor<Event> _disruptor;
    private RingBuffer<Event> _ringBuffer;
    private int _disposeCount;
    private bool _started;
    private readonly bool _enableZip;

    public SimpleQueueProcessor(SimpleQueueProcessorOptions options)
    {
        _enableZip = options.EnableZip;
        _disruptor = new Disruptor<Event>(() => new Event(), options.BufferLength, TaskScheduler.Default);
        _disruptor.HandleEventsWith(this);
    }

    public void Start()
    {
        _ringBuffer = _disruptor.Start();
        _started = true;
    }

    public void Publish(string filepath)
    {
        if (_disposeCount != 0)
            throw new ObjectDisposedException(this.GetType().Name);
        if (!_started)
            throw new InvalidOperationException("Method Start() must be called before this method.");
        if (string.IsNullOrEmpty(filepath))
            throw new ArgumentNullException("filepath");
        long seqNo;
        seqNo = _ringBuffer.Next();
        try
        {
            Event entry = _ringBuffer[seqNo];
            entry.Filepath = filepath;
        }
        finally
        {
            _ringBuffer.Publish(seqNo);
        }
    }

    void IEventHandler<Event>.OnNext(Event data, long sequence, bool endOfBatch)
    {
        if (!_enableZip)
            return;
        try
        {
            var zipPath = string.Format("{0}.zip", data.Filepath);
            using (ZipArchive zip = ZipFile.Open(zipPath, ZipArchiveMode.Create))
            {
                zip.CreateEntryFromFile(
                    data.Filepath,
                    Path.GetFileName(data.Filepath),
                    CompressionLevel.Optimal);
            }
            Console.WriteLine(string.Format("ZIP created: {0}", zipPath));
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.ToString());
        }
    }

    void IDisposable.Dispose()
    {
        if (Interlocked.Increment(ref _disposeCount) != 1)
            return;
        _disruptor.Shutdown();
    }
}

Voici un exemple d’utilisation :

var options = new SimpleQueueProcessorOptions();
using (var proc = new SimpleQueueProcessor(options))
{
	proc.Start();
	foreach (var path in filesSample)
	{
		Console.WriteLine("{1:HH:mm:ss.fff}: Publish file: {0}.", path, DateTime.Now);
		proc.Publish(path);
	}
}

Les options permettent d’activer ou de désactiver le traitement qui, ici, consiste à compresser le fichier (ex: c:\test.txt -> c:\test.txt.zip).
Voici un exemple de sortie console avec traitements désactivés:

15:23:04.932: File published: Temp\tmp637F.tmp.
15:23:04.932: File published: Temp\tmp6380.tmp.
15:23:04.932: File published: Temp\tmp6381.tmp.
15:23:04.932: File published: Temp\tmp6392.tmp.
15:23:04.932: File published: Temp\tmp6393.tmp.
15:23:04.932: File published: Temp\tmp6394.tmp.
15:23:04.932: File published: Temp\tmp6395.tmp.
15:23:04.932: File published: Temp\tmp6396.tmp.
15:23:04.932: File published: Temp\tmp6397.tmp.
15:23:04.932: File published: Temp\tmp6398.tmp.

Et un exemple avec la compression activée:

15:23:04.950: Publishing file: Temp\tmp6399.tmp.
15:23:04.951: Publishing file: Temp\tmp63A9.tmp.
15:23:04.951: Publishing file: Temp\tmp63AA.tmp.
15:23:04.951: Publishing file: Temp\tmp63AB.tmp.
15:23:04.951: Publishing file: Temp\tmp63AC.tmp.
15:23:04.952: ZIP created: Temp\tmp6399.tmp.zip
15:23:04.953: Publishing file: Temp\tmp63AD.tmp.
15:23:04.954: ZIP created: Temp\tmp63A9.tmp.zip
15:23:04.957: ZIP created: Temp\tmp63AA.tmp.zip
15:23:04.959: ZIP created: Temp\tmp63AB.tmp.zip
15:23:04.960: Publishing file: Temp\tmp63AE.tmp.
15:23:04.960: Publishing file: Temp\tmp63AF.tmp.
15:23:04.960: Publishing file: Temp\tmp63B0.tmp.
15:23:04.962: ZIP created: Temp\tmp63AC.tmp.zip
15:23:04.963: Publishing file: Temp\tmp63B1.tmp.
15:23:04.964: ZIP created: Temp\tmp63AD.tmp.zip
15:23:04.966: ZIP created: Temp\tmp63AE.tmp.zip
15:23:04.969: ZIP created: Temp\tmp63AF.tmp.zip
15:23:04.972: ZIP created: Temp\tmp63B0.tmp.zip
15:23:04.975: ZIP created: Temp\tmp63B1.tmp.zip

Plusieurs choses à noter :

La taille du Ring Buffer est 4 par défaut (défini arbitrairement dans la classe SimpleQueueProcessorOptions). Cela implique qu’il n’est pas possible de publier plus de quatre événements dans le buffer: le buffer doit se vider d’un slot avant de pouvoir recevoir un nouvel événement. Par conséquent, la cinquième publication bloque tant qu’un des quatre premiers événements n’a pas été traité.

La seconde sortie console devrait vous apparaitre plus cohérente avec cette information : on constate que 4 publications ont lieu. La cinquième est bloquée jusqu’à ce qu’un traitement soit terminé, ce qui correspond à la sixième ligne indiquant la création d’un ZIP. La septième ligne indique une nouvelle publication qui bloque une nouvelle fois. Les lignes suivantes indiquent la création de 3 ZIP, ce qui permet de publier 3 nouveaux messages, etc..

La première sortie console illustre la rapidité à laquelle la file est traitée (mêmes réglages, file de 4 positions): le traitement n’a aucune action (à part retirer un message de la file).

La classe SimpleQueueProcessor implémente :

  • La gestion du cycle de vie du « processeur », aka Disruptor<Event>: création, publication d’événements, arrêt (via IDisposable).
  • Le traitement des messages (via IEventHandler<Event>).

Dans l’exemple suivant, nous verrons à quel point il est simple (et en même temps plus propre) d’organiser notre code de façon à permettre plusieurs consommateurs.

Deuxième exemple : plusieurs consommateurs parallèles

Le code source est disponible dans le répertoire MultipleConsumersProcessor (GitHub).

Cet exemple est identique au premier à ceci près que nous avons deux IEventHandler<Event>, distincts du processeur:

public sealed class MultipleConsumersProcessor : IDisposable
{
    private readonly Disruptor<Event> _disruptor;
    private int _disposeCount;
    private RingBuffer<Event> _ringBuffer;
    private bool _started;

    public MultipleConsumersProcessor(MultipleConsumersProcessorOptions options)
    {
        _disruptor = new Disruptor<Event>(() => new Event(), options.BufferLength, TaskScheduler.Default);

        var logHandler = new LogEventHandler();
        var zipHandler = new ZipEventHandler();
        _disruptor.HandleEventsWith(logHandler, zipHandler); // <-- Difference here
    }

    // Rest of source identical to first example [...] 
    public void Start();
    public void Publish(string filepath);
    void IDisposable.Dispose();
}

Contrairement au premier exemple, notre processeur MultipleConsumersProcessor n’implémente plus d’Event Handler. Au contraire, il en branche deux qui seront exécutés de façon parallèle.

La publication d’événement ne change pas. Voici un exemple de sortie console:

15:23:04.758: Publishing file: Temp\tmp626C.tmp.
15:23:04.761: File path processing: Temp\tmp626C.tmp.
15:23:04.761: Publishing file: Temp\tmp626D.tmp.
15:23:04.761: File path processing: Temp\tmp626D.tmp.
15:23:04.761: Publishing file: Temp\tmp626E.tmp.
15:23:04.761: File path processing: Temp\tmp626E.tmp.
15:23:04.761: Publishing file: Temp\tmp628E.tmp.
15:23:04.761: File path processing: Temp\tmp628E.tmp.
15:23:04.761: Publishing file: Temp\tmp628F.tmp.
15:23:04.804: ZIP created: Temp\tmp626C.tmp.zip
15:23:04.806: ZIP created: Temp\tmp626D.tmp.zip
15:23:04.809: ZIP created: Temp\tmp626E.tmp.zip
15:23:04.812: ZIP created: Temp\tmp628E.tmp.zip
15:23:04.812: File path processing: Temp\tmp628F.tmp.
15:23:04.814: Publishing file: Temp\tmp62A0.tmp.
15:23:04.814: Publishing file: Temp\tmp62A1.tmp.
15:23:04.814: Publishing file: Temp\tmp62A2.tmp.
15:23:04.814: Publishing file: Temp\tmp62A3.tmp.
15:23:04.814: File path processing: Temp\tmp62A0.tmp.
15:23:04.814: File path processing: Temp\tmp62A1.tmp.
15:23:04.814: File path processing: Temp\tmp62A2.tmp.
15:23:04.817: ZIP created: Temp\tmp628F.tmp.zip
15:23:04.818: File path processing: Temp\tmp62A3.tmp.
15:23:04.818: Publishing file: Temp\tmp62C3.tmp.
15:23:04.819: ZIP created: Temp\tmp62A0.tmp.zip
15:23:04.822: ZIP created: Temp\tmp62A1.tmp.zip
15:23:04.824: ZIP created: Temp\tmp62A2.tmp.zip
15:23:04.825: File path processing: Temp\tmp62C3.tmp.
15:23:04.826: ZIP created: Temp\tmp62A3.tmp.zip
15:23:04.829: ZIP created: Temp\tmp62C3.tmp.zip

Bien que ce ne soit pas évident à interpréter, la compression (« ZIP created ») et le log (« File path processing ») sont effectués en parallèle, sans ordre défini. Comme le log est plus rapide à exécuter, il est visible systématiquement avant la compression du fichier.

Le troisième exemple illustre comment ordonner l’exécution entre plusieurs Event Handlers.

Troisième exemple: plusieurs consommateurs ordonnés

Le code source est disponible dans le répertoire OrderedConsumersProcessor (GitHub).

Le code est strictement identique, à l’exception d’une ligne dans le constructeur du processeur:

var logHandler = new LogEventHandler();
var zipHandler = new ZipEventHandler();
_disruptor.HandleEventsWith(logHandler).Then(zipHandler);

Ici, le processeur garantie de traiter chaque message d’abord avec LogEventHandler, puis avec ZipEventHandler. La sortie console est similaire au précédent exemple.

Ce dernier exemple illustre une façon d’utiliser Disruptor pour de l’Event Sourcing: on peut considérer que le rôle de LogEventHandler est de sauvegarder l’événement dans un Event Store et que ZipEventHandler est responsable du traitement métier (business domain).

Recommendations

Optimisations

Si vous utilisez Disruptor, je vous conseille de lire le guide de démarrage rapide du projet, notamment la section Basic Tuning Options.
Il est possible d’optimiser les performances du processeur si l’on prévoit le scénario dans lequel il sera utilisé, notamment s’il y aura un ou plusieurs producteur.

Contrôle de flux

Lorsque l’on utilise une file d’attente pour le traitement de messages, le principal danger est de produire plus vite que l’on ne consomme. Les conséquences sont variables selon l’API utilisée. Dans le cas de Disruptor, le producteur sera simplement bloqué jusqu’à pouvoir publier son message. Selon les applications, cela peut s’avérer fortement néfaste. Surtout si l’on ne pense pas au fait qu’une publication peut bloquer. Le but d’une publication de message est justement de retourner immédiatement pour permettre à l’application de continuer. Si ce cas n’est pas envisagé, l’application peut subitement « geler ».

Bien souvent, il est préférable d' »échouer » proprement, en rejetant une publication. À charge du producteur de republier son message un peu plus tard. C’est une façon de dire au(x) producteur(s) « Stop, vous êtes trop rapides, faites une pause ».
Disruptor propose la définition d’un timeout lors de la publication:

long seqNo;
try
{
    seqNo = _ringBuffer.Next(TimeSpan.FromMilliseconds(500));
}
catch(TimeoutException)
{
    return false;
}
try
{
    Event entry = _ringBuffer[seqNo];
    entry.Filepath = filepath;
}
finally
{
    _ringBuffer.Publish(seqNo);
}
return true;

Rappelons au passage qu’une fois qu’un slot est obtenu dans le Ring Buffer, il est indispensable de le publier, ce qui explique notre bloc try{} finaly{}.

Références

LMAX Disruptor (projet)
LMAX Disruptor: Performance Results
The Disruptor – Lock-free publishing
LMAX Disruptor White paper: High performance alternative to bounded queues for exchanging data between concurrent threads

Et aussi:
Using concurrency for scalability (MSDN Magazine, Jim Duffy, 2006-09)
Understand the Impact of Low-Lock Techniques in Multithreaded Apps (MSDN Magazine, Vance Morrison, 2005-10)