织梦更新网站地图迅雷磁力链bt磁力天堂
skynet源码学习-skynet_mq队列
- 核心数据结构分析
 - 核心接口详解
 - 设计优势分析
 - 工作流程详解
 - 消息传递全流程
 - 队列扩容流程
 
核心数据结构分析
- 消息结构 (skynet_message)
 
struct skynet_message {uint32_t source;  // 消息来源服务句柄int session;      // 会话ID(用于RPC)void * data;      // 消息数据指针size_t sz;        // 消息大小(含类型信息)
};
 
- 消息类型编码:通过sz的高8位存储
 
#define MESSAGE_TYPE_MASK (SIZE_MAX >> 8)
#define MESSAGE_TYPE_SHIFT ((sizeof(size_t)-1) * 8)
 
- 服务消息队列 (message_queue)
 
struct message_queue {struct spinlock lock;        // 自旋锁uint32_t handle;             // 所属服务句柄int cap;                     // 队列容量int head;                    // 队列头指针int tail;                    // 队列尾指针int release;                 // 释放标记int in_global;               // 是否在全局队列中int overload;                // 过载计数int overload_threshold;      // 过载阈值(动态调整)struct skynet_message *queue; // 环形缓冲区struct message_queue *next;   // 全局队列链表指针
};
 
- 全局队列 (global_queue)
 
struct global_queue {struct message_queue *head;  // 队列头struct message_queue *tail;  // 队列尾struct spinlock lock;        // 自旋锁
};
 
- 全局单例:static struct global_queue *Q = NULL;
 
核心接口详解
- void skynet_mq_init()
 
void 
skynet_mq_init() {struct global_queue *q = skynet_malloc(sizeof(*q));memset(q,0,sizeof(*q));SPIN_INIT(q);Q=q;
}
 
- 功能:初始化全局消息队列系统
 - 流程: 
- 分配全局队列内存
 - 初始化头尾指针为NULL
 - 初始化自旋锁
 
 - 调用时机:Skynet启动时
 
- struct message_queue * skynet_mq_create(uint32_t handle)
 
struct message_queue * 
skynet_mq_create(uint32_t handle) {struct message_queue *q = skynet_malloc(sizeof(*q));q->handle = handle;q->cap = DEFAULT_QUEUE_SIZE;q->head = 0;q->tail = 0;SPIN_INIT(q)// When the queue is create (always between service create and service init) ,// set in_global flag to avoid push it to global queue .// If the service init success, skynet_context_new will call skynet_mq_push to push it to global queue.q->in_global = MQ_IN_GLOBAL;q->release = 0;q->overload = 0;q->overload_threshold = MQ_OVERLOAD;q->queue = skynet_malloc(sizeof(struct skynet_message) * q->cap);q->next = NULL;return q;
}
 
-  
功能:创建服务私有消息队列
 -  
参数:handle - 服务句柄
 -  
流程:

 -  
初始值:
- 容量:64
 - in_global=1(初始状态)
 - overload_threshold=1024
 
 
- void skynet_mq_push(struct message_queue *q, struct skynet_message *message)
 
void 
skynet_mq_push(struct message_queue *q, struct skynet_message *message) {assert(message);SPIN_LOCK(q)q->queue[q->tail] = *message;if (++ q->tail >= q->cap) {q->tail = 0;}if (q->head == q->tail) {expand_queue(q);}if (q->in_global == 0) {q->in_global = MQ_IN_GLOBAL;skynet_globalmq_push(q);}SPIN_UNLOCK(q)
}
 
- 功能:向服务队列推送消息
 - 核心逻辑:
 
SPIN_LOCK(q)
1. 消息存入环形缓冲区尾部
2. 如果缓冲区满则扩容(expand_queue)
3. 若队列不在全局队列中:q->in_global = MQ_IN_GLOBAL调用 skynet_globalmq_push(q)
SPIN_UNLOCK(q)
 
- int skynet_mq_pop(struct message_queue *q, struct skynet_message *message)
 
int
skynet_mq_pop(struct message_queue *q, struct skynet_message *message) {int ret = 1;SPIN_LOCK(q)if (q->head != q->tail) {*message = q->queue[q->head++];ret = 0;int head = q->head;int tail = q->tail;int cap = q->cap;if (head >= cap) {q->head = head = 0;}int length = tail - head;if (length < 0) {length += cap;}while (length > q->overload_threshold) {q->overload = length;q->overload_threshold *= 2;}} else {// reset overload_threshold when queue is emptyq->overload_threshold = MQ_OVERLOAD;}if (ret) {q->in_global = 0;}SPIN_UNLOCK(q)return ret;
}
 
- 功能:从服务队列取出消息
 - 返回值:0成功,1无消息
 - 核心逻辑:
 
SPIN_LOCK(q)
if (队列非空) {1. 复制头部消息到*message2. 移动头指针3. 动态调整过载阈值:当队列长度 > overload_threshold 时:overload = 当前长度overload_threshold *= 2
} else {重置overload_threshold = MQ_OVERLOAD标记q->in_global = 0(移出全局队列)
}
SPIN_UNLOCK(q)
 
- void skynet_globalmq_push(struct message_queue *queue)
 
void 
skynet_globalmq_push(struct message_queue * queue) {struct global_queue *q= Q;SPIN_LOCK(q)assert(queue->next == NULL);if(q->tail) {q->tail->next = queue;q->tail = queue;} else {q->head = q->tail = queue;}SPIN_UNLOCK(q)
}
 
-  
功能:将服务队列加入全局队列
 -  
流程:

 -  
设计要点:全局队列是服务队列的链表
 
- struct message_queue * skynet_globalmq_pop()
 
struct message_queue * 
skynet_globalmq_pop() {struct global_queue *q = Q;SPIN_LOCK(q)struct message_queue *mq = q->head;if(mq) {q->head = mq->next;if(q->head == NULL) {assert(mq == q->tail);q->tail = NULL;}mq->next = NULL;}SPIN_UNLOCK(q)return mq;
}
 
- 功能:从全局队列获取一个待处理的服务队列
 - 流程:
 
SPIN_LOCK(Q)
1. 从链表头部取出一个服务队列
2. 调整链表头指针
SPIN_UNLOCK(Q)
返回服务队列指针
 
- void skynet_mq_mark_release(struct message_queue *q)
 
void 
skynet_mq_mark_release(struct message_queue *q) {SPIN_LOCK(q)assert(q->release == 0);q->release = 1;if (q->in_global != MQ_IN_GLOBAL) {skynet_globalmq_push(q);}SPIN_UNLOCK(q)
}
 
- 功能:标记服务队列待释放
 - 流程:
 
SPIN_LOCK(q)
1. 设置 q->release = 1
2. 若不在全局队列中,加入全局队列
SPIN_UNLOCK(q)
 
- 目的:确保待释放队列能被正确处理
 
- void skynet_mq_release(struct message_queue *q, message_drop drop_func, void *ud)
 
void 
skynet_mq_release(struct message_queue *q, message_drop drop_func, void *ud) {SPIN_LOCK(q)if (q->release) {SPIN_UNLOCK(q)_drop_queue(q, drop_func, ud);} else {skynet_globalmq_push(q);SPIN_UNLOCK(q)}
}
 
- 功能:释放服务队列资源
 - 参数: 
- drop_func:消息丢弃回调函数
 - ud:用户数据
 
 - 流程:

 
- 辅助接口
 
- uint32_t skynet_mq_handle():获取队列所属服务句柄
 
uint32_t 
skynet_mq_handle(struct message_queue *q) {return q->handle;
}
 
- int skynet_mq_length():计算当前队列消息数
 
int
skynet_mq_length(struct message_queue *q) {int head, tail,cap;SPIN_LOCK(q)head = q->head;tail = q->tail;cap = q->cap;SPIN_UNLOCK(q)if (head <= tail) {return tail - head;}return tail + cap - head;
}
 
- int skynet_mq_overload():获取并重置过载计数
 
int
skynet_mq_overload(struct message_queue *q) {if (q->overload) {int overload = q->overload;q->overload = 0;return overload;} return 0;
}
 
设计优势分析
- 三级队列结构

 
- 优势: 
- 全局队列:O(1)获取待处理服务
 - 服务队列:隔离不同服务的消息
 - 避免全局锁竞争
 
 
- 环形缓冲区设计
 
- 内存布局:
 
[头] 消息1 | 消息2 | ... | 消息N [尾]
 
- 优势: 
- 避免内存碎片
 - 动态扩容(翻倍策略)
 - 高效头尾指针操作
 
 
- 智能过载处理
 
// 动态阈值调整
while (length > q->overload_threshold) {q->overload = length;q->overload_threshold *= 2; // 指数退避
}
 
- 优势: 
- 避免频繁触发过载检测
 - 指数退避减少检测开销
 - 提供过载监控接口
 
 
- 精细锁控制
 
- 锁策略:
 
| 资源 | 锁类型 | 粒度 | 
|---|---|---|
| 全局队列 | 自旋锁 | 全局 | 
| 服务队列 | 自旋锁 | 单个服务 | 
| 消息处理 | 无锁 | - | 
- 优势: 
- 服务间并行处理
 - 最小化临界区
 
 
- 优雅释放机制 
- 标记阶段:skynet_mq_mark_release
 - 清理阶段:skynet_mq_release
 - 消息回调:通过drop_func处理残留消息
 
 
- 优势: 
- 安全释放资源
 - 避免消息丢失
 - 支持自定义清理逻辑
 
 
工作流程详解
消息传递全流程

队列扩容流程

