当前位置: 首页 > news >正文

专业做logo的网站seo优化知识

专业做logo的网站,seo优化知识,官方网站如何做,wordpress判断函数系列文章目录 文章目录 系列文章目录前言MultiKafkaStarter [V2.2]一、功能特性二、快速开始(生产端)三、快速开始(消费端)四、其它特性五、变更记录六、参考文章 前言 在分布式服务的架构演进中,消息队列作为核心组件…

系列文章目录


文章目录

  • 系列文章目录
  • 前言
  • MultiKafkaStarter [V2.2]
    • 一、功能特性
    • 二、快速开始(生产端)
    • 三、快速开始(消费端)
    • 四、其它特性
    • 五、变更记录
    • 六、参考文章


前言

在分布式服务的架构演进中,消息队列作为核心组件之一,承载着解耦、异步、削峰填谷等关键职责。Apache Kafka 作为业界广泛使用的分布式流处理平台,因其高吞吐、低延迟的特性被大量应用在各类大数据场景中。然而,随着业务的复杂度不断提升,如何在 SpringBoot 中高效地整合并管理多个 Kafka 数据源,成为了一个值得探讨的问题。

在过去的一段时间里,我们通过系列文章详细阐述了如何在 SpringBoot 中以零代码或极低的代码侵入方式,实现多 Kafka 数据源的整合。从基础的配置到高级特性如 protobuf 支持、Aware 模式以及亿级消息生产者的优化,我们希望通过这些内容帮助开发者更加便捷地应对复杂的业务场景。

今天,我们将这些内容凝练成一个全新的 SpringBoot 插件——MultiKafkaStarter,旨在进一步降低开发者整合多 Kafka 数据源的门槛,提升系统的可维护性和扩展性。

核心特点

  • 无代码侵入:通过 SpringBoot 的自动配置机制,无需修改业务代码即可实现多 Kafka 数据源的整合。
  • 灵活配置:支持动态配置多个 Kafka 数据源,包括 bootstrap servers、group id、security protocol 等关键参数。
  • 全面特性支持:不仅支持基础的消息消费和生产功能,还提供了对 protobuf 序列化/反序列化的支持,以及对 Aware 模式的适配。
  • 亿级消息处理:针对高并发场景,提供了包括批量发送、分区策略优化等在内的多项性能优化措施,确保系统能够稳定处理亿级消息量。
  • 易用性与可维护性:插件采用模块化的设计思想,易于集成和升级,同时提供了丰富的文档和社区支持

国籍惯例,先上源码:Github源码

MultiKafkaStarter [V2.2]

SpringBoot 零代码方式整合多个kafka数据源,支持任意kafka集群,已封装为一个小模块,集成所有kafka配置,让注意力重新回归业务本身。

一、功能特性

  • SpringBoot无编程方式整合多个kafka数据源
  • 支持批量消费kafka并对单批次消息根据条件去重
  • 支持消费kafka消息类型为pb格式
  • 支持任意数量生产者

1、引入最新依赖包,如果找不到依赖包,请到工程目录mvn clean package install执行一下命令。

<dependency><groupId>io.github.vipjoey</groupId><artifactId>multi-kafka-starter</artifactId><version>2.2</version>
</dependency>

二、快速开始(生产端)

2、添加kafka地址等相关配置。


## json消息生产者
spring.kafka.four.enabled=true
spring.kafka.four.producer.count=1 ## 生产者数量,默认为1个
spring.kafka.four.producer.name=fourKafkaSender  ## 设置bean的名称,方便后续引用。如果没有设置,默认值为xxxKafkaSender
spring.kafka.four.producer.bootstrap-servers=${spring.embedded.kafka.brokers} ## 必须设置
spring.kafka.four.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.four.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer## pb 消息生产者
spring.kafka.five.enabled=true
spring.kafka.five.producer.name=fiveKafkaSender
spring.kafka.five.producer.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.five.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.five.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer

3、根据名称注入生产者MmcKafkaMultiSender,就可以发送kafka消息。

@Resource(name = "fourKafkaSender")private MmcKafkaMultiSender mmcKafkaMultiSender;@Resource(name = "fiveKafkaSender")private MmcKafkaMultiSender mmcKafkaMultiSender;@Resourceprivate MmcKafkaOutputContainer mmcKafkaOutputContainer;// 方式一void produceMessage() {for (int i = 0; i < 10; i++) {DemoAwareMsg msg = new DemoAwareMsg();msg.setRoutekey("routekey" + i);msg.setName("name" + i);msg.setTimestamp(System.currentTimeMillis());String json = JsonUtil.toJsonStr(msg);mmcKafkaMultiSender.sendStringMessage(topicOne, "aaa", json);}}// 方式二void produceMessage() {MmcKafkaSender sender = mmcKafkaOutputContainer.getOutputs().get("xxxKafkaSender");sender.sendStringMessage(topic, sku.getRoutekey(), message);}

三、快速开始(消费端)

2、添加kafka地址等相关配置。

## topic1的kafka配置
spring.kafka.one.enabled=true
spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.one.topic=mmc-topic-one
spring.kafka.one.group-id=group-consumer-one
spring.kafka.one.processor=你的处理类bean名称(例如:oneProcessor)
spring.kafka.one.dupicate=true   ## 如果为true表示对批次内的kafka消息去重,需要实现MmcKafkaMsg接口,默认为false
spring.kafka.one.consumer.auto-offset-reset=latest
spring.kafka.one.consumer.max-poll-records=10
spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer## topic2的kafka配置
spring.kafka.two.enabled=true
spring.kafka.two.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.two.topic=mmc-topic-two
spring.kafka.two.group-id=group-consumer-two
spring.kafka.two.processor=你的处理类bean名称
spring.kafka.two.consumer.auto-offset-reset=latest
spring.kafka.two.consumer.max-poll-records=10
spring.kafka.two.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.two.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer## protobuf类型的消息的kafka配置
spring.kafka.pb.enabled=true
spring.kafka.pb.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.pb.topic=mmc-topic-pb
spring.kafka.pb.group-id=group-consumer-pb
spring.kafka.pb.processor=pbProcessor
spring.kafka.pb.consumer.auto-offset-reset=latest
spring.kafka.pb.consumer.max-poll-records=10
spring.kafka.pb.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.pb.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

3、新建kafka消息对应的实体类,可以选择实现MmcMsgDistinctAware接口,例如

@Data
class DemoMsg implements MmcMsgDistinctAware {private String routekey;private String name;private Long timestamp;}如果你配置了spring.kafka.xxx.duplicate=fale,则不需要实现MmcMsgDistinctAware接口。PS:如果实现MmcMsgDistinctAware接口,就自动具备了消息去重能力

4、新建kafka消息处理类,要求继承MmcKafkaKafkaAbastrctProcessor,然后就可以愉快地编写你的业务逻辑了。

@Slf4j
@Service("oneProcessor") // 你的处理类bean名称,如果没有定义bean名称,那么默认就是首字母缩写的类名称
public class OneProcessor extends MmcKafkaKafkaAbastrctProcessor<DemoMsg> {@Overrideprotected void dealMessage(List<DemoMsg> datas) {// 下面开始编写你的业务代码}}@Slf4j
@Service("pbProcessor")
public class PbProcessor extends MmcKafkaKafkaAbastrctProcessor<DemoMsg> {@Overrideprotected Stream<DemoMsg> doParseProtobuf(byte[] record) {try {DemoPb.PbMsg msg = DemoPb.PbMsg.parseFrom(record);DemoMsg demo = new DemoMsg();BeanUtils.copyProperties(msg, demo);return Stream.of(demo);} catch (InvalidProtocolBufferException e) {log.error("parssPbError", e);return Stream.empty();}}@Overrideprotected void dealMessage(List<DemoMsg> datas) {System.out.println("PBdatas: " + datas);}
}

四、其它特性

1、支持单次拉取kafka的batch消息里去重,需要实现MmcMsgDistinctAware的getRoutekey()和getTimestamp()方法;如果为false,则不要实现MmcMsgDistinctAware接口。

spring.kafka.xxx.duplicate=true

2、支持字符串kafka消息,json是驼峰或者下划线

# 默认为支持驼峰的kafka消息,为ture则支持下划线的消息
spring.kafka.xxx.snakeCase=false

3、支持pb的kafka消息,需要自行重写父类的doParseProtobuf方法;

    @Overrideprotected Stream<DemoMsg> doParseProtobuf(byte[] record) {try {DemoMsg msg = new DemoMsg();DemoPb.PbMsg pb = DemoPb.PbMsg.parseFrom(record);BeanUtils.copyProperties(pb, msg);return Stream.of(msg);} catch (InvalidProtocolBufferException e) {log.error("doParseProtobuf error: {}", e.getMessage());return Stream.empty();}}

4、支持获取kafka的topic、offset属性,注入到实体类中,需要实现MmcMsgKafkaAware接口

@Data
class DemoAwareMsg implements MmcKafkaAware {private String routekey;private String name;private Long timestamp;private String topic;private long offset;}

五、变更记录

  • 20240623 v2.2 支持Kafka生产者,并对MultiKafkaConsumerStarter项目重命名为MultiKafkaStarter
  • 20240602 v2.1 支持获取kafka消息中topic、offset属性
  • 20240602 v2.0 支持protobuf、json格式
  • 20240430 v1.1 取消限定符
  • 20231111 v1.0 初始化

六、参考文章

  • 《搭建大型分布式服务(三十六)SpringBoot 零代码方式整合多个kafka数据源》
  • 《搭建大型分布式服务(三十七)SpringBoot 整合多个kafka数据源-取消限定符》
  • 《搭建大型分布式服务(三十八)SpringBoot 整合多个kafka数据源-支持protobuf》
  • 《搭建大型分布式服务(三十九)SpringBoot 整合多个kafka数据源-支持Aware模式》
  • 《搭建大型分布式服务(四十)SpringBoot 整合多个kafka数据源-支持生产者》
  • 《搭建大型分布式服务(四十一)SpringBoot 整合多个kafka数据源-支持亿级消息生产者》
  • 《搭建大型分布式服务(四十二)SpringBoot 无代码侵入实现多Kafka数据源整合插件发布》
  • 《搭建大型分布式服务(四十三)SpringBoot 多Kafka数据源发布到Maven中央仓库:让世界看到你的作品!》

加我加群一起交流学习!更多干货下载、项目源码和大厂内推等着你

http://www.wangmingla.cn/news/160171.html

相关文章:

  • 信誉好的合肥网站建设泉州关键词优化报价
  • 与魔鬼做交易的真实网站内蒙古网站seo
  • vs网站开发实例2022国内外重大新闻事件10条
  • 免费拒绝收费网站2023年7月疫情还会严重吗
  • 做购物网站今日头条国际军事新闻
  • 焦作做网站优化品牌推广的概念
  • 网站建设咨询什么太原网站开发
  • 菏泽企业做网站中文搜索引擎排行榜
  • 做图模板下载网站新闻头条国内大事
  • 南充网站建设百度关键词排名靠前
  • 郑州淘宝网站建设百度关键词挖掘
  • 哈尔滨网站建设1元钱一键制作网站
  • 上海防伪网站建设seo博客大全
  • 网站备案 营业执照网络营销是学什么的
  • wordpress站点错误日结app推广联盟
  • vs做网站怎么添加子页一年的百度指数
  • 不用代码的网站建设防城港网站seo
  • 怎么看网站是什么时候做的怎么给网站做优化
  • 外贸网站建设河南做网站优化
  • 网站标题优化百度免费优化
  • php做电子商务网站的种类嘉兴网站建设
  • 企业网站的技术维护内容主要包括百度营稍
  • 大连做网站多少钱网络电商推广方案
  • 网站与系统开发北京百度推广官网首页
  • 做网站推销自己的产品这可行吗下拉关键词排名
  • 电子商务网站建设结业论文石家庄seo排名外包
  • 浙江省建设执业注册中心网站seo整站优化新站快速排名
  • 正能量不良网站推荐2020长沙建设网站制作
  • 怎么建立一个公司网站杭州百度快照优化排名推广
  • 企业官方网站建设规划重庆专业seo