I have a scenario, where all handlers on single node should operate on single "Unit of Work" that is commited once all handlers are invoked. I think the best way is to do following:
When message is received, perform these action as part of the pipeline:
Can you give me hint on how to customize the Rebus pipeline to meet the requirements above?
I've ended up with this:
private static IBus _bus;
public void ConfigureServices(IServiceCollection services)
{
services.AddDbContext<MyDbContext>();
services.AddMvc();
services.AddTransient<IBus>(sp => _bus);
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
StartRebus(app);
...
}
public static void StartRebus(this IApplicationBuilder app)
{
var rebusServices = new ServiceCollection();
rebusServices.AutoRegisterHandlersFromAssemblyOf<ActivityDenormalizer>();
rebusServices.AddTransient<MyDbContext>(sp =>
{
var messageContext = MessageContext.Current
?? throw new InvalidOperationException("MessageContext.Current is null.");
return messageContext.TransactionContext.Items
.GetOrThrow<MyDbContext>(nameof(MyDbContext));
});
rebusServices.AddRebus((configure, sp) => configure
.Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "Messages"))
.Options(o =>
{
o.EnableUnitOfWork<MyDbContext>(
unitOfWorkFactoryMethod: messageContext =>
{
//create new dbcontext instance regardless of ServiceLifeTime.
//Notice that I'm using ApplicationServices here, not RebusServices.
var dbContext = ActivatorUtilities.CreateInstance<MyDbContext>(app.ApplicationServices);
messageContext.TransactionContext.Items[nameof(MyDbContext)] = dbContext;
return dbContext;
},
commitAction: (messageContext, dbContext) => dbContext.SaveChanges(),
cleanupAction: (messageContext, dbContext) => dbContext.Dispose());
}));
var rebusServiceProvider = rebusServices.BuildServiceProvider();
rebusServiceProvider.UseRebus();
_bus = rebusServiceProvider.GetRequiredService<IBus>();
}
application services and rebus services are interconnected at two places:
IBus is resolved by rebusServiceProvider
, but the instance is registered also in application services, so that I can send messages to it from my app.
MyDbContext dependencies (DbContextOptions etc..) are resolved by ApplicationServices, but the DbContext is also instantiated in Rebus' unitOfWorkFactoryMethod
and registered in rebusServices so that it can be injected to Rebus handlers.
Yes – check out Rebus.UnitOfWork – it has the hooks you need.
Specifically, for Entity Framework, you would do something like this:
Configure.With(new CastleWindsorContainerAdapter(container))
.Transport(t => t.UseMsmq("test"))
.Options(o => o.EnableUnitOfWork(
async context => new CustomUnitOfWork(context, connectionString),
commitAction: async (context, uow) => await uow.Commit()
))
.Start();
where CustomUnitOfWork
would look something like this:
class CustomUnitOfWork
{
public const string ItemsKey = "current-db-context";
readonly MyDbContext _dbContext;
public CustomUnitOfWork(IMessageContext messageContext, string connectionString)
{
_dbContext = new MyDbContext(connectionString);
messageContext.TransactionContext.Items[ItemsKey] = this;
}
public async Task Commit()
{
await _dbContext.SaveChangesAsync();
}
public MyDbContext GetDbContext() => _dbContext;
}
and then you would set up your IoC container to resolve MyDbContext
by fetching it from the current message context – with Castle Windsor, that would be done like this:
container.Register(
Component.For<CustomUnitOfWork>()
.UsingFactoryMethod(k =>
{
var messageContext = MessageContext.Current
?? throw new InvalidOperationException("Can't inject uow outside of Rebus handler");
return messageContext.TransactionContext.Items
.GetOrThrow<CustomUnitOfWork>(CustomUnitOfWork.ItemsKey);
})
.LifestyleTransient(),
Component.For<MyDbContext>()
.UsingFactoryMethod(k => k.Resolve<CustomUnitOfWork>().GetDbContext())
.LifestyleTransient()
);