日志

.NET 云原生架构师训练营(模块二 基础巩固 RabbitMQ Masstransit 详解)--学习笔记

 来源    2021-01-13    0  

2.6.7 RabbitMQ -- Masstransit 详解

  • Consumer 消费者
  • Producer 生产者
  • Request-Response 请求-响应

Consumer 消费者

在 MassTransit 中,一个消费者可以消费一种或多种消息

消费者的类型包括:普通消费者,saga,saga 状态机,路由活动(分布式追踪),处理器 handlers,工作消费者 job comsumers

  • Consumer
  • Instance
  • Handler
  • Others

Consumer

public class Program
{
    public static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.ReceiveEndpoint("order-service", e =>
            {
                e.Consumer<SubmitOrderConsumer>();
            });
        });
    }
}

继承 IConsumer,实现 Consume 方法

class SubmitOrderConsumer :
    IConsumer<SubmitOrder>
{
    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        await context.Publish<OrderSubmitted>(new
        {
            context.Message.OrderId
        });
    }
}

三个原则:

  • 拥抱 The Hollywood Principle, which states, "Dont't call us, we'll call you."
  • Consume 方法是一个被等待的方法,在执行中时其他消费者无法接收到这个消息,当这个方法完成的时候,消息被 ack,并且从队列中移除
  • Task 方法异常会导致消息触发 retry,如果没有配置重试,消息将被投递到失败队列

Instance

public class Program
{
    public static async Task Main()
    {
        var submitOrderConsumer = new SubmitOrderConsumer();

        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.ReceiveEndpoint("order-service", e =>
            {
                e.Instance(submitOrderConsumer);
            });
        });
    }
}

所有接收到的消息都由一个消费者来实例来处理(请确保这个消费者类是线程安全)

Consumer 每次接收到消息都会 new 一个实例

Handler

public class Program
{
    public static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.ReceiveEndpoint("order-service", e =>
            {
                e.Handler<SubmitOrder>(async context =>
                {
                    await Console.Out.WriteLineAsync($"Submit Order Received: {context.Message.OrderId}");
                });
            });
        });
    }
}

通过一个委托 Lambda 方法,来消费消息

Others

  • Saga<>
  • StateMachineSaga<>

Producer 生产者

消息的生产可以通过两种方式产生:发送和发布

发送的时候需要指定一个具体的地址 DestinationAddress,发布的时候消息会被广播给所有订阅了这个消息类型的消费者

基于这两种规则,消息被定义为:命令 command 和事件 event

  • send
  • publish

send

可以调用以下对象的 send 方法来发送 command:

  • ConsumeContext (在 Consumer 的 Consumer 方法参数中传递)
  • ISendEndpointProvider(可以从 DI 中获取)
  • IBusControl(最顶层的控制对象,用来启动和停止 masstransit 的控制器)

ConsumeContext

public class SubmitOrderConsumer : 
    IConsumer<SubmitOrder>
{
    private readonly IOrderSubmitter _orderSubmitter;

    public SubmitOrderConsumer(IOrderSubmitter submitter)
        => _orderSubmitter = submitter;

    public async Task Consume(IConsumeContext<SubmitOrder> context)
    {
        await _orderSubmitter.Process(context.Message);

        await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
    }
}

ISendEndpointProvider

public async Task SendOrder(ISendEndpointProvider sendEndpointProvider)
{
    var endpoint = await sendEndpointProvider.GetSendEndpoint(_serviceAddress);

    await endpoint.Send(new SubmitOrder { OrderId = "123" });
}

publish

  • 发送地址
  • 短地址
  • Convention Map

发送地址

  • rabbitmq://localhost/input-queue
  • rabbitmq://localhost/input-queue?durable=false

短地址

  • GetSendEndpoint(new Uri("queue:input-queue"))

Convention Map

在配置文件中指定 map 规则

EndpointConvention.Map<StartDelivery>(new Uri(ConfigurationManager.AppSettings["deliveryServiceQueue"]));

直接发送

public class SubmitOrderConsumer : 
    IConsumer<SubmitOrder>
{
    private readonly IOrderSubmitter _orderSubmitter;

    public SubmitOrderConsumer(IOrderSubmitter submitter)
        => _orderSubmitter = submitter;

    public async Task Consume(IConsumeContext<SubmitOrder> context)
    {
        await _orderSubmitter.Process(context.Message);

        await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
    }
}

可以调用以下对象的 publish 方法来发送 event:

  • ConsumeContext (在 Consumer 的 Consumer 方法参数中传递)
  • IPublishEndpoint(可以从 DI 中获取)
  • IBusControl(最顶层的控制对象,用来启动和停止 masstransit 的控制器)

IPublishEndpoint

public async Task NotifyOrderSubmitted(IPublishEndpoint publishEndpoint)
{
    await publishEndpoint.Publish<OrderSubmitted>(new
    {
        OrderId = "27",
        OrderDate = DateTime.UtcNow,
    });
}

Request-Response 请求-响应

Request-Response 模式让应用程序之间解耦之后,依然采用同步的方式

  • Consumer
  • IClientFactory
  • IRequestClient
  • Send a request

Consumer

public async Task Consume(ConsumeContext<CheckOrderStatus> context)
{
    var order = await _orderRepository.Get(context.Message.OrderId);
    if (order == null)
        throw new InvalidOperationException("Order not found");
    
    await context.RespondAsync<OrderStatusResult>(new 
    {
        OrderId = order.Id,
        order.Timestamp,
        order.StatusCode,
        order.StatusText
    });
}

需要处理返回类型 OrderStatusResult,异步方式模拟同步,实际上同样有消息队列,消费者处理过程

IClientFactory

public interface IClientFactory 
{
    IRequestClient<T> CreateRequestClient<T>(ConsumeContext context, Uri destinationAddress, RequestTimeout timeout);

    IRequestClient<T> CreateRequestClient<T>(Uri destinationAddress, RequestTimeout timeout);

    RequestHandle<T> CreateRequest<T>(T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);

    RequestHandle<T> CreateRequest<T>(ConsumeContext context, T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);
}

通过 IBusControl 的 CreateClientFactory 方法可以得到 ClientFactory

IRequestClient

public interface IRequestClient<TRequest>
    where TRequest : class
{
    RequestHandle<TRequest> Create(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);

    Task<Response<T>> GetResponse<T>(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);
}

RequestClient 可以创建请求,或者直接获得响应

Send a request

var serviceAddress = new Uri("rabbitmq://localhost/check-order-status");
var client = bus.CreateRequestClient<CheckOrderStatus>(serviceAddress);

var response = await client.GetResponse<OrderStatusResult>(new { OrderId = id});

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。

欢迎转载、使用、重新发布,但务必保留文章署名 郑子铭 (包含链接: https://www.cnblogs.com/MingsonZheng/ ),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。

如有任何疑问,请与我联系 (MingsonZheng@outlook.com) 。

相关文章
Docker Data Center系列(一)- 快速搭建云原生架构的实践环境
日志本系列文章演示如何快速搭建一个简单的云原生架构的实践环境. 基于这个基础架构,可以持续部署微服务架构的应用栈,演练敏捷开发过程,提升DevOps实践能力. 1 整体规划 1.1 拓扑架构 1.2 基础 ...
1
架构师养成记--1线程基础
日志为什么要拿线程开头呢?因为线程太重要了,先不说工作中有多重要,单是和money直接挂钩的开发工程师面试就必问题,要是面试没问相关问题的话,要么是应聘者太菜面试官懒得问了,要么是公司太菜,根本不关心多线 ...
3
大数据系统架构师 0.2 Linux基础
日志1. Linux基本环境 1.1 大数据Hadoop前置大纲讲解 1)Linux系统,基本命令 2)Java语言,JavaSE相关知识 3)MySQL基本的DML和DDL 1.2 常见Linux系统. ...
1
彻底告别加解密模块代码拷贝-JCE核心Cpiher详解
日志前提 javax.crypto.Cipher,翻译为密码,其实叫做密码器更加合适.Cipher是JCA(Java Cryptographic Extension,Java加密扩展)的核心,提供基于多种 ...
2
vuex源码分析(二) state及strict属性 详解
日志state也就是vuex里的值,也即是整个vuex的状态,而strict和state的设置有关,如果设置strict为true,那么不能直接修改state里的值,只能通过mutation来设置 例1: ...
2
Appium+python自动化(十一)- 元素定位秘籍助你打通任督二脉 - 下卷(超详解)
日志简介 宏哥看你骨骼惊奇,印堂发亮,必是练武之奇才! 按照上一篇的节目预告,这一篇还是继续由宏哥给小伙伴们分享元素定位,是不是按照上一篇的秘籍修炼,是不是感觉到头顶盖好像被掀开,内气从头上冒出去,顿时觉 ...
1
Appium+python自动化(十)- 元素定位秘籍助你打通任督二脉 - 上卷(超详解)
日志简介 你有道灵光从天灵盖喷出来你知道吗,年纪轻轻就有一身横练的筋骨,简直百年一见的练武奇才啊,如果有一天让你打通任督二脉,那还不飞龙上天啊.正所谓我不入地狱谁入地狱,警恶惩奸维护世界和平这个任务就交个 ...
1
原生JS:Function对象(apply、call、bind)详解
日志Function对象(apply.call.bind) 原创文章,转摘请注明出处:苏福:https://www.cnblogs.com/susufufu/p/5850180.html 本文参考MDN做 ...
2
visual Studio 2017 扩展开发(二)《菜单图标详解》
日志在上一篇我们在菜单栏创建了一个菜单,菜单上显示了一个图标跟文本.那么我们自己创建的菜单如何修改自定义的菜单图标呢.下面娓娓道来..... 首先你要有一个图,创建一个32位的位图.这个位图的像素是16p ...
2
如何设计提高服务API的安全性(二)API密钥方式详解
日志在上文已经讲述了基础介绍,这篇文章详细讲解API密钥方式. 利用何种加密方式呢?       经过上面加密算法的理解,单向加密不仅性能高,而且有压缩性,即长度一致,有效减少网络传输过程中的字节大小.适 ...
1
odoo模块__ manifest __.py文件详解
日志自动创建的模块中的__ manifest __.py # -*- coding: utf-8 -*- { # 模块名称 'name': "cmd_test", # 关键词 'sum ...
3
(二十三)原型模式详解(clone方法源码的简单剖析)
日志                作者:zuoxiaolong8810(左潇龙),转载请注明出处,特别说明:本博文来自博主原博客,为保证新博客中博文的完整性,特复制到此留存,如需转载请注明新博客地址即可 ...
1
(二十一)状态模式详解(DOTA版)
日志              作者:zuoxiaolong8810(左潇龙),转载请注明出处,特别说明:本博文来自博主原博客,为保证新博客中博文的完整性,特复制到此留存,如需转载请注明新博客地址即可. ...
1
RequireJS进阶-模块的优化及配置的详解
日志概述 关于RequireJS已经有很多文章介绍过了.这个工具可以将你的JavaScript代码轻易的分割成苦干个模块(module)并且保持你的代码模块化与易维护性.这样,你将获得一些具有互相依赖关系 ...
1
Python之路第十二篇续 jQuery案例详解
日志jQuery 1.jQuery和JS和HTML的关系 首先了HTML是实际展示在用户面前的用户可以直接体验到的,JS是操作HTML的他能改变HTML实际展示给用户的效果! 首先了解JS是一门语言,他是 ...
4
单点登录(二):功能实现详解
日志上一篇<单点登录(一):思考>介绍了我在做单点登录功能过程中的一些思考,本篇内容将基于这些思考作代码实现详细的介绍. 票据的定义 票据是用户登录成功后发给用户的凭据,在本篇博客中,票据可被 ...
1
STL之二:vector容器用法详解
日志转载于:http://blog.csdn.net/longshengguoji/article/details/8507394 vector类称作向量类,它实现了动态数组,用于元素数量变化的对象数组. ...
1
二十三:原型模式详解(clone复制方法源码)
日志定义:用原型实例指定创建对象的种类,并且通过拷贝这些原型创建新的对象.                 定义比较简单,总结一下是通过实例指定种类,通过拷贝创建对象.                 在 ...
1
phpcms V9 二次开发------(获取点击数详解)
日志关于phpcms V9的点击数的使用应该有不少数是直接调用网上搜索到的代码,但是对于一些想要深入研究开发的人来说,看到网上的代码后更是不解,本人这几天看了看,了解了一些东西,在这里写出来分享一下,首先 ...
1
WCF全析(二) --服务配置部署详解
日志        上篇文章主要讨论了WCF的基本内容,其中包括WCF的术语.创建方法及WCF在开发过程中使用的意义,它不仅能够提供程序之间的通信,而且还能提供程序和数据间的通信,WCF提供了多样化的程序 ...
5