How to commit UoW once all handlers are invoked

c# entity-framework-core rebus unit-of-work

Question

I have a scenario, where all handlers on single node should operate on single "Unit of Work" that is commited once all handlers are invoked. I think the best way is to do following:

When message is received, perform these action as part of the pipeline:

  1. Create new DbContext instance (UoW)
  2. Invoke handlers and pass the DbContext instance
  3. If all handlers are invoked without error call DbContext.SaveChanges
  4. Dispose DbContext

Can you give me hint on how to customize the Rebus pipeline to meet the requirements above?

EDIT:

I've ended up with this:

private static IBus _bus;

public void ConfigureServices(IServiceCollection services)
{
    services.AddDbContext<MyDbContext>();
    services.AddMvc();
    services.AddTransient<IBus>(sp => _bus);
}

public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
   StartRebus(app);
   ...
}

public static void StartRebus(this IApplicationBuilder app)
{
    var rebusServices = new ServiceCollection();
    rebusServices.AutoRegisterHandlersFromAssemblyOf<ActivityDenormalizer>();
    rebusServices.AddTransient<MyDbContext>(sp =>
    {
        var messageContext = MessageContext.Current
            ?? throw new InvalidOperationException("MessageContext.Current is null.");

        return messageContext.TransactionContext.Items
            .GetOrThrow<MyDbContext>(nameof(MyDbContext));
    });


    rebusServices.AddRebus((configure, sp) => configure
        .Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "Messages"))
        .Options(o =>
        {
            o.EnableUnitOfWork<MyDbContext>(
                unitOfWorkFactoryMethod: messageContext =>
                {
                    //create new dbcontext instance regardless of ServiceLifeTime.
                    //Notice that I'm using ApplicationServices here, not RebusServices.
                    var dbContext = ActivatorUtilities.CreateInstance<MyDbContext>(app.ApplicationServices);
                    messageContext.TransactionContext.Items[nameof(MyDbContext)] = dbContext;
                    return dbContext;
                },
                commitAction: (messageContext, dbContext) => dbContext.SaveChanges(),
                cleanupAction: (messageContext, dbContext) => dbContext.Dispose());

        }));

    var rebusServiceProvider = rebusServices.BuildServiceProvider();
    rebusServiceProvider.UseRebus();
    _bus = rebusServiceProvider.GetRequiredService<IBus>();
}

application services and rebus services are interconnected at two places:

  1. IBus is resolved by rebusServiceProvider, but the instance is registered also in application services, so that I can send messages to it from my app.

  2. MyDbContext dependencies (DbContextOptions etc..) are resolved by ApplicationServices, but the DbContext is also instantiated in Rebus' unitOfWorkFactoryMethod and registered in rebusServices so that it can be injected to Rebus handlers.

1
2
2/6/2019 2:56:55 PM

Accepted Answer

Yes – check out Rebus.UnitOfWork – it has the hooks you need.

Specifically, for Entity Framework, you would do something like this:

Configure.With(new CastleWindsorContainerAdapter(container))
    .Transport(t => t.UseMsmq("test"))
    .Options(o => o.EnableUnitOfWork(
        async context => new CustomUnitOfWork(context, connectionString),
        commitAction: async (context, uow) => await uow.Commit()
    ))
    .Start();

where CustomUnitOfWork would look something like this:

class CustomUnitOfWork
{
    public const string ItemsKey = "current-db-context";

    readonly MyDbContext _dbContext;

    public CustomUnitOfWork(IMessageContext messageContext, string connectionString)
    {
        _dbContext = new MyDbContext(connectionString);
        messageContext.TransactionContext.Items[ItemsKey] = this;
    }

    public async Task Commit()
    {
        await _dbContext.SaveChangesAsync();
    }

    public MyDbContext GetDbContext() => _dbContext;
}

and then you would set up your IoC container to resolve MyDbContext by fetching it from the current message context – with Castle Windsor, that would be done like this:

container.Register(
    Component.For<CustomUnitOfWork>()
        .UsingFactoryMethod(k =>
        {
            var messageContext = MessageContext.Current
                ?? throw new InvalidOperationException("Can't inject uow outside of Rebus handler");

            return messageContext.TransactionContext.Items
                .GetOrThrow<CustomUnitOfWork>(CustomUnitOfWork.ItemsKey);
        })
        .LifestyleTransient(),

    Component.For<MyDbContext>()
        .UsingFactoryMethod(k => k.Resolve<CustomUnitOfWork>().GetDbContext())
        .LifestyleTransient()
);
0
1/10/2019 2:19:52 PM


Related Questions





Related

Licensed under: CC-BY-SA with attribution
Not affiliated with Stack Overflow
Licensed under: CC-BY-SA with attribution
Not affiliated with Stack Overflow