文章

RTI-DDS

简介

系统基本构成

DDS (Data Distribution Service,数据分发服务 ) 是一种基于数据的通信中间件标准,目的是建立分布式系统的高质量的数据通信。目前广泛应用于航空航天、汽车自动驾驶以及机器人、物联网等领域。 DDS 的本质是一个数据总线,通过在提供一个共享数据空间,实现各个系统之间的通信。

rti-dds1

DDS 使用发布-订阅通信模式来创建去中心化、架构独立、可扩展的异步网络。DDS的标准框架中,系统之间可以通过对一个Topic的发布和订阅进行通信。在Topic上可以指定QoS以保证服务质量。

rti-dds2

DDS 首先定义了通信的基础,也就是一个共享数据空间,有如下构成:全局数据空间(Global Data Space)、域 (Domain)、主题(Topic)。

rti-dds3

Global Data Space

在 DDS 中, DDS 网络被称为全局数据空间(Global Data Space)。

Domain

代表一个通信平面,由Domain ID唯一标识,只有在同一个域内的通信实体才可以通信;如果考虑车内通信,可以只划分1个Domain,也可以按照交互规则或其他规则,定义多个Domain。

Topic

数据的抽象概念,由TopicName标识,关联相应的数据类型(DataType),如果把车内所涉及的所有Topic集合在一起,这样就形成一个虚拟的全局数据空间“Global Data Space”,进一步弱化了节点的概念,所以域参与者已经不是节点的概念了。

DDS标准模型定义了基于数据的操作对象和服务质量参数,包含:发布者(Publisher)、订阅者(Subscriber)、数据读取者(DataReader)、数据写入者(DataWriter)、域参与者(Domain Participant)、服务质量(QoS)。

Domain Participant

代表域内通信的应用程序的本地成员身份,简单来说,就是说明同一数据域内的通信成员。

DataWriter

据写入者,类似缓存,把需要发布的主题数据从应用层写入到DataWriter中。

DataReader

数据读取者,同样可以理解为一种缓存,从订阅者得到主题数据,随之传给应用层。

Publisher

发布者,发布主题数据,至少与1个DataWriter关联,通过调用DataWriter的相关函数将数据发出去。

Subscriber

订阅者,订阅主题数据,至少与1个DataReader关联。当数据到达时,应用程序可能忙于执行其他操作或应用程序只是等待该消息时,这样就会存在两种情况,同步访问和异步通知。

服务质量QoS

除了能够发送和接收数据之外,还可以为Publisher、 Subscriber, 或者DataReaders、 DataWriters 指定描述数据传输质量的服务质量(QoS) .众所周知, DDS 有几个 QoS 策略来帮助确保对数据传输的精确控制.您可以为这些实体分配单独的 QoS 策略,也可以分配一组称为 QoS 配置文件的 QoS 策略。

注意

一个 Publisher 可以有多个DataWriter;

一个Subscriber可以有多个 DataReader ;

DataWriter 只有一个Topic ;

DataReader 只有一个Topic;

一个 Topic 可以有许多 DataReader 和 DataWriter ;

一个Publisher可以有许多关联的Subscriber ;

一个Subscriber可以有许多关联的Publisher 。

DCPS的元模型简化图如下所示:

rti-dds4

DDS在各个系统之间的通信过程如下:

  • Publisher通过DataWriter写入数据到Topic;
  • Subscriber获得通知;
  • Subscriber通过DataReader从Topic读取数据。

为了保证实时性和性能要求,可以在Topic上面定义能够满足性能需求的QoS服务质量参数,这样订阅和通知就可以基于QoS参数确定交互的时机,进而保证实时性。

rti-dds5

服务发现

Discovery是在不同节点上的Connext DDS对象(DomainParticipants、DataWriters、DataReaders)发现彼此的幕后方式。每个DomainParticipant维护一个关于在同一DDS domain中所有活跃的DataWriter和DataReader的信息数据库。这个数据库使得DataWriter和DataReader的交流成为可能。要创建和更新数据库,每个应用程序遵循一个通用的发现过程。

Simple Discovery Protocol

Simple Participant Discovery

该阶段的简单发现协议由Simple Participant Discovery Protocol(SPDP)执行。在参与者发现阶段,域参与者通过发送参与者声明信息(DATA submessages or announcements),来通知同一DDS Domain中的所有其他域参与者。详细信息包括域参与者的唯一标识码(GUID or Globally Unique ID)、传输定位器(addresses and prot numbers)、QoS。这些信息通过best-effort的通信方式定期发送。

​ 当接收到远程参与者发现信息时,Connext DDS将确定本地参与者是否与远程参与者匹配。只有当两者具有相同的Domain ID和Domain Tag,才会发生匹配。

​ 当删除域参与者时,将发送带有域参与者标识的GUID的participant DATA(delete)消息。

​ GUID是对一个实体的唯一引用,它由GUID prefix和Entity ID组成。默认情况下,GUID prefix是从IP地址和进程ID计算而来,Entity ID由Connext DDS设置。

​ 一旦一对参与者发现了对方,他们就可以进入端点发现阶段,来发现彼此的DataWriters和DataReaders。

Simple Endpoint Discovery

该阶段的简单发现协议由Simple Endpoint Dsicovery Protocol(SEDP)执行。在端点发现阶段,Connext DDS匹配DataWriters和DataReaders。GUID、Qos等关于应用程序的DataWriters和DataReaders的信息,通过发送DATA信息中的publication/subsrciption声明进行交换。通常将这些DATA信息称为publication DATAs和subscription DATAs。端点发现阶段使用可靠的通信方式。

当发现远程DataWriter/DataReader时,Connext DDS将确定本地应用程序是否具有相匹配的DataReader/DataWriter。只有当两者具有相同的topic、data type和兼容的QoSPolicies,才会发生匹配。此外,如果域参与者被设置为忽略某些DataWriter/DataReader时,这些实体在匹配过程中就会被忽略。DataWriter和DataReader只有在各自的应用程序都将本地实体与匹配的远程实体连接起来时才能相互通信。

Discovery Implementation

发现是使用内置的DataWriters和DataReaders来实现的。也就是说,它们也是DDSDataWriter/DDSDataReader。对于每个DomainParticipant,自动创建三个内置的DataWriters和DataReaders来用于发现目的。

rti-dds6

对于每个DomainParticipant,有6个自动创建用于发现目的的对象。前两个对象用于发送/接收参与者DATA消息,这些消息在参与者发现阶段用去查找远程域参与者,该阶段使用best-effort通信方式。一旦参与者意识到彼此,他们就会进入端点发现阶段,以了解彼此的DataWriters和DataReaders,该阶段使用reliable通信方式。

Discovery的实现可分为两个独立的协议:SPDP + SEDP = SDP

Participant Discovery

当创建域参与者时,会自动创建DataWriter和DataReader,以交换网络中participant DATA消息。这些DataWriters和DataReaders是特殊的,因为DataWriter可以发送到给定的目的地列表,无论目的地上是否有Connext DDS应用程序,并且DataReaders可以从任何源接收数据,无论该源以前是否已知。也就是说,这些特殊的DataWriters和DataReaders在能够相互交流之前,是不需要发现远程实体并进行匹配的。

当域参与者加入或离开网络时,它需要通知其他对等参与者。通过participant DATA消息通知远程参与者。此外,如果参与者的QoS被修改为需要其他参与者知道发生更改,将立即发送新的participant DATA消息。participant DATA也可用来保持参与者的活跃状态。

如果新的远程参与者在本地参与者的对等方列表中,本地参与者将把远程参与者添加到它的数据库中。一旦一个远程参与者被添加到Connext DDS数据库中,Connext DDS将跟踪该远程参与者的participant_liveliness_lease_duration。如果该参与者的participant DATA消息在participant_liveliness_lease_duration内没有收到至少一次,则远程参与者将认为是过时的,远程参与者及其所有实体将从本地参与者的数据库中删除。为了避免被其他参与者清除,每个参与者都需要定期发送一个participant DATA来刷新其活力。该发送速率由participant_liveliness_assert_period控制。

rti-dds7

在节点A上的域参与者向节点B(它在节点A的对等列表中)发送“participant DATA”,无论节点B上是否有Connext DDS应用程序。

上图中,绿色短虚线是周期性的particitant DATAs。这些消息之间的时间间隔由DiscoveryConfig QoSPolicy中的participant_liveliness_assert_period控制。

除了周期性的particitant DATAs外,蓝色长虚线的“initial repeat messages”从A发送到B。这些消息在min_initial_participant_announcement_period和max_initial_participant_announcement_period之间的一个随机时间发送,发送数量由initial_participant_announcements设置。

rti-dds8

上图中,如果参与者A没有在规定的活跃时间内被刷新,则它将从参与者B的数据库中删除。为了简化,图中省略了从B发往A的“participant DATA”,此外参与者A的“initial repeat messages”也被省略了。

Refresh Mechanism

为了确保后加入的参与者不需要等到远程参与者数据的下一次刷新后才发现远程参与者,存在一种重新发送机制。如果接收到的参与者数据来自之前从未见过的远程参与者,并且在本地参与者的对等列表中,则应用程序将把自己的参与者数据重新发送给所有对等点。这种重新发送可能会进行多次,在两者之间有一个随机的睡眠时间。

rti-dds9

上图中,参与者A在其对等列表中有参与者B,参与者B在其对等列表中没有参与者A,但是DiscoveryQosPolicy.accept_unknown_peers被设置为DDS_BOOLEAN_TRUE。参与者A在B发送其initial announcement后加入系统。在B发现A后,他等待时间A,然后重新发送它的参与者信息。图中initial repeat messages被省略了。

下图中,提供了参与者发现阶段所发送的消息的总体过程,参与者A和B在它们彼此的对等列表中,A先被创建。

rti-dds10

Maintaining DataWriter Libeliness for kinds AUTOMATIC and MANUAL_BY_PARTICIPANT

Connext DDS使用内置的DataWriter和DataReader对,也被称为inter-participant reader和inter-participant writer,来保持DataWriters(被设置为AUTOMATIC or MANUAL_BY_PARTICIPANT)的活力。

当域参与者中有任何DataWriters的Liveliness QosPolicy被设置为AUTOMATIC,inter-participant writer将以这些DataWriters的最短lease_duration为时间间隔,周期可靠的广播AUTOMATIC liveliness消息。如下图中所示:

rti-dds11

当域参与者中有任何DataWriters的Liveliness QosPolicy被设置为MANUAL_BY_PARTICIPANT,Connext DDS将会定期检查它们是否调用了write()、assert_liveliness()、dispose() or unregister()。这个检查的速率是每X秒,X是所有域参与者中MANUAL_BY_PARTICIPANT DataWriters的最短lease_duration。如果其中任何DataWriters调用了这些操作,inter-participant writer将会可靠广播一个MANUAL liveliness消息。

如果调用了域参与者的assert_liveliness()操作,并且该域参与者中有任意一个MANUAL_BY_PARTICIPANT DataWriter,inter-participant writer将在X时间段内可靠广播一个MANUAL liveliness消息。如下图中所有:

rti-dds12

inter-participant reader从远程inter-participant writer接收数据,并相应的断言远程域参与者端点的活动性。如果域参与者没有MANUAL_BY_PARTICIPANT or AUTOMATIC DataWriter,那么inter-participant writer不会发送liveliness消息。

Endpoint Discovery

端点发现阶段的目标是把远程端点添加到本地数据库中,以便用户创建的端点(应用程序的DataWriters和DataReaders)可以相互通信。

当发现新的远程域参与者并添加到参与者的数据库时,Connext DDS自动为该远程域参与者添加两个发现端点reader和两个发现端点writer到本地数据库中。并于本地发现端点writer和reader进行匹配,然后就可以在两个域参与者的发现端点writer和reader之间传输publication DATAs和subscription DATAs。

当为用户数据创建DATAReader/DATAWriter时,描述新创建对象的subscription DATA/publication DATA将从本地发现端点writer发送到本地数据库中的远程域参与者的远程发现端点reader。当删除DATAReader/DATAWriter时,同理。

除了发送subscription DATA/publication DATA外,发现端点writer还会定期检查远程发现端点reader是否是最新的。如果发现端点writer没有确认远程端点reader收到最新的DATA,发现端点writer会发送一条特殊的HB消息(F=0),来请求从远程端点reader的确认,如下图所示:

rti-dds13

即使当本地用户的DATAWriter和DATAReader在远程域参与者被发现前已经创建,远程参与者仍然可以被告知之前创建的DATAWriter和DATAReader。通过HB和ACK/NACK实现,当发现一个新的远程参与者时,它们分别由built-int endpoint writer和built-int endpoint reader发送。

下图中,通过HB来触发后加入者发现DataWriter。参与者A的writer C在发现参与者B之前被创建。对于A,响应接收到participant B DATA消息,participant A DATA消息被发送给B。在A上的端点发现reader也会发送ACK/NACK给B上的端点发现writer。

rti-dds14

下图中,通过ACKNACK来触发后加入者发现DataWriter。参与者A的writer C在发现参与者B之前被创建。当B接收到新的Participant A DATA消息时,发送Participant B DATA给A。B上的发现端点writer也会给A上的发现端点reader发送HB消息。为了简化,这些过程在下图中被省略了。

rti-dds15

通信报文

RTPS协议

RTPS层包含header(红色)和submessages(绿色)。

rti-dds16

Default port mapping:不是header本身的一部分,而是从数据包的目的端口提取的信息。通过了解目的端口,可以找到domain_id, participant_index, 和the nature of the traffic(unicast or multicast, user traffic or meta traffic)。

RTPS Header

rti-dds17

rti-dds18

类别说明
Magic表示该报文类型为RTPS
Protocol version表示该版本号为2.3;由主版本和次版本组成
vendorId表示供应商为Connext DDS
guidPrefix由host ID,app ID,instance ID组成,唯一标识该RTPS参与者

RTPS Submessages

Submessages包含flags。flags的含义取决于submessage,通常是”endianness”flag。

INFO_TS(0x09)

显示数据包的源时间戳。Endianness表示它是小字节序(1)还是大字节序(0)。

rti-dds19

INFO_DST(0x0e)

INFO_DST指示在该条RTPS消息中,INFO_DST后面的子消息处理者RTPS reader实体的GuidPrefix为INFO_DST.GuidPrefix。

rti-dds20

HEARTBEAT(0x07)

HEARTBEAT指示DataWriter队列中样本的可用性。在下面的示例中,DataWriter宣布序列号为1至6的样本可用。这是此DataWriter发送的第6号心跳。

rti-dds21

该消息有两个功能:

  1. 它通知Reader在Writer的历史缓存中可用的序列号,这样Reader就可以请求(使用一个AckNack消息)任何已经错过的内容。

  2. 它要求Reader发送对已进入Reader历史缓存的CacheChange更改的确认,以便Writer了解Reader的状态。

rti-dds22

HEARTBEAT_FRAG(0x13)

当对数据进行分段时,直到所有分段都可用,HEARTBEATFRAG子消息从RTPS Writer发送到RTPS Reader,以告知有哪些分段可用。这实现了片段级的可靠通信。一旦所有片段都可用,就使用常规Heartbeat消息。

ACKNACK (0x06)

rti-dds23

该子消息用于将Reader的状态通知给Writer。该子消息允许Reader告知Writer所收到的序列号,以及它仍然缺少的序列号。出现在集合里面的序列号是Reader没有收到的序列号。

rti-dds24

rti-dds25

下图中的ACKNACK.readerSNState的bitmapBase和numBits都为0,可以理解为是一条初始化ACKNACK消息。该消息通常在StatefulReader匹配一个StatefulWriter的时候被发送。当StatefulWriter收到该消息时,响应一条HEARTBEAT消息。

rti-dds26

NACK_FRAG (0x12)

NACK _FRAG子消息用于将Reader的状态传达给Writer。当数据更改作为一系列片段发送时,NACKFRAG子消息允许Reader通知Writer它仍然缺少的特定片段编号。

该子消息只能包含否定确认。注意这不同于ACKNACK子消息,ACKNACK子消息包括肯定和否定确认。

DATA(p) (0x15)

数据消息,包含参与者的属性信息。

DATA (w)(0x15)

数据消息,包含Writer的属性消息,当发布端writer创建成功后,该消息会被发送给订阅端。

DATA (r)(0x15)

数据消息,包含Reader的属性信息,当订阅端reader创建成功后,该消息会被发送给发布端。

DATA (0x15)

rti-dds27

DATA表示数据消息,包含用户发送的实际消息内容。

DATA子消息包含用户数据或元流量数据。用户数据是应用程序通过write()操作发送的数据。元流量数据由Connext DDS出于不同目的发送(发现、匹配内置端点、DDS安全握手等)。

rti-dds28

如果Wireshark知道DataWriter GUID,则意味着这是元流量。例如,上面的DataWriter GUID是未知的(它是应用程序定义的编写器),但下面的DataWriter GUID是已知的(BUILTIN_PUBLICATIONS_WRITER,DataWriter的发现流量)。

rti-dds29

当DATA子消息包含元流量时,Wireshark会正确解析该信息。在这种情况下,有一个来自发现过程的参与者公告(writerEntityId为BUILTIN_SDP_PARTICIPANT_WRITER,其中SDP代表Simple Discovery Protocol)

rti-dds30

DATA_FRAG (0x16)

DATA_FRAG子消息通过将序列化数据分段并作为多个DATA_FRAG子消息发送来扩展DATA子消息。然后,RTPS Reader重新组装DATA_FRAG子消息中的片段。

SEC_PREFIX (0x31)

SEC_PREFIX子消息提供了一种在合法RTPS子消息中添加安全内容前缀的方法。

rti-dds31

其中Transformation Kind有如下的value:

rti-dds32

SEC_POSTFIX (0x32)

SEC_POSTFIX子消息提供了一种验证SEC _PREFIX之前的RTPS子消息的有效性和来源的方法。应用的加密转换在相关的SEC_PREFIX中标识。

SEC_BODY (0x30)

SEC_BODY子消息包装了一条或多条常规RTPS子消息,其内容通过加密、消息验证和或数字签名得到保护。

SRTPS_PREFIX (0x33)

SRTPS_PREFIX子消息提供了一种为RTPS子消息列表添加前缀的方法,这样它们就可以受到保护。SRTPS _PREFIX之后应是RTPS子消息列表,这些子消息之后应是SRTPS _POSTFIX子消息。

rti-dds33

其中Transformation Kind有如下的value:

rti-dds34

SRTPS_POSTFIX (0x34)

SRTPS_POSTFIX子消息提供了一种方法来验证相关SRTPS_PREFIX和SRTPS_POSTFIX子消息之间的RTPS子消息列表的有效性和来源。应用的加密转换在相关SRTPS_PREFIX子消息中标识。

GAP (0x08)

GAP子消息用于通知DataReader 特定样本在DataWriter的队列中不可用:例如,当应用程序在样本发送后启动并且DurabilityQos为VOLATILE时。另一个例子是在DataWriter端筛选出样本,而DataReader还不知道。

rti-dds35

PAD (0x01)

PAD子消息允许引入任何必要的填充以满足任何所需的内存对齐要求。此子消息没有内容。它只用子消息头部分就完成了它的目的。填充量由submessageLength的值决定。

报文分析

rti-dds36

在每个域参与者中都有六个built-in entity用来处理发现过程。

Participant Built-in DataWriterParticipant Built-in DataReader用在参与者发现阶段。他们之间分享DATA(p)包,该包包含了参与者的GUID,QoS,locator信息。

Publication Built-in DataWriterPublication Built-in DataReader用在端点发现阶段中对包含的DataWriter的声明。他们之间分享DATA(w)包,该包包含了DataWriter的类型,topic,QoS信息。

Subscription Built-in DataWriterSubscription Built-in DataReader用在端点发现阶段中对包含的DataReader的声明。他们之间分享DATA(r)包,该包包含了DataReader的类型,topic,QoS信息。

built-in writers和built-in readers的ID在DDS标准中定义了:

类别
Publications writer0x000003c2
Publications reader0x000003c7
Subscriptions writer0x000004c2
Subscriptions reader0x000004c7

[!NOTE]

小技巧:在wireshark中,使用关键字rtps.sm.wrEntityId可以对端点发现阶段中的DataReader和DataWriter相关的报文进行筛选:

1
2
rtps.sm.wrEntityId == 0x000003c2        # 筛选DataWriter
rtps.sm.wrEntityId == 0x000004c2        # 筛选DataReader

考虑一个涉及两台主机的示例场景,每台主机运行一个DDS应用:

rti-dds37

发现报文过程:

rti-dds38

#65 从应用程序的wrietr Ping应用程序reader‘s locator。

rti-dds39

其中,第76条中gapList.bitmapBase的值4,与Subscribe接下来从Publisher接收到的第一条报文中的writerseqNumber的值4相对应,表示在此之前Publisher发送的三条消息并未收到。

rti-dds40

rti-dds41

rti-dds42

本文由作者按照 CC BY 4.0 进行授权