NetCore微服务实现事务一致性masstransit之saga使用
demo如下,一个订单处理的小例子:
首先看看结果很简单:
核心代码如下:
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using OrderProcessor.Event;
using ServiceModel;
using ServiceModel.Command;
using ServiceModel.DTO;
using ServiceModel.Event;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace OrderProcessor.Service
{
public class OrderProcessorStateMachine:MassTransitStateMachine
{
private readonly ILogger
public OrderProcessorStateMachine()
{
this.logger = GlobalServiceProvider.Instance.CreateScope().ServiceProvider.GetService<ILogger<OrderProcessorStateMachine>>();
this.InstanceState(x => x.State);
this.State(() => this.Processing);
this.ConfigureCorrelationIds();
this.Initially(this.SetOrderSummitedHandler());
this.During(Processing, this.SetStockReservedHandler(), SetPaymentProcessedHandler(), SetOrderShippedHandler());
SetCompletedWhenFinalized();
}
private void ConfigureCorrelationIds()
{
this.Event(() => this.OrderSubmitted, x => x.CorrelateById(c => c.Message.CorrelationId).SelectId(c => c.Message.CorrelationId));
this.Event(() => this.StockReserved, x => x.CorrelateById(c => c.Message.CorrelationId));
this.Event(() => this.PaymentProcessed, x => x.CorrelateById(c => c.Message.CorrelationId));
this.Event(() => this.OrderShipped, x => x.CorrelateById(c => c.Message.CorrelationId));
}
private EventActivityBinder<ProcessingOrderState, IOrderSubmitted> SetOrderSummitedHandler() =>
When(OrderSubmitted).Then(c \=> this.UpdateSagaState(c.Instance, c.Data.Order))
.Then(c \=> this.logger.LogInformation($"Order submitted to {c.Data.CorrelationId} received"))
.ThenAsync(c \=> this.SendCommand<IReserveStock>("rabbitWarehouseQueue", c))
.TransitionTo(Processing);
private EventActivityBinder<ProcessingOrderState, IStockReserved> SetStockReservedHandler() =>
When(StockReserved).Then(c \=> this.UpdateSagaState(c.Instance, c.Data.Order))
.Then(c \=> this.logger.LogInformation($"Stock reserved to {c.Data.CorrelationId} received"))
.ThenAsync(c \=> this.SendCommand<IProcessPayment>("rabbitCashierQueue", c));
private EventActivityBinder<ProcessingOrderState, IPaymentProcessed> SetPaymentProcessedHandler() =>
When(PaymentProcessed).Then(c \=> this.UpdateSagaState(c.Instance, c.Data.Order))
.Then(c \=> this.logger.LogInformation($"Payment processed to {c.Data.CorrelationId} received"))
.ThenAsync(c \=> this.SendCommand<IShipOrder>("rabbitDispatcherQueue", c));
private EventActivityBinder<ProcessingOrderState, IOrderShipped> SetOrderShippedHandler() =>
When(OrderShipped).Then(c \=>
{
this.UpdateSagaState(c.Instance, c.Data.Order);
c.Instance.Order.Status \= Status.Processed;
})
.Publish(c \=> new OrderProcessed(c.Data.CorrelationId, c.Data.Order))
.Finalize();
private void UpdateSagaState(ProcessingOrderState state, Order order)
{
var currentDate = DateTime.Now;
state.Created \= currentDate;
state.Updated \= currentDate;
state.Order \= order;
}
private async Task SendCommand<TCommand>(string endpointKey, BehaviorContext<ProcessingOrderState, IMessage> context)
where TCommand : class, IMessage
{
var sendEndpoint = await context.GetSendEndpoint(new Uri(""));
await sendEndpoint.Send<TCommand>(new
{
CorrelationId \= context.Data.CorrelationId,
Order \= context.Data.Order
});
}
public State Processing { get; private set; }
public Event<IOrderSubmitted> OrderSubmitted { get; private set; }
public Event<IOrderShipped> OrderShipped { get; set; }
public Event<IPaymentProcessed> PaymentProcessed { get; private set; }
public Event<IStockReserved> StockReserved { get; private set; }
}
}
using MassTransit;
using MassTransit.MongoDbIntegration.Saga;
using OrderProcessor;
using OrderProcessor.Service;
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
builder.Services.AddControllers();
builder.Services.AddMassTransit(x =>
{
x.UsingRabbitMq((context, cfg) =>
{
var connection = “amqp://lx:admin@ip:5672/my_vhost”;//不加主机会报错
cfg.Host(connection);
cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
cfg.UseMessageRetry(r => r.Immediate(5));
cfg.ConfigureEndpoints(context);
cfg.ReceiveEndpoint("", ep =>
{
ep.StateMachineSaga(new OrderProcessorStateMachine(), MongoDbSagaRepository<ProcessingOrderState>.Create("connecturl","db"));
});
});
});
var app = builder.Build();
app.Run();
这是整个订单的几个步骤。
想把代码都贴出来,过程梳理给大家参考,但是时间有限这个点没那么多了,而且我理应要把这个程序跑起来的。明天照常上班,暂不过多研究。
整个demo代码:
exercise/MassTransitDemo/MassTransitSagasDemo at master · liuzhixin405/exercise (github.com)
有兴趣可以还有一个demo:
exercise/MassTransitDemo/SagaTest-master at master · liuzhixin405/exercise (github.com)
masstransit官网:
MassTransit (masstransit-project.com)
不得不说这个东西真的很不错,不过暂时没找到翻译,大概的过了下文档,还有好多不清楚的,英文水平有限。demo都是来自外国大佬贡献的,很遗憾国内有这方面的文章,但是深入一点的都是国外友人的贡献,而且现成的微服务demo写的很好很多,视情况项目可借鉴。
此demo有待后续完善,或大佬帮忙补充后,再完整这个随笔的流程和代码,今天只是起个头。