How To Publish MediatR Notifications In Parallel

MediatR is a popular .NET library that provides a simple implementation of the mediator pattern.

Here's a definition taken from MediatR's GitHub: "In-process messaging with no dependencies."

With the rise in popularity of the CQRS pattern, MediatR became the go-to library to implement commands and queries.

However, MediatR also has support for the publish-subscribe pattern using notifications. You can publish an INotification instance and have multiple subscribers handle the published message.

Until recently, the handlers subscribing to an INotification message could only execute serially, one by one.

In this week's newsletter, I'll show you how to configure MediatR to execute the handlers in parallel.

How Publish-Subscribe Works With MediatR

Before I talk about notification publishing strategies, let's see how publish-subscribe works with MediatR.

You need a class implementing the INotification interface:

public record OrderCreated(Guid OrderId) : INotification;

Then you need a corresponding INotificationHandler implementation:

public class OrderCreatedHandler : INotificationHandler<OrderCreated>
{
    private readonly INotificationService _notificationService;

    public OrderCreatedHandler(INotificationService notificationService)
    {
        _notificationService = notificationService;
    }

    public async Task Handle(
        OrderCreated notification,
        CancellationToken cancellationToken)
    {
        await _notificationService.SendOrderCreatedEmail(
            notification.OrderId,
            cancellationToken);
    }
}

Then you publish a message using either IMediator or IPublisher. I prefer IPublisher because it's more expressive:

await publisher.Publish(new OrderCreated(order.Id), cancellationToken);

MediatR will then invoke every handler registered for the OrderCreated notification.
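To make the "multiple subscribers" part concrete, here is a minimal sketch of a second handler for the same notification. OrderAuditLog and OrderCreatedAuditHandler are hypothetical names introduced only for illustration, and the sketch assumes the MediatR package is referenced.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using MediatR;

// Repeated from above so the sketch compiles on its own.
public record OrderCreated(Guid OrderId) : INotification;

// Hypothetical in-memory sink, used only for illustration.
public class OrderAuditLog
{
    public ConcurrentQueue<Guid> Entries { get; } = new();
}

// A second subscriber for the same notification type.
// MediatR resolves it alongside any other OrderCreated handler.
public class OrderCreatedAuditHandler : INotificationHandler<OrderCreated>
{
    private readonly OrderAuditLog _auditLog;

    public OrderCreatedAuditHandler(OrderAuditLog auditLog)
    {
        _auditLog = auditLog;
    }

    public Task Handle(
        OrderCreated notification,
        CancellationToken cancellationToken)
    {
        // Record the order id; no asynchronous work is needed here.
        _auditLog.Entries.Enqueue(notification.OrderId);
        return Task.CompletedTask;
    }
}
```

With both handlers registered, a single Publish call for OrderCreated reaches the email handler and the audit handler.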

Introducing Notification Publisher Strategies

Before MediatR v12, the publishing strategy was fixed: each handler was invoked and awaited individually before the next one started.

MediatR v12 introduces a new interface, INotificationPublisher, which controls how the handlers are called.

The default implementation of this interface is ForeachAwaitPublisher:

public class ForeachAwaitPublisher : INotificationPublisher
{
    public async Task Publish(
        IEnumerable<NotificationHandlerExecutor> handlerExecutors,
        INotification notification,
        CancellationToken cancellationToken)
    {
        foreach (var handler in handlerExecutors)
        {
            await handler
                .HandlerCallback(notification, cancellationToken)
                .ConfigureAwait(false);
        }
    }
}

But now you can also use the TaskWhenAllPublisher:

public class TaskWhenAllPublisher : INotificationPublisher
{
    public Task Publish(
        IEnumerable<NotificationHandlerExecutor> handlerExecutors,
        INotification notification,
        CancellationToken cancellationToken)
    {
        var tasks = handlerExecutors
            .Select(handler => handler.HandlerCallback(
                notification,
                cancellationToken))
            .ToArray();

        return Task.WhenAll(tasks);
    }
}

Here's a comparison between these two strategies.

ForeachAwaitPublisher:

  • Invokes each handler one by one

  • Stops at the first handler that throws an exception, so the remaining handlers never run

TaskWhenAllPublisher:

  • Starts all the handlers without waiting for the previous one to finish

  • Lets every handler run to completion, even if one of them throws an exception

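If you store the task returned by TaskWhenAllPublisher, you can access its Task.Exception property, which contains an AggregateException with the exceptions from all the failed handlers. Awaiting the task directly rethrows only the first of them. With the full list available, you can implement more robust exception handling.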
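Here is a minimal sketch of that pattern. It uses plain tasks as stand-ins for notification handlers, so it illustrates the Task.WhenAll behavior rather than MediatR itself. WhenAllExceptionDemo and its methods are hypothetical names.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllExceptionDemo
{
    // Stand-in for a handler that completes successfully.
    private static async Task SucceedAsync()
    {
        await Task.Delay(10);
    }

    // Stand-in for a handler that fails.
    private static async Task FailAsync(string message)
    {
        await Task.Delay(10);
        throw new InvalidOperationException(message);
    }

    // Returns the message of every exception thrown by the stand-in handlers.
    public static async Task<string[]> CollectFailuresAsync()
    {
        // Keep a reference to the combined task instead of awaiting the call directly.
        Task whenAll = Task.WhenAll(
            FailAsync("email failed"),
            SucceedAsync(),
            FailAsync("sms failed"));

        try
        {
            // Awaiting rethrows only the first exception.
            await whenAll;
        }
        catch (InvalidOperationException)
        {
            // The stored task exposes all failures through its AggregateException.
            if (whenAll.Exception is AggregateException aggregate)
            {
                return aggregate.InnerExceptions
                    .Select(e => e.Message)
                    .ToArray();
            }

            throw;
        }

        return Array.Empty<string>();
    }
}
```

With MediatR, the equivalent would be to store the task returned by Publish before awaiting it. Whether that task exposes the full AggregateException depends on how your MediatR version passes the publisher's task through, so it's worth verifying before relying on it.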

Configuring MediatR Notification Publishing Strategy

How do we configure which INotificationPublisher strategy MediatR will use?

There's a new way to apply configuration options when calling the AddMediatR method.

You supply an Action<MediatRServiceConfiguration> delegate and configure the MediatRServiceConfiguration instance.

If you want to use the TaskWhenAllPublisher strategy, you can either:

  • Provide a value for the NotificationPublisher property

  • Specify the strategy type on the NotificationPublisherType property

services.AddMediatR(config => {
    config.RegisterServicesFromAssemblyContaining<Program>();

    // Setting the publisher directly will make the instance a Singleton.
    config.NotificationPublisher = new TaskWhenAllPublisher();

    // Setting the publisher type will:
    // 1. Override the value set on NotificationPublisher
    // 2. Use the service lifetime from the ServiceLifetime property below
    config.NotificationPublisherType = typeof(TaskWhenAllPublisher);

    config.ServiceLifetime = ServiceLifetime.Transient;
});

You can also write a custom INotificationPublisher implementation and use it instead of the built-in strategies.
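As a rough sketch of what a custom strategy could look like, the publisher below runs the handlers one by one, like ForeachAwaitPublisher, but keeps going when a handler fails and reports all failures at the end. ContinueOnErrorPublisher is a hypothetical name and not part of MediatR. The sketch assumes the MediatR v12 package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MediatR;

// Hypothetical custom strategy: sequential execution that does not stop
// at the first failing handler.
public class ContinueOnErrorPublisher : INotificationPublisher
{
    public async Task Publish(
        IEnumerable<NotificationHandlerExecutor> handlerExecutors,
        INotification notification,
        CancellationToken cancellationToken)
    {
        var exceptions = new List<Exception>();

        foreach (var handler in handlerExecutors)
        {
            try
            {
                await handler
                    .HandlerCallback(notification, cancellationToken)
                    .ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                // Remember the failure and move on to the next handler.
                exceptions.Add(ex);
            }
        }

        // Surface every failure at once after all handlers have run.
        if (exceptions.Count > 0)
        {
            throw new AggregateException(exceptions);
        }
    }
}
```

You would register it the same way as the built-in strategies, for example with config.NotificationPublisherType = typeof(ContinueOnErrorPublisher). Note that this sketch also swallows cancellation exceptions until the end, which may or may not be what you want.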

How Is This Useful?

Running notification handlers in parallel can significantly reduce the total time a Publish call takes compared to the default behavior, especially when the handlers are I/O-bound and independent of each other.

However, note that all handlers will use the same service scope.

If you have service instances that don't support concurrent access, you may run into problems.

Unfortunately, one such service is the EF Core DbContext, which isn't thread-safe.

In any case, I think this is a great addition to the already amazing MediatR library.

That's all for today.

See you next week.

