网站策划书的主题有哪些网站推广途径
系列文章目录
第一章 Java线程池技术应用
 第二章 CountDownLatch和Semaphone的应用
 第三章 Spring Cloud 简介
 第四章 Spring Cloud Netflix 之 Eureka
 第五章 Spring Cloud Netflix 之 Ribbon
 第六章 Spring Cloud 之 OpenFeign
 第七章 Spring Cloud 之 GateWay
 第八章 Spring Cloud Netflix 之 Hystrix
 第九章 代码管理gitlab 使用
 第十章 SpringCloud Alibaba 之 Nacos discovery
 第十一章 SpringCloud Alibaba 之 Nacos Config
 第十二章 Spring Cloud Alibaba 之 Sentinel
 第十三章 JWT
 第十四章 RabbitMQ应用
 第十五章 RabbitMQ 延迟队列
 第十六章 spring-cloud-stream

  文章目录
 - 系列文章目录
 - @[TOC](文章目录)
 
 - 前言
 - 1、stream设计思想
 - 2、编码常用的注解
 - 3、编码步骤
 - 3.1、添加依赖
 - 3.2、修改配置文件
 - 3.3、生产
 - 3.4、消费
 - 3.5、延迟队列
 - 3.5.1、修改配置文件
 - 3.5.2、生产端
 - 3.5.2、消息确认机制 消费端
 
 
 - 总结
 
 
 
文章目录
- 系列文章目录
 - @[TOC](文章目录)
 
- 前言
 - 1、stream设计思想
 - 2、编码常用的注解
 - 3、编码步骤
 - 3.1、添加依赖
 - 3.2、修改配置文件
 - 3.3、生产
 - 3.4、消费
 - 3.5、延迟队列
 - 3.5.1、修改配置文件
 - 3.5.2、生产端
 - 3.5.2、消息确认机制 消费端
 
- 总结
 
前言
https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit
 官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。
SpringCloud stream通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
Stream让我们不再关注具体MQ的细节我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换,总的来说Stream能够屏蔽底层消息中间件的差异、降低切换成本,是统一消息的编程模型。
1、stream设计思想

 
- Binder:很方便的连接中间件,屏蔽差异
 - Channel:通道是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
 - Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。
 
2、编码常用的注解

| 组成 | 说明 | 
|---|---|
| Middleware | 中间件,目前只支持RabbitMQ和Kafka | 
| Binder | Binder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过BInder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。 | 
| @Input | 注解标识输入通道,通过该输入通道接收到的消息进入应用程序 | 
| @Output | 注解标识输出通道,发布的消息将通过该通道离开应用程序 | 
| @StreamListener | 监听队列,用于消费者的队列的消息接收 | 
| @EnableBinding | 指信道channel和exchange绑定在一起 | 
3、编码步骤
3.1、添加依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
 
3.2、修改配置文件
server:port: 8088spring:cloud:stream:binders: #需要绑定的rabbitmq的服务信息defaultRabbit:  #定义的名称,用于bidding整合type: rabbit  #消息组件类型environment:  #配置rabbimq连接环境spring:rabbitmq:host: localhost   #rabbitmq 服务器的地址port: 5672           #rabbitmq 服务器端口username: tiger       #rabbitmq 用户名password: tiger       #rabbitmq 密码virtual-host: tiger_vh  #虚拟路径bindings:        #服务的整合处理saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道destination: exchange-saveOrder     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。content-type: application/json      #设置消息的类型,本次为jsondefault-binder: defaultRabbitgroup: saveOrderGroup               #分组saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道destination: exchange-saveOrder     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。content-type: application/json      #设置消息的类型,本次为jsondefault-binder: defaultRabbitgroup: saveOrderGroup               #分组
 
3.3、生产
/*** 订单消息输出通道处理器*/
@Component
public interface OrderOutputChannelProcesor {@Output("saveOrderOutput")MessageChannel saveOrderOutput();
} 
@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {@Autowired@Output("saveOrderOutput")private MessageChannel messageChannel;public void sentMsg(UserInfo userInfo){messageChannel.send(MessageBuilder.withPayload(userInfo).build());log.info("消息发送成功:" + userInfo);}
} 
3.4、消费
/*** 订单消息输入通道处理器*/
@Component
public interface OrderInputChannelProcesor {@Input("saveOrderInput")SubscribableChannel saveOrderInput();
}
 
@Slf4j
@EnableBinding(OrderInputChannelProcesor.class)
public class OrderMessageConsumer {@StreamListener("saveOrderInput")public void receiveMsg(Message<UserInfo> userInfoMessage){log.info("接收消息成功:" + userInfoMessage.getPayload());}
}
 
3.5、延迟队列
安装延迟队列插件:
 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez
 下载解压,到plugins目录,执行以下的命令:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
3.5.1、修改配置文件
server:port: 8088spring:cloud:stream:binders: #需要绑定的rabbitmq的服务信息defaultRabbit:  #定义的名称,用于bidding整合type: rabbit  #消息组件类型environment:  #配置rabbimq连接环境spring:rabbitmq:host: localhost   #rabbitmq 服务器的地址port: 5672           #rabbitmq 服务器端口username: tiger       #rabbitmq 用户名password: tiger       #rabbitmq 密码virtual-host: tiger_vh  #虚拟路径bindings:        #服务的整合处理saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道destination: exchange-saveOrder-delay     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。content-type: application/json      #设置消息的类型,本次为jsondefault-binder: defaultRabbitgroup: saveOrderGroup               #分组saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道destination: exchange-saveOrder-delay     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。content-type: application/json      #设置消息的类型,本次为jsondefault-binder: defaultRabbitgroup: saveOrderGroup               #分组rabbit:bindings: #服务的整合处理saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道producer:delayed-exchange: truesaveOrderInput:consumer:delayed-exchange: true
 
3.5.2、生产端
@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {@Autowired@Output("saveOrderOutput")private MessageChannel messageChannel;public void sentMsg(UserInfo userInfo){messageChannel.send(MessageBuilder.withPayload(userInfo).setHeader("x-delay", 5000).build());log.info("消息发送成功:" + userInfo);}
}
 
3.5.2、消息确认机制 消费端
rabbit:bindings: #服务的整合处理saveOrderInput:consumer:acknowledge-mode: MANUAL #手动确认
 
@StreamListener("saveOrderInput")
public void receiveMsg(Message<UserInfo> userInfoMessage){log.info("接收消息成功:" + userInfoMessage.getPayload());Channel channel = (Channel) userInfoMessage.getHeaders().get(AmqpHeaders.CHANNEL);Long delieverTag = (Long) userInfoMessage.getHeaders().get(AmqpHeaders.DELIVERY_TAG);/** deliveryTag:Channel的消息投递的唯一标识符。* multiple:是否否定应答多条消息。如果设置为true,则否定应答带指定deliveryTag的消息及该deliveryTag之前的多条消息;* 如果设置为false,则仅否定应答带指定deliveryTag的单条消息。* requeue:被否定应答的消息是否重入队列。如果设置为true,则消息重入队列;* 如果设置为false,则消息被丢弃或发送到死信Exchange。*/try {channel.basicAck(delieverTag,true);} catch (IOException e) {e.printStackTrace();}
}
 
定义交换机类型为direct
rabbit:bindings: #服务的整合处理saveOrderInput:consumer:bindingRoutingKey: orderRoutingKeybindQueue: trueexchangeType: directsaveOrderOutput:producer:routingKeyExpression: orderRoutingKeyexchangeType: direct
 
总结
spring-cloud-stream目前支持RabbitMQ和Kafka,与spring-cloud无缝集成,非常方便。
