当前位置: 首页 > news >正文

福州最好的网站建设网络公司seo裤子的关键词首页排名有哪些

福州最好的网站建设网络公司,seo裤子的关键词首页排名有哪些,电子商务网站设计岗位主要是,网络规划设计师 第2版 ed2k消息发送 生产者启动 入口 : org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean) 生产者在调用send()方法发送消息之前,需要调用start进行启动, 生产者启动过程中会启动一些服务和线程 启动过程中会启动MQClientInstance, 这个实例是针对一个项…

消息发送

生产者启动

入口 : org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)

生产者在调用send()方法发送消息之前,需要调用start进行启动, 生产者启动过程中会启动一些服务和线程

启动过程中会启动MQClientInstance, 这个实例是针对一个项目的全部生产者消费者, 而不是单个的生产者或消费者

MQClientInstance内部会启动一些服务和定时任务,如netty服务、内部生产者服务等

启动方法最后,则会发送心跳包给broker

生产者启动: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)

public void start(final boolean startFactory) throws MQClientException {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// 检查配置,主要是生产者组名this.checkConfig();if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();}// 创建 MQClientInstance 实例, 消费者启动时也有这一步(对于每个客户端来说, 只有一个客户端实例(一个项目有多个生产者、消费者))this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);// 将当前生产者注册到MQClientInstance中的producerTableboolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException();}// 自动创建topic的配置this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());if (startFactory) {/** 启动 MQClientInstance* netty服务、各种定时任务、拉取消息服务、rebalanceService服务*/mQClientFactory.start();}log.info("the producer [{}] start OK);this.serviceState = ServiceState.RUNNING;break;// ...省略default:break;}// 发送心跳信息给所有broker。this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();// 启动扫描 超时请求 的定时任务,this.startScheduledTask();}

客户端实例启动: org.apache.rocketmq.client.impl.factory.MQClientInstance#start

public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();}// netty服务this.mQClientAPIImpl.start();// 启动各种定时任务this.startScheduledTask();// 拉取消息服务,针对消费者this.pullMessageService.start();// 重平衡服务,针对消费者this.rebalanceService.start();this.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}
}

消息发送流程

入口: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl

调用任一发送方法后,会一路调用到sendDefaultImpl方法

首先会检查消费者状态和消息的格式是否正确

之后会进入一个循环来发送消息,同步消息的循环次数为3次,即可以重试两次,其余消息只发送一次

在循环中,首先会按照轮询的方法选择一个queue进行发送,若发送出现异常则退出当前循环进入下一次循环(若开启故障延迟还会更新broker的故障表,设置隔离时间,隔离时间根据 MQFaultStrategy类中的latencyMax和notAvailableDuration数组进行判断,如其中超时在0.55s - 1s内则隔离30s)

在重新获取queue时,若开启故障延迟,在选择时则会选择【不在故障列表中,或者在故障列表但是时间已经过了其下一次可用的时间点的可用broker】,以实现高可用。若未开启故障延迟,则会传入上一次选择的broker,在这次选择时避开,选择方式也是轮询。

private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 检查生产者状态this.makeSureStateOK();/*** 检查消息格式是否合法:* 1. msg是否为null* 2. topic 是否为空、长度是否大于127、字符串是否有非法字符、是否是系统topic(比如延时topic)* 3. 消息体 是否为空、大小是否大于4MB*/Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID = random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;// 获取topic路由信息(存在哪些broker上), 首先获取本地缓存的,若没有则获取nameServer的TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {boolean callTimeout = false;MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;// 计算最大发送次数,同步模式为3,即默认允许重试2次,可更改重试次数// 其他模式为1,即不允许重试,不可更改。int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];for (; times < timesTotal; times++) {/*** 如果mq为空则说明第一次进入,则不存在lastBrokerName* 否则,说明为循环进入,则上一次发送失败,则获取上一次失败的brokerName*/String lastBrokerName = null == mq ? null : mq.getBrokerName();/*** 选择一个queue** selectOneMessageQueue方法内,可选故障转移为开启, 需要sendLatencyFaultEnable设置为true* 开启:*      对于请求响应较慢的broker,可以在一段时间内将其状态置为不可用(下方catch中有调用的updateFaultItem方法)*      消息队列选择时,会过滤掉mq认为不可用的broker,以此来避免不断向宕机的broker发送消息*      选取一个延迟较短的broker,实现消息发送高可用。* 不开启:*     则传入lastBrokerName,即不会再次选择上次发送失败的broker**/MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();if (times > 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;break;}// 发送消息sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();// 这里调用并传入false,是为了在发送时间超过550ms时,把broker置为故障,// 隔离时间根据 MQFaultStrategy类中的latencyMax和notAvailableDuration数组进行判断,如其中超时在0.55s - 1s内则隔离30sthis.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}} catch (RemotingException e) {endTimestamp = System.currentTimeMillis();// 异常传入为true,表示隔离时间采用默认的30sthis.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;}// ... 省略代码

选择queue : org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {// 判断是否启用故障延迟机制,默认不启用if (this.sendLatencyFaultEnable) {try {int index = tpInfo.getSendWhichQueue().incrementAndGet();for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0)pos = 0;// 轮询获取到一个MessageQueue mq = tpInfo.getMessageQueueList().get(pos);// 如果该broker不在故障列表中,或者在故障列表但是时间已经过了其下一次可用的时间点,则为可用,直接返回if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))return mq;}// 到这里说明全部不正常// 没有选出无故障的mq,那么从故障集合中随机选择一个final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();// 如果写队列数大于0,那么选择该brokerint writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);}return mq;} else {// 如果写队列数小于0,那么移除该brokerlatencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}// 上面都没有返回,则采用轮询的方式选择return tpInfo.selectOneMessageQueue();}// 默认不启用return tpInfo.selectOneMessageQueue(lastBrokerName);
}

更新延时表: org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem

判断延时时间: org.apache.rocketmq.client.latency.MQFaultStrategy#computeNotAvailableDuration

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {if (this.sendLatencyFaultEnable) {// 若isolation为true则默认延时30s,否则调用方法根据超时时间来获取延时时间long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);//更新故障记录表this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);}
}private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30 * 1000L, 60 * 1000L, 120 * 1000L/* 2min */, 180000L/* 3min */, 600000L/* 10min */};private long computeNotAvailableDuration(final long currentLatency) {/*** 根据latencyMax和notAvailableDuration的下标一一对应,若超时时间大于等于notAvailableDuration,则延时latencyMax对应下标的时间* 小于0.55s : 0s* [0.55s,1s) : 30s* [1s,2s)    : 60s* ....省略*/for (int i = latencyMax.length - 1; i >= 0; i--) {if (currentLatency >= latencyMax[i])return this.notAvailableDuration[i];}return 0;
}
http://www.wangmingla.cn/news/131366.html

相关文章:

  • 最好的编程培训机构北京seo人员
  • 萧山网站制作公司上海专业seo
  • 从化专业做网站搜索关键词查询
  • 电子商务网站建设的安全性深圳网站公司排名
  • 专做定制的网站如何开发软件app
  • 香港网站建设展览谷歌推广哪家公司好
  • 中华人民共和国建设网站线上营销策划案例
  • 经典的jq查询网站搜索引擎营销的作用
  • 小型电子商务网站规划网页设计免费模板
  • 做网站属于广告公司吗站长工具的使用seo综合查询运营
  • 网站建立的连接不安全腾讯3大外包公司
  • 医院建设网站友链购买有效果吗
  • 哪个网站找做软件下载seo课程在哪培训好
  • 做政府网站哪家公司好今日新闻头条官网
  • wordpress 用ip访问河北seo基础
  • 郑州网站开发公司seo的基本步骤顺序正确的是
  • 零食天堂 专做零食推荐的网站广州品牌营销策划公司排名
  • 甘德网站建设上海网站排名优化
  • 安徽网站建设费用网站建设详细方案模板
  • 做移动网站优化排名站外引流推广渠道
  • 网站制作计算机网站设计需要什么
  • 东莞做公司网站seo关键词排名优化怎样收费
  • 大朗镇做网站网络广告有哪些形式
  • 黄骅贴吧在线安卓优化大师官方版
  • asp 网站seo服务外包
  • 网站闪图怎么做的百度地图在线使用
  • 广州公关公司排行榜关键词优化的方法有哪些
  • 淘宝客网站建设平台企业网站制作步骤
  • 公司电商网站开发合同数据分析系统
  • 如何建立网站做微商免费制作自己的网页