NServiceBus是.Net平台下的开源的消息服务框架

2021-04-25 12:26

阅读:677

NServiceBus入门

技术图片
如图所示,项目一共包括4个端点(Endpoint),也就是四个单独的项目,端点是NServiceBus中的核心概念,发送消息和事件发布订阅的基础都是Endpoint。这个项目中包括发送消息和事件的发布订阅。

完整的项目结构如图所示:
技术图片

ClientUI

/// <summary>
/// Console host for the "ClientUI" endpoint. Reads key presses and sends a
/// <c>PlaceOrder</c> command to the "Sales" endpoint each time 'P' is pressed.
/// </summary>
class Program
{
    // Logger bound to this type so log output is attributed to the ClientUI program.
    private static readonly ILog log = LogManager.GetLogger<Program>();

    /// <summary>
    /// Process entry point; awaits <see cref="MainAsync"/> instead of blocking on it.
    /// </summary>
    static async Task Main(string[] args)
    {
        await MainAsync().ConfigureAwait(false);
    }

    /// <summary>
    /// Interactive loop: 'P' sends a new PlaceOrder command, 'Q' returns,
    /// any other key logs a retry hint.
    /// </summary>
    /// <param name="endpointInstance">The started endpoint used to send commands.</param>
    static async Task RunAsync(IEndpointInstance endpointInstance)
    {
        log.Info("Press ‘P‘ to place an order,press ‘Q‘ to quit");

        while (true)
        {
            var key = Console.ReadKey();
            Console.WriteLine();

            switch (key.Key)
            {
                case ConsoleKey.P:
                {
                    // Each order gets a fresh GUID string as its identifier.
                    var command = new PlaceOrder
                    {
                        OrderId = Guid.NewGuid().ToString()
                    };

                    log.Info($"Sending PlaceOrder with OrderId:{command.OrderId}");
                    // Send the command directly to the "Sales" endpoint.
                    await endpointInstance.Send("Sales", command).ConfigureAwait(false);
                    break;
                }

                case ConsoleKey.Q:
                    return;

                default:
                    log.Info("Please try again");
                    break;
            }
        }
    }

    /// <summary>
    /// Configures and starts the "ClientUI" endpoint, runs the interactive loop,
    /// then stops the endpoint.
    /// </summary>
    static async Task MainAsync()
    {
        Console.Title = "Client-UI";

        // The constructor argument is the endpoint name.
        var config = new EndpointConfiguration("ClientUI");
        // LearningTransport is intended for learning only; be careful about using it in production.
        config.UseTransport<LearningTransport>();
        // NOTE(review): persistence type assumed to be LearningPersistence to pair with
        // LearningTransport - confirm against the project.
        config.UsePersistence<LearningPersistence>();

        var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false);

        // RunAsync returns a Task, so it is awaited with ConfigureAwait().
        await RunAsync(endpointInstance).ConfigureAwait(false);

        await endpointInstance.Stop().ConfigureAwait(false);
    }
}

Sales

/// <summary>
/// Console host for the "Sales" endpoint. Starts the endpoint, waits for
/// Enter on the console, then stops it.
/// </summary>
class Program
{
    static async Task Main(string[] args)
    {
        Console.Title = "Sales";

        var config = new EndpointConfiguration("Sales");
        config.UseTransport<LearningTransport>();
        // NOTE(review): persistence type assumed to be LearningPersistence to pair with
        // LearningTransport - confirm against the project.
        config.UsePersistence<LearningPersistence>();

        var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false);

        // Keep the process (and therefore the endpoint) alive until the user presses Enter.
        Console.WriteLine("Press Enter to quit...");
        Console.ReadLine();

        await endpointInstance.Stop().ConfigureAwait(false);
    }
}

/// <summary>
/// Handles <c>PlaceOrder</c> messages: logs the received order id and
/// publishes an <c>OrderPlaced</c> event carrying the same id.
/// </summary>
public class PlaceOrderHandler : IHandleMessages<PlaceOrder>
{
    private static readonly ILog log = LogManager.GetLogger<PlaceOrderHandler>();

    /// <summary>Processes one incoming PlaceOrder message.</summary>
    /// <param name="message">The received command; its OrderId is copied to the event.</param>
    /// <param name="context">Handler context used to publish the follow-up event.</param>
    /// <returns>The task returned by the publish operation.</returns>
    public Task Handle(PlaceOrder message, IMessageHandlerContext context)
    {
        // Message received from another endpoint.
        log.Info($"Received PlaceOrder ,OrderId:{message.OrderId}");

        // Publish the OrderPlaced event.
        var order = new OrderPlaced
        {
            OrderId = message.OrderId
        };

        return context.Publish(order);
    }
}

Billing

/// <summary>
/// Entry point of the "Billing" endpoint host. Starts the endpoint, waits for
/// Enter on the console, then stops it.
/// </summary>
static async Task Main(string[] args)
{
    // Title matches the endpoint name so the console windows can be told apart.
    Console.Title = "Billing";

    var config = new EndpointConfiguration("Billing");
    config.UseTransport<LearningTransport>();
    // NOTE(review): persistence type assumed to be LearningPersistence to pair with
    // LearningTransport - confirm against the project.
    config.UsePersistence<LearningPersistence>();

    var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false);

    // Keep the process (and therefore the endpoint) alive until the user presses Enter.
    Console.WriteLine("Press Enter to quit...");
    Console.ReadLine();

    await endpointInstance.Stop().ConfigureAwait(false);
}

 /// <summary>
 /// Handles <c>OrderPlaced</c> events in the Billing endpoint: logs the order id
 /// and publishes an <c>OrderBilled</c> event carrying the same id.
 /// </summary>
 public class OrderPlacedHandler : IHandleMessages<OrderPlaced>
 {
     private static readonly ILog log = LogManager.GetLogger<OrderPlacedHandler>();

     /// <summary>Processes one incoming OrderPlaced event.</summary>
     /// <param name="message">The received event; its OrderId is copied to the new event.</param>
     /// <param name="context">Handler context used to publish the follow-up event.</param>
     /// <returns>The task returned by the publish operation.</returns>
     public Task Handle(OrderPlaced message, IMessageHandlerContext context)
     {
         // Subscribed OrderPlaced event received.
         log.Info($"Received OrderPlaced,OrderId {message.OrderId} - Charging credit card");

         // Publish the OrderBilled event.
         var order = new OrderBilled
         {
             OrderId = message.OrderId
         };

         return context.Publish(order);
     }
 }

Shipping

/// <summary>
/// Entry point of the "Shipping" endpoint host. Starts the endpoint, waits for
/// Enter on the console, then stops it.
/// </summary>
static async Task Main(string[] args)
{
    // Title matches the endpoint name so the console windows can be told apart.
    Console.Title = "Shipping";

    var config = new EndpointConfiguration("Shipping");
    config.UseTransport<LearningTransport>();
    // NOTE(review): persistence type assumed to be LearningPersistence to pair with
    // LearningTransport - confirm against the project.
    config.UsePersistence<LearningPersistence>();

    var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false);

    // Keep the process (and therefore the endpoint) alive until the user presses Enter.
    Console.WriteLine("Press Enter to quit...");
    Console.ReadLine();

    await endpointInstance.Stop().ConfigureAwait(false);
}

/// <summary>
/// Handles <c>OrderBilled</c> events in the Shipping endpoint by logging the order id.
/// </summary>
public class OrderBilledHandler : IHandleMessages<OrderBilled>
{
    private static readonly ILog log = LogManager.GetLogger<OrderBilledHandler>();

    /// <summary>Processes one subscribed OrderBilled event.</summary>
    /// <param name="message">The received event whose OrderId is logged.</param>
    /// <param name="context">Handler context; not used by this handler.</param>
    /// <returns>An already-completed task, since no asynchronous work is done.</returns>
    public Task Handle(OrderBilled message, IMessageHandlerContext context)
    {
        log.Info($"Received OrderBilled,OrderId={message.OrderId} Should we ship now?");
        return Task.CompletedTask;
    }
}

/// <summary>
/// Handles <c>OrderPlaced</c> events in the Shipping endpoint by logging the order id.
/// </summary>
public class OrderPlacedHandler : IHandleMessages<OrderPlaced>
{
    private static readonly ILog log = LogManager.GetLogger<OrderPlacedHandler>();

    /// <summary>Processes one subscribed OrderPlaced event.</summary>
    /// <param name="message">The received event whose OrderId is logged.</param>
    /// <param name="context">Handler context; not used by this handler.</param>
    /// <returns>An already-completed task, since no asynchronous work is done.</returns>
    public Task Handle(OrderPlaced message, IMessageHandlerContext context)
    {
        log.Info($"Received OrderPlaced,OrderId={message.OrderId} Should we ship now?");
        return Task.CompletedTask;
    }
}
    

运行结果

技术图片

技术图片

技术图片

技术图片

总结

      NServiceBus的核心是端点之间的通信:发送的命令(Command)需要实现ICommand接口,发布的事件(Event)需要实现IEvent接口,NServiceBus会扫描实现这两个接口的类。每个端点的关键配置就是EndpointConfiguration。

NServiceBus是.Net平台下的开源的消息服务框架

标签:ali   one   code   start   class   comment   持久   point   arch   

原文地址:https://www.cnblogs.com/Leo_wl/p/12228986.html


评论


亲,登录后才可以留言!