CyberRT源码剖析--03通信架构

- 序列化模块:序列化模块负责将用户自定义的数据结构转换为可传输的二进制格式,并支持反向解析。本模块采用嵌入式序列化方案,用户仅需定义C++数据结构体并通过标准化宏调用即可完成序列化操作。该方案支持C++基本数据类型(如int、float)以及STL容器(如vector、map)
- 发布订阅系统:发布-订阅系统是本设计中分布式通信中间件的核心模块之一,负责实现节点间高效、解耦的通信。该系统采用基于消息驱动的架构,由发布者、订阅者和消息代理三个部分组成。发布者负责发布特定主题(Channel)的消息,订阅者则订阅感兴趣的主题,消息代理充当中枢,接收并转发发布者的消息至对应的订阅者。在分布式系统中,节点间的通信可能涉及不同的网络环境,如同一主机内的通信与跨主机的通信。针对这一问题,当节点位于同一主机时发布订阅框架会优先选择共享内存进行通信,这样可以避免网络协议栈的开销,从而实现更低延迟的通信;而当节点位于不同主机时,框架则采用基于FastRTPS网络通信方式。在多层次的网络架构中,通信方式会根据实际情况动态选择,确保在不同环境下都能够提供高效的通信服务。
- 服务发现机制:在点对点通信中,节点需要了解目标节点的信息以建立通信连接。为解决这一问题,设计了动态拓扑发现机制,节点间的连接信息通过有向图进行维护。每当节点加入或退出通信平面时,它会广播自身的信息,其他节点根据这些信息动态更新拓扑结构
- 高性能基础库:为满足分布式通信中间件对基础功能的需求,设计了一个高性能基础库,涵盖了数据结构、线程同步机制以及设计模式等多种基础功能。该库提供了如无锁队列、无锁哈希表、线程池、目标池等关键模块
- 日志库:日志库用于记录中间件在运行过程中不同级别的日志信息,支持输出至文件和控制台。该日志库基于C++的ofstream类实现文件写入,并定义了一组宏:log_debug、log_info、log_warn、log_error、log_fatal,分别对应不同的日志级别,方便用户在代码中输出相应的日志信息,帮助调试和运行时问题定位。
3.1 序列化
在分布式系统与高性能计算领域,序列化作为数据交换和存储的重要环节,其性能直接影响系统的整体效率和数据传输能力。尤其在数据量庞大且通信频繁的场景下,传统序列化方法可能因效率不足而成为系统瓶颈。因此,设计一种高性能序列化机制需在确保数据正确性和完整性的前提下,尽量减少计算资源消耗并提升传输效率。本节详细阐述本文设计的高性能序列化机制,序列化机制主要目标包括:通过优化二进制格式和减少序列化数据量降低网络带宽占用;通过减少内存复制和重复分配优化内存使用效率;通过灵活的接口支持不同数据结构的序列化需求。
该机制的核心组件为DataStream模块,如图所示,此模块负责将数据结构转换为二进制流并完成存储与解析任务。DataStream内部使用基于C++ STL的std::vector<char>容器作为缓冲区(buffer),每个元素占用1字节,支持动态长度调整。序列化过程通过write函数和重载的>>操作符将数据写入缓冲区,反序列化则通过read函数和重载的<<操作符从缓冲区读取数据。利用C++函数重载特性,DataStream能够处理多种数据类型的读写操作。模块中定义了一个pos变量,用于记录缓冲区的当前读写位置:数据写入时,pos根据写入字节数增加;数据读取时,pos根据读取字节数减少。

- 对于基本数据类型(如整型、浮点型、字符型等),框架通过预定义的序列化策略实现紧凑存储。每个数据字段在序列化时以1字节的DataType枚举值开头,用于标识数据类型,后接具体值。例如,bool和char类型各占用2字节(DataType 1字节 + 值1字节),int32和uint32占用5字节(DataType 1字节 + 值4字节),int64和uint64占用9字节(DataType 1字节 + 值8字节),float占用5字节,double占用9字节,enum占用5字节。对于字符串等变长类型,序列化时需额外存储长度字段(以int32编码,占5字节),后接实际字符数据(长度可变),以便反序列化时根据长度准确读取数据。具体字段长度和编码格式如表3.1所示。
表3.1 基本数据类型序列化协议
| 字段类型 | 长度(字节) | 底层编码格式 |
|---|---|---|
| bool | 2 | DataType(1) + Value(1) |
| char | 2 | DataType(1) + Value(1) |
| int32 | 5 | DataType(1) + Value(4) |
| uint32 | 5 | DataType(1) + Value(4) |
| int64 | 9 | DataType(1) + Value(8) |
| uint64 | 9 | DataType(1) + Value(8) |
| float | 5 | DataType(1) + Value(4) |
| double | 9 | DataType(1) + Value(8) |
| enum | 5 | DataType(1) + Value(4) |
| string | 可变长度 | DataType(1) +Length(5) + Value(length) |
- 例如现在有三个数据:int a,bool b,float c需要储存,则根据上述协议进行序列化存储后DataStream模块的buffer内存分布如图3.3所示,总共占据12字节的内存大小。

图3.3 基础数据类型序列化示意图
- 对于C++的STL容器(如std::vector、std::list、std::map、std::set),序列化框架采用递归调用方式处理其元素。以std::vector
为例,序列化时首先写入1字节的DataType字段和5字节的Length字段(表示元素个数,按int32处理),随后对每个元素根据类型T调用对应的序列化方法。若T为基本类型,则按表3.1的策略处理;若T为复合类型,则继续递归分解。对于std::map<k,v>,序列化时写入DataType和Length字段(表示键值对数量),然后按插入顺序依次序列化每个键k和值v。反序列化时,先读取DataType和Length,再根据Length依次解析每个元素或键值对,重建容器结构。各容器类型的序列化格式如表3.2所示。
表3.2 复合数据类型序列化协议
| 字段类型 | 长度(字节) | 底层编码格式 |
|---|---|---|
| std::vector |
可变长度 | DataType(1) + Length(5) + Value(T + T +T + …) |
| std::list |
可变长度 | DataType(1) + Length(5) + Value(T + T +T + …) |
| std::map<k,v> | 可变长度 | DataType(1) + Length(5) + Value((K,V) + (K,V) + …) |
| std::set |
可变长度 | DataType(1) + Length(5) + Value(T + T +T + …) |
- 如图3.4所示现在有一个
std::vector<int>a={1,2}的数据结构需要储存,根据上述协议序列化后的DataStream模块的buffer内存分布如图所示,a中的每一个元素都是int32类型,头部写入了类型和长度信息。

图3.4 复合数据类型序列化示意图
- 针对自定义类,设计了特定的序列化协议。自定义类的序列化格式如表3.3所示。自定义类的序列化结构包括1字节的Type字段(标识自定义类类型)和后续的成员变量数据(长度可变,包含D1 + D2 + D3 + …)。为实现此功能,自定义类需继承Serializable接口类,并使用SERIALIZE宏重载serialize和unserialize函数。Serializable类声明了serialize和unserialize两个纯虚函数,子类通过继承自动获得这两个接口。SERIALIZE宏接受可变参数(即类的成员变量),在serialize函数中首先写入Type字段,随后递归将所有成员变量写入DataStream的缓冲区。反序列化时,unserialize函数读取Type字段确认类型后,从DataStream的缓冲区依次解析成员变量数据。
表3.3 自定义数据结构序列化协议
| 字段类型 | 字段长度(字节) | 底层编码格式 |
|---|---|---|
| 自定义类 | 可变长度 | Type(1) + Value(D1 + D2 + D3 + …) |
3.2 发布订阅系统
将通信流程明确划分为发布方和订阅方两个独立模块,其通信模型如图3.5所示。在该架构中,发布方被定义为Transmitter,负责生成并传输数据;订阅方被定义为Receiver,负责接收并处理数据。发布方与订阅方之间无需建立直接交互,数据传输通过共享的通信主题(在本设计中称为channel)得以实现。具体而言,发布方将数据写入指定的channel,而订阅方通过订阅该channel自动获取相关数据。

如图3.6所示,在发布方,Transmitter将用户定义的消息(Message)序列化为字节流,随后根据所选通信模式将其传输至网络或共享内存。在订阅方,Receiver通过集成的Dispatcher组件监测新数据的到达。Dispatcher将接收到的字节流反序列化为用户定义的消息格式,并将其传递至Receiver的回调函数以供后续处理。为优化资源利用效率,每个进程仅维护单一的Dispatcher实例,供所有Receiver共享。本设计支持两种通信模式:基于共享内存的同主机通信和基于FastRTPS的网络通信。为确保两种模式操作接口的统一性,在发布-订阅层下抽象出一套通用接口,并通过C++的继承与函数重载机制实现。

本文设计了一个中央通信层(Transport),负责创建发布者和订阅者。在发布方,设计了RtpsTransmitter和ShmTransmitter两个类,二者均继承自Transmitter基类。其中,RtpsTransmitter适用于基于FastRTPS的网络通信,而ShmTransmitter适用于基于共享内存的近域通信。在Transmitter类中定义了一系列抽象接口,RtpsTransmitter和ShmTransmitter根据各自通信方式的特点对这些接口进行重载实现。在订阅方,根据底层通信机制的差异,分别设计了RtpsReceiver和ShmReceiver,二者均继承自Receiver基类。与之对应的Dispatcher组件亦分为RtpsDispatcher和ShmDispatcher两种类型,分别适配不同的通信模式。
源码层级:

3.3 通信实体身份标识设计
在上述发布-订阅通信架构中,系统包含多个Transmitter(发布方)和Receiver(订阅方)。这些实体既可能共存于进程节点,也可能分布于不同的进程节点。在这种复杂的分布式环境下,需要对Transmitter和Receiver进行准确且唯一的身份标识。
为此,CyberRT提出了一种基于64位无符号哈希值的身份标识方案。具体实现过程如下:每当创建Transmitter或Receiver实例时,系统利用uuid库生成一个64位唯一标识符。随后,该标识符经过哈希处理,生成一个64位无符号哈希值,命名为Identity,并与对应的Transmitter或Receiver实例关联。
为实现通信实体与其运行环境的有效关联,设计了一种配置结构体,命名为RoleAttributes,用于封装与主机和进程相关的描述信息。通过Identity与RoleAttributes的结合,可对系统中任一主机上的通信实体进行精确标识。
| 名称 | 类型 | 描述 |
|---|---|---|
| Host_name | string | 此字段记录了主机的名称,使进程能够标识其运行的物理或虚拟主机环境。 |
| Host_ip | string | 定义了主机的网络IP地址,为进程间的网络通信提供了必要的地址信息。 |
| Process_id | int32_t | 标识了与通信实体关联的进程ID,确保了进程级别的唯一识别。 |
| Channel_name | string | 记录了用于通信的通道名称,这个名称是通信双方约定的逻辑通道,用于区分不同的通信内容或策略。 |
| Channel_id | uint64_t | 基于通道名称的哈希值生成的通道ID,为每个通信通道提供了一个全局唯一的标识符。 |

基于上述两个核心标识元素,进一步设计了Endpoint类。该类的设计目标在于整合Identity和RoleAttributes,建立一个统一的通信实体身份与属性描述框架。在Endpoint类的结构中,定义了两个关键成员变量:id_和attr_。其中,id_用于存储Identity,以完整记录通信实体的唯一身份标识;attr_用于存储RoleAttributes,确保通信实体运行环境的相关信息得以全面保留。
Transmitter和Receiver作为核心通信实体,均继承自Endpoint类。这种继承设计使得Transmitter和Receiver天然具备Endpoint类定义的身份标识与运行环境描述功能,从而能够在通信过程中高效支持身份识别、环境适配以及通信管理。
3.4 通信顶层Transport设计
Transport类被设计为全局单例类。这一设计特性确保了在系统整个生命周期内,Transport类仅维持单一实例。Transport类对外提供了两个核心接口:CreateTransmitter和CreateReceiver,这两者构成了通信中间件系统中创建发布方和订阅方的关键功能实现。CreateTransmitter接口用于生成发布方(Transmitter)实例,其接收两个必要参数:一是RoleAttributes类型的参数,用于精确标识通信实体的身份及环境属性;二是OptionMode类型的参数,用于指定通信模式。根据OptionMode的取值,接口将创建相应的Transmitter实例。具体而言,当OptionMode取值为RTPS时,将实例化一个RTPSTransmitter对象;当OptionMode取值为SHM时,则生成一个ShmTransmitter对象。CreateReceiver接口则负责创建订阅方(Receiver)实例,其参数设计与CreateTransmitter接口类似,同样包含RoleAttributes和OptionMode。此外,该接口还需额外传入一个回调函数,该函数在Receiver接收到数据时被触发,以执行后续的数据处理任务。在C++11编程环境中,通过std::function对回调函数进行封装,从而实现灵活且类型安全的回调机制。

3.5 发送/接收模块设计
3.5.1 Transmitter
在上述通信架构中,Transmitter类继承自Endpoint类,作为发送端的核心组件,其设计主要聚焦于数据发送、序列化、帧序号管理以及通信模式的选择。

具体包括以下几个方面:
(1)抽象化接口:为确保Transmitter能够灵活支持多种通信模式,本文将Transmitter定义为一个抽象模板基类,所有具体通信实现(如基于共享内存的ShmTransmitter或基于FastRTPS的RTPSTransmitter)均继承自该基类。模板参数指定为用户欲传输的数据结构类型,从而实现通用性与灵活性的结合。
| 方法名称 | 核心职责 |
|---|---|
| Enable() | 激活传输通道 |
| Disable() | 关闭传输通道,释放资源 |
| Transmit() | 执行协议相关的消息序列化与发送,子类必须实现传输策略 |
(2)帧序号管理:在Transmitter内部定义了一个无符号整型变量seq_num_,用于为每条消息分配唯一的帧序号。这一设计旨在确保消息传输的顺序性,有效防止数据丢失或乱序现象的发生。
(3)支持消息附加信息:本文设计了msg_info_数据结构,用于存储消息的附加信息。该结构包含三部分关键字段,并提供了对应的获取与设置接口。每条实际发送的消息均附带一条msg_info_实例。Transmitter的构造函数负责对msg_info_进行初始化。
| 名称 | 类型 | 描述 |
|---|---|---|
| seq_num | uint64_t | 发送数据的帧号 |
| sender_id | Identify | 当前Transmitter的id |
| channel_id | uint64_t | 发送的channel |
3.5.2 Receiver
Receiver类作为接收端的核心组件,同样继承Endpoint类,负责处理从Transmitter接收到的消息并进行相应响应。
为实现消息接收与数据处理的解耦,避免在Receiver内部直接实现复杂的业务逻辑,本文设计了一个统一的C++可调用对象MessageListener。该对象在构造Receiver时作为参数传入,
| 参数名称 | 描述 |
|---|---|
| MessagePtr | 指向接收到的消息对象的智能指针 |
| MessageInfo | 接收到的消息相关的附加信息 |
| Roleattributes | 前接收端角色的属性 |
Receiver的构造函数需接收两个参数:一是RoleAttributes,用于初始化继承自Endpoint类的属性;二是MessageListener,用于初始化Receiver内部维护的回调函数对象msg_listener_。Receiver类定义了三个核心接口
| 方法名称 | 核心职责 |
|---|---|
| Enable() | 激活接收通道,初始化资源 |
| Disable() | 关闭接收通道,释放资源 |
| OnNewMessage(const MessagePtr&,const MessageInfo&) | 数据回调处理,调用 MessageListener 进行处理 |
其中,Enable()和Disable()的实现与Transmitter中类似,需由具体子类根据通信模式重写以确保适配性。OnNewMessage()为Receiver中的数据回调处理方法,每当接收到新消息时,该方法将被触发。它将消息对象(MessagePtr)、消息附加信息(MessageInfo)以及接收端的角色属性(RoleAttributes)传递至msg_listener_,并执行相应的调用操作。

3.6 数据分发器设计
Dispatcher组件的核心功能在于接收Transmitter发送至共享主题的数据,并将其精确分发至对应的Receiver,从而触发相应的回调处理逻辑。为实现这一目标,Dispatcher需具备以下三项关键能力:
- (1)Receiver注册管理:Dispatcher负责管理所有Receiver实例,确保每个Receiver能够正确接收并处理其所订阅的消息。
- (2)消息获取与转发:Dispatcher需从不同共享主题中获取数据,并根据消息内容将其转发至适当的Receiver。
- (3)回调函数触发:在成功获取并转发数据后,Dispatcher需触发与接收数据关联的回调函数,以确保消息得到有效处理。
基于上述需求,在设计Dispatcher时引入了信号-槽机制。该机制是一种广泛应用于事件驱动系统的设计模式,能够有效解耦组件间的依赖关系,同时保障消息传递与处理的顺畅性。其核心概念包括:
- 信号(Signal):表示事件或数据的发送,通常由发送方(如Transmitter)触发。当数据准备就绪或事件发生时,信号被激活。
- 槽(Slot):表示对信号的响应或处理逻辑,通常由接收方(如Receiver)定义,描述消息或事件的处理方式。

Slot类内部维护了一个支持可变模板参数包的可调用对象(Callback),并通过布尔变量标记当前槽的激活状态。由于一个信号可被多个槽观察,Signal类内部维护了一个双向链表(SlotList)以存储相关槽

每个订阅该信号的槽通过Signal类的Connect函数添加至链表。该函数根据传入的Callback创建并激活新槽,将其插入SlotList,并返回一个表示信号与槽连接关系的Connection对象。Connection类内部包含两个关键变量:槽实例及其所观察的信号。当信号触发时,Signal类通过重载的()操作符遍历SlotList,依次执行各槽中的回调函数,从而实现信号至槽的高效传递。这种设计确保了信号与槽之间的松耦合,同时为异步事件处理提供了灵活且高效的机制。
通信模型中,一个channel的数据可被多个Receiver订阅。因此,可将一个channel绑定至一个信号,订阅该channel的Receiver则作为槽,通过信号-槽连接将其回调函数注册至信号。当channel接收到数据时,绑定的信号被触发,通知所有关联槽执行相应Receiver的回调函数。为此,设计了ListenerHandler类以管理channel与Signal的绑定关系。此外,信号与槽连接后生成多个Connection实例,ListenerHandler还负责管理这些连接,

ListenerHandler维护了一个信号对象MessageSignal以及一个存储多个Connection的映射结构ConnectionMap。每个channel绑定一个独立的MessageSignal,用于管理该channel上所有Receiver的回调函数,而ConnectionMap以Receiver的Identity作为索引存储所有连接信息。当Receiver订阅channel时,ListenerHandler通过Connect方法将Receiver的回调函数注册至MessageSignal,并返回Connection记录该绑定关系;当channel数据到达时,MessageSignal调用Run函数依次触发所有订阅者的回调函数。
前文已详细阐述了Dispatcher组件的设计逻辑与信号-槽机制的基本架构。以下将进一步探讨Dispatcher的具体实现过程,。Dispatcher被设计为虚基类,旨在提供通用的消息分发机制。然而,由于底层通信方式的差异,数据获取方法需由Dispatcher子类根据具体需求实现。因此,Dispatcher为不同通信协议预留了定制化的数据获取接口。

为实现channel与信号的绑定,Dispatcher内部设计了一个名为msg_listeners_的映射结构,用于保存channel与MessageSignal的对应关系。每个channel对应一个ListenerHandler实例,而每个ListenerHandler内部维护其ConnectionMap,通过两级索引机制建立Receiver与channel的订阅关系。Dispatcher的AddListener函数负责建立此二级连接:首先根据传入的channel_id在msg_listeners_中进行索引,若无对应ListenerHandler则创建新实例并建立与channel_id的映射关系;若存在则直接获取对应实例。随后,调用ListenerHandler的Connect方法注册传入的回调函数。

通过上述两级索引机制,可精确定义Receiver与channel的对应关系。前文提及,基于底层通信方式的差异,Dispatcher派生出两个子类。这些子类额外实现获取channel数据的具体方法,并在数据到达时依据两级索引依次执行所有关联回调函数。







![Xhyper剖析[6]--中断虚拟化](/2026/01/20/Xhyper%E5%89%96%E6%9E%90-6-%E4%B8%AD%E6%96%AD%E8%99%9A%E6%8B%9F%E5%8C%96/17689244713913.png)
![Xhyper剖析[5]--MMIO虚拟化](/2026/01/20/Xhyper%E5%89%96%E6%9E%90-5-MMIO%E8%99%9A%E6%8B%9F%E5%8C%96/17689243819381.png)
![Xhyper剖析[4]--XhyperCPU虚拟化](/2026/01/20/Xhyper%E5%89%96%E6%9E%90-4-XhyperCPU%E8%99%9A%E6%8B%9F%E5%8C%96/17689241119091.png)
![Xhyper剖析[3]--Xhyper内存虚拟化](/2026/01/20/Xhyper%E5%89%96%E6%9E%90-3-Xhyper%E5%86%85%E5%AD%98%E8%99%9A%E6%8B%9F%E5%8C%96/176892388736321.png)
![Xhyper剖析[2]--Xhyper启动](/2026/01/20/Xhyper%E5%89%96%E6%9E%90-2-Xhyper%E5%90%AF%E5%8A%A8/17689237041079.png)