您现在的位置是:网站首页> 编程资料编程资料

.NET中的MassTransit分布式应用框架详解_实用技巧_

2023-05-24 388人已围观

简介 .NET中的MassTransit分布式应用框架详解_实用技巧_

MassTransit是一款优秀的分布式应用框架,可作为分布式应用的消息总线,也可以用作单体应用的事件总线。

引言

A free, open-source distributed application framework for .NET.
一个免费、开源的.NET 分布式应用框架。-- MassTransit 官网

MassTransit,直译公共交通, 是由Chris Patterson开发的基于消息驱动的.NET 分布式应用框架,其核心思想是借助消息来实现服务之间的松耦合异步通信,进而确保应用更高的可用性、可靠性和可扩展性。通过对消息模型的高度抽象,以及对主流的消息代理(包括RabbitMQ、ActiveMQ、Kafaka、Azure Service Bus、Amazon SQS等)的集成,大大简化了基于消息驱动的开发门槛,同时内置了连接管理、消息序列化和消费者生命周期管理,以及诸如重试、限流、断路器等异常处理机制,让开发者更好的专注于业务实现。
简而言之,MassTransit实现了消息代理透明化。无需面向消息代理编程进行诸如连接管理、队列的申明和绑定等操作,即可轻松实现应用间消息的传递和消费。

快速体验

空口无凭,创建一个项目快速体验一下。

  • 基于worker模板创建一个基础项目:dotnet new worker -n MassTransit.Demo
  • 打开项目,添加NuGet包:MassTransit
  • 定义订单创建事件消息契约:
using System; namespace MassTransit.Demo { public record OrderCreatedEvent { public Guid OrderId { get; set; } } }

4.修改Worker类,发送订单创建事件:

namespace MassTransit.Demo; public class Worker : BackgroundService { readonly IBus _bus;//注册总线 public Worker(IBus bus) { _bus = bus; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) { //模拟并发送订单创建事件 await _bus.Publish(new OrderCreatedEvent(Guid.NewGuid()), stoppingToken); await Task.Delay(1000, stoppingToken); } } }

5.仅需实现IConsumer泛型接口,即可实现消息的订阅:

public class OrderCreatedEventConsumer: IConsumer { private readonly ILogger _logger; public OrderCreatedEventConsumer(ILogger logger) { _logger = logger; } public Task Consume(ConsumeContext context) { _logger.LogInformation($"Received Order:{context.Message.OrderId}"); return Task.CompletedTask; } }

6.注册服务:

using MassTransit; using MassTransit.Demo; IHost host = Host.CreateDefaultBuilder(args) .ConfigureServices(services => { services.AddHostedService(); services.AddMassTransit(configurator => { //注册消费者 configurator.AddConsumer(); //使用基于内存的消息路由传输 configurator.UsingInMemory((context, cfg) => { cfg.ConfigureEndpoints(context); }); }); }) .Build(); await host.RunAsync();

7.运行项目,一个简单的进程内事件发布订阅的应用就完成了。

如果需要使用RabbitMQ 消息代理进行消息传输,则仅需安装MassTransit.RabbitMQNuGet包,然后指定使用RabbitMQ 传输消息即可。

using MassTransit; using MassTransit.Demo; IHost host = Host.CreateDefaultBuilder(args) .ConfigureServices(services => { services.AddHostedService(); services.AddMassTransit(configurator => { configurator.AddConsumer(); // configurator.UsingInMemory((context, cfg) => // { // cfg.ConfigureEndpoints(context); // }); configurator.UsingRabbitMq((context, cfg) => { cfg.Host( host: "localhost", port: 5672, virtualHost: "/", configure: hostConfig => { hostConfig.Username("guest"); hostConfig.Password("guest"); }); cfg.ConfigureEndpoints(context); }); }); }) .Build(); await host.RunAsync();

运行项目,MassTransit会自动在指定的RabbitMQ上创建一个类型为fanoutMassTransit.Demo.OrderCreatedEventExchange和一个与OrderCreatedEvent同名的队列进行消息传输,如下图所示。

核心概念

MassTranist 为了实现消息代理的透明化和应用间消息的高效传输,抽象了以下概念,其中消息流转流程如下图所示:

  • Message:消息契约,定义了消息生产者和消息消费者之间的契约。
  • Producer:生产者,发送消息的一方都可以称为生产者。
  • SendEndpoint:发送端点,用于将消息内容序列化,并发送到传输模块。
  • Transport:传输模块,消息代理透明化的核心,用于和消息代理通信,负责发送和接收消息。
  • ReceiveEndpoint:接收端点,用于从传输模块接收消息,反序列化消息内容,并将消息路由到消费者。
  • Consumer:消费者,用于消息消费。

从上图可知,本质上还是发布订阅模式的实现,接下来就核心概念进行详解。

Message

Message:消息,可以使用class、interface、struct和record来创建,消息作为一个契约,需确保创建后不能篡改,因此应只保留只读属性且不应包含方法和行为。MassTransit使用的是包含命名空间的完全限定名即typeof(T).FullName来表示特定的消息类型。因此若在另外的项目中消费同名的消息类型,需确保消息的命名空间相同。另外需注意消息不应继承,以避免发送基类消息类型造成的不可预期的结果。为避免此类情况,官方建议使用接口来定义消息。在MassTransit中,消息主要分为两种类型:

  • Command:命令,用于告诉服务做什么,命令被发送到指定端点,仅被一个服务接收并执行。一般以动名词结构命名,如:UpdateAddress、CancelOrder。
  • Event:事件,用于告诉服务什么发生了,事件被发布到多个端点,可以被多个服务消费。 一般以过去式结构命名,如:AddressUpdated,OrderCanceled。

经过MassTransit发送的消息,会使用信封包装,包含一些附加信息,数据结构举例如下:

{ "messageId": "6c600000-873b-00ff-9a8f-08da8da85542", "requestId": null, "correlationId": null, "conversationId": "6c600000-873b-00ff-9526-08da8da85544", "initiatorId": null, "sourceAddress": "rabbitmq://localhost/THINKPAD_MassTransitDemo_bus_ptoyyyr88cyx9s1gbdpe5kniy1?temporary=true", "destinationAddress": "rabbitmq://localhost/MassTransit.Demo:OrderCreatedEvent", "responseAddress": null, "faultAddress": null, "messageType": [ "urn:message:MassTransit.Demo:OrderCreatedEvent" ], "message": { "orderId": "fd8a3598-4c3a-4ec9-bbf9-d5f508e1a0d8" }, "expirationTime": null, "sentTime": "2022-09-03T12:32:15.0796943Z", "headers": {}, "host": { "machineName": "THINKPAD", "processName": "MassTransit.Demo", "processId": 24684, "assembly": "MassTransit.Demo", "assemblyVersion": "1.0.0.0", "frameworkVersion": "6.0.5", "massTransitVersion": "8.0.6.0", "operatingSystemVersion": "Microsoft Windows NT 10.0.19044.0" } } 

从以上消息实例中可以看出一个包装后的消息包含以下核心属性:

  • messageId:全局唯一的消息ID
  • messageType:消息类型
  • message:消息体,也就是具体的消息实例
  • sourceAddress:消息来源地址
  • destinationAddress:消息目标地址
  • responseAddress:响应地址,在请求响应模式中使用
  • faultAddress:消息异常发送地址,用于存储异常消费消息
  • headers:消息头,允许应用自定义扩展信息
  • correlationId:关联Id,在Saga状态机中会用到,用来关联系列事件
  • host:宿主,消息来源应用的宿主信息

Producer

Producer,生产者,即用于生产消息。在MassTransit主要借助以下对象进行命令的发送和事件的发布。

从以上类图可以看出,消息的发送主要核心依赖于两个接口:

  • ISendEndpoint:提供了Send方法,用于发送命令。
  • IPublishEndpoint:提供了Publish方法,用于发布事件。

但基于上图的继承体系,可以看出通过IBusISendEndpointProviderConsumeContext进行命令的发送;通过IBusIPublishEndpointProvider进行事件的发布。具体举例如下:

发送命令

1.通过IBus发送:

private readonly IBus _bus; public async Task Post(CreateOrderRequest request) { //通过以下方式配置对应消息类型的目标地址 EndpointConvention.Map(new Uri("queue:create-order")); await _bus.Send(request); }

2.通过ISendEndpointProvider发送:

private readonly ISendEndpointProvider _sendEndpointProvider; public async Task Post(CreateOrderRequest request) { var serviceAddress = new Uri("queue:create-order"); var endpoint = await _sendEndpointProvider.GetSendEndpoint(serviceAddress); await endpoint.Send(request); }

3.通过ConsumeContext发送:

public class CreateOrderRequestConsumer:IConsumer { public async Task Consume(ConsumeContext context) { //do something else var destinationAddress = new Uri("queue:lock-stock"); var command = new LockStockRequest(context.Message.OrderId); await context.Send(destinationAddress, command); // 也可以通过获取`SendEndpoint`发送命令 // var endpoint = await context.GetSendEndpoint(destinationAddress); // await endpoint.Send(command); } }

发布事件

1.通过IBus发布:

private readonly IBus _bus; public async Task Post(CreateOrderRequest request) { //do something await _bus.Publish(request); }

2.通过IPublishEndpoint发布:

private readonly IPublishEndpoint _publishEndpoint; public async Task Post(CreateOrderRequest request) { //do something var order = CreateOrder(request); await _publishEndpoint.Publish(new OrderCreateEvent(order.Id)); }

3.通过ConsumeContext发布:

public class CreateOrderRequestConsumer: IConsumer { public async Task Consume(ConsumeContext context) { var order = CreateOrder(conext.Message); await context.Publish(new OrderCreateEvent(order.Id)); } }

Consumer

Consumer,消费者,即用于消费消息。MassTransit 包括多种消费者类型,主要分为无状态和有状态两种消费者类型。

提示: 本文由整理自网络,如有侵权请联系本站删除!
本站声明:
1、本站所有资源均来源于互联网,不保证100%完整、不提供任何技术支持;
2、本站所发布的文章以及附件仅限用于学习和研究目的;不得将用于商业或者非法用途;否则由此产生的法律后果,本站概不负责!

-六神源码网