Damian Hickey

Mostly software and .NET related. Mostly.

Getting contextual state into a RavenDB's document store listener

Scenario: In multi-thread message driven application, I want to add contextual metadata to each and every modified and stored document. I would like to encapsulate this feature in such a way as so that it is done in one place and does not burden the event handlers with the responsibility.

An event handler may be supplied by seperate assembly and a contrived one that creates documents from events may look something like this:

public class MyEventHandler : IHandle<MyEvent>()
{
    private readonly IDocumentStore _store;
    
    public MyEventHandler(IDocumentStore store)
    {
        _store = store;
    }
    
    public void Handle(MyEvent @event)
    {
        using(var session = _store.OpenSession())
        {
            var doc = new MyDocument { // Populate properties from message };
            session.Store(doc);
            session.SaveChanges();
        }
    }
}

This handler is typically invoked at the point where the message enters this system in the main application and then

public void OnMessage<T>(Message<T> message)
{
    var handlers = _handlerResolver.ResolveAll<T>();
    foreach(var handler in handlers)
    {
        handler.Handle(message.Body);
    }
}

This method is reentrant and may (will) be called from more than one thread as the same time.

I would like to add contextual information, here message.Id, to each document that is created or modified as a result handling this message. Do do so I need to intercept the document etity instance just before it is saved. This is achieved through implementing the IDocumentStoreListener interface which looks like...

public interface IDocumentStoreListener
{
    void AfterStore(string key, object entityInstance, RavenJObject metadata);
    bool BeforeStore(string key, object entityInstance, RavenJObject metadata);
}

DocumentStoreListeners are registered before initializing the document store and like the DocumentStore, the listener's lifecyle is singleton with respect to the main application. Therefore it must be reentrant and thread safe.

var store = new DocumentStore { // Uri = etc };
store.RegisterListener(new EventIdDocumentStoreListener);
store.Initialize();

...where my implementation may look something like this (just BeforeStore() implemention shown for berevity):

public class EventIdDocumentStoreListener : IDocumentStoreListener
{
    public const string PropertyName = "LastMessageId";
    
    public bool BeforeStore(string key, object entityInstance, RavenJObject metadata)
    {
        RavenJToken token = RavenJToken.FromObject(messageId);
        if (metadata[PropertyName] == null)
        {
            metadata.Add(PropertyName, token);
        }
        else
        {
            metadata[PropertyName] = token;
        }
        return false;
    }
}

See the problem here? I have no obvious mechanism to supply the messageId into this method from which the RavenJToken is then constructed from. The solution I have arrived at is to use thread local storage, specifically .NET4's ThreadLocal<T>. This will allow me to store messageId value in a place that is specific to the running thread. I set it in the OnMessage dispatcher and pick it up in the listener. This ThreadLocal instance is also singleton with respect to the main application.

Now our listener will depend on a ThreadLocal:

public class EventIdDocumentStoreListener : IDocumentStoreListener
{
    public const string PropertyName = "LastMessageId";
    private readonly ThreadLocal<Guid> messageIdThreadLocal;
    
    public EventIdDocumentStoreListener(ThreadLocal<Guid> messageIdThreadLocal)
    {
        _messageIdThreadLocal = messageIdThreadLocal;
    }
    
    bool BeforeStore(string key, object entityInstance, RavenJObject metadata)
    {
            RavenJToken token = RavenJToken.FromObject(_messageIdThreadLocal.Value);
            if (metaData["LastMessageId"] == null)
            {
                metaData.Add(PropertyName, token);
            }
            else
            {
                metaData["LastMessageId"] = token;
            }
        }
    }
}

And our OnMessage dispatcher can set the message ID for the specific thread before calling the handlers:

public void OnMessage<T>(Message<T> message)
{
    var handlers = _handlerResolver.ResolveAll<T>();
    _messageIdThreadLocal.Value = message.Id;
    foreach(var handler in handlers)
    {
        handler.Handle(message.Body);
    }
}

So that is how I supply contextual state to a listeners in a safe and transparent manner.

Notes
- This code above is to demonstrate using ThreadLocal to pass contextual state into a RavenDB IDocumentStoreListener. It does not represent a production quality solution :)
- This solution does not work in an asp.net / wcf environment. Modifying above to do that is left as an exercise to the reader (hint: HttpContext.Items)

blog comments powered by Disqus