fastdds服务发现源码分析
内置协议
BuiltinProtocols类是内置协议的主要处理器。BuiltinProtocols包括以下主要数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//协议属性字段
BuiltinAttributes m_att;
//服务发现的PDP对象
PDP* mp_PDP;
//WriterLiveness 对象,用于心跳保活
WLP* mp_WLP;
//!Pointer to the TypeLookupManager
fastdds::dds::builtin::TypeLookupManager* tlm_;
//!Locator list for metatraffic
LocatorList_t m_metatrafficMulticastLocatorList;
//!Locator List for metatraffic unicast
LocatorList_t m_metatrafficUnicastLocatorList;
//! Initial peers
LocatorList_t m_initialPeersList;
//! Known discovery and backup server container
std::list<eprosima::fastdds::rtps::RemoteServerAttributes> m_DiscoveryServers;
BuiltinAttributes类用于配置BuiltinProtocols所需要的内容,主要有以下字段:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
DiscoverySettings discovery_config;
//是否使用保活协议(WriterLiveliness protocol)
bool use_WriterLivelinessProtocol = true;
//! TypeLookup Service settings
TypeLookupSettings typelookup_config;
//单播地址,这个默认是7410,0.0.0.0 EDP
LocatorList_t metatrafficUnicastLocatorList;
//组播地址,默认为7400,239.255.0.1 PDP
LocatorList_t metatrafficMulticastLocatorList;
//! The collection of external locators to use for communication on metatraffic topics.
fastdds::rtps::ExternalLocators metatraffic_external_unicast_locators;
//! Initial peers.
LocatorList_t initialPeersList;
//! Memory policy for builtin readers
MemoryManagementPolicy_t readerHistoryMemoryPolicy =
MemoryManagementPolicy_t::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;
//! Maximum payload size for builtin readers
uint32_t readerPayloadSize = BUILTIN_DATA_MAX_SIZE;
//! Memory policy for builtin writers
MemoryManagementPolicy_t writerHistoryMemoryPolicy =
MemoryManagementPolicy_t::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;
//! Maximum payload size for builtin writers
uint32_t writerPayloadSize = BUILTIN_DATA_MAX_SIZE;
//! Mutation tries if the port is being used.
uint32_t mutation_tries = 100u;
BuiltinAttributes对象在什么地方被初始化?
这些内置协议包括:
- 参与者发现协议(Participant Discovery Protocol):该协议让参与者(即DDS应用)可以发现网络中的其他参与者。
- 发布者/订阅者发现协议(Publisher/Subscriber Discovery Protocol):这允许参与者发现网络中其他参与者的发布者和订阅者。
- 读写器发现协议(Reader/Writer Discovery Protocol):这使参与者可以发现网络中其他参与者的数据读写器。
- WLP保活协议(心跳,WriterLiveliness),主要由类WLP来实现
这些内置协议在Fast DDS中是自动启动并运行的,而BuiltinProtocols类就是用来管理和处理这些内置协议的。总的来说,BuiltinProtocols类的主要作用是管理和处理Fast DDS的内置协议,这些协议使得DDS应用可以在网络中发现和与其他应用交互。
发现阶段
服务发现的创建和发现都是在BuiltinProtocols类中处理。所有的服务发现都可以分为两个阶段:
- Participant Discovery Phase(PDP):这个阶段DomainParticipant通过周期性的广播消息获取互相的存在。
- Endpoint Discovery Phase (EDP):这个阶段DomainParticipant会广播其publihser和Subscriber的信息,同时也会接收其他DomainPariticipant的Puiblisher和Subscriber信息。对于两个要匹配的端点而言,topic和data type必须要一致。一旦Puiblisher和Subscriber匹配上之后,就可以接收和发送用户数据了。
发现协议
- Simple:值为SIMPLE,由RTPS标准指定的Simple discovery协议
- Static:值为STATIC,SPDP和XML文件中手动指定的EDP
- Discovery Server:
- SERVER:DomainParticipant作为仓库,接收和发布发信信息
- CLIENT:在发现过程中DomainParticipant作为client。它给server发送发现信息,并从server只接收与自己相关的发现信息
- SUPER_CLIENT:在发现过程中DomainParticipant扮演client。它给server发送发现信息,并接收所有从server收到的发现信息
- BACKUP:创建一个有持久化的sqlite数据库的SERVER DomainParticipant。BACKUP server可以在一开始加载数据库,这种服务可以让服务发现更加有弹性
- Manual:NONE,没有PDP阶段,因此也就没有EDP阶段,所有匹配只能通过RTPS层的addReaderLocator, addReaderProxy, addWriterProxy方法
开发视图
PDP类图
EDP类图
PDPEndPoints类图
服务发现时序
PDP阶段
发送
在类BuiltinProtocols::initBuiltinProtocols
中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// PDP
switch (m_att.discovery_config.discoveryProtocol)
{
case DiscoveryProtocol_t::NONE:
EPROSIMA_LOG_WARNING(RTPS_PDP, "No participant discovery protocol specified");
return true;
case DiscoveryProtocol_t::SIMPLE:
mp_PDP = new PDPSimple(this, allocation);
break;
case DiscoveryProtocol_t::EXTERNAL:
EPROSIMA_LOG_ERROR(RTPS_PDP, "Flag only present for debugging purposes");
return false;
case DiscoveryProtocol_t::CLIENT:
mp_PDP = new fastdds::rtps::PDPClient(this, allocation);
break;
case DiscoveryProtocol_t::SERVER:
mp_PDP = new fastdds::rtps::PDPServer(this, allocation, DurabilityKind_t::TRANSIENT_LOCAL);
break;
#if HAVE_SQLITE3
case DiscoveryProtocol_t::BACKUP:
mp_PDP = new fastdds::rtps::PDPServer(this, allocation, DurabilityKind_t::TRANSIENT);
break;
#endif // if HAVE_SQLITE3
case DiscoveryProtocol_t::SUPER_CLIENT:
mp_PDP = new fastdds::rtps::PDPClient(this, allocation, true);
break;
default:
EPROSIMA_LOG_ERROR(RTPS_PDP, "Unknown DiscoveryProtocol_t specified.");
return false;
}
if (!mp_PDP->init(mp_participantImpl))
{
EPROSIMA_LOG_ERROR(RTPS_PDP, "Participant discovery configuration failed");
delete mp_PDP;
mp_PDP = nullptr;
return false;
}
根据具体的发现模式创建对应PDP实例,然后初始化此实例,这里以PDPSimple
来细说
PDPSimple对象创建,同时创建多个ParticipantProxyData,ReaderProxyData,WriterProxyData用于存储数据,这部分是在PDPSimple构造函数中执行的
PDP对象初始化,创建PDPEndpoints(SimplePDPEndpoints)对象, 这个对象主要用于存储SPDP的端点信息,这个对象中包含两个成员:
BuiltinReader<fastrtps::rtps::StatelessReader> reader
BuiltinWriter<fastrtps::rtps::StatelessWriter> writer
创建是使用的
createPDPEndpoints
这是一个虚借口在每个对应的PDP实例中都会实现创建内置topic
DCPSParticipant
这里将对应的读写端点创建了填充PDP数据
这里调用了initializeParticipantProxyData去填充发送的PDP数据结构,下面把代码直接拷贝过来加注释, 注意这个数据填充是先调用基类PDP总的初始化,然后再调用派生类的初始化(这里是PDPSimple)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
void PDP::initializeParticipantProxyData( ParticipantProxyData* participant_data) { RTPSParticipantAttributes& attributes = mp_RTPSParticipant->getAttributes(); bool announce_locators = !mp_RTPSParticipant->is_intraprocess_only(); participant_data->m_leaseDuration = attributes.builtin.discovery_config.leaseDuration; //set_VendorId_eProsima(participant_data->m_VendorId); /* 厂商ID */ participant_data->m_VendorId = c_VendorId_eProsima; // TODO: participant_data->m_availableBuiltinEndpoints |= mp_builtin->available_builtin_endpoints(); /* 这里开始有相当一段代码都在填充m_availableBuiltinEndpoints这个结构 这个对应的是PID_VENDOR_BUILTIN_ENDPOINT_SET */ participant_data->m_availableBuiltinEndpoints |= builtin_endpoints_->builtin_endpoints(); if (attributes.builtin.use_WriterLivelinessProtocol) { participant_data->m_availableBuiltinEndpoints |= BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER; participant_data->m_availableBuiltinEndpoints |= BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER; #if HAVE_SECURITY if (mp_RTPSParticipant->is_secure()) { participant_data->m_availableBuiltinEndpoints |= BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_SECURE_DATA_WRITER; participant_data->m_availableBuiltinEndpoints |= BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_SECURE_DATA_READER; } #endif // if HAVE_SECURITY } if (attributes.builtin.typelookup_config.use_server) { participant_data->m_availableBuiltinEndpoints |= BUILTIN_ENDPOINT_TYPELOOKUP_SERVICE_REQUEST_DATA_READER; participant_data->m_availableBuiltinEndpoints |= BUILTIN_ENDPOINT_TYPELOOKUP_SERVICE_REPLY_DATA_WRITER; } if (attributes.builtin.typelookup_config.use_client) { participant_data->m_availableBuiltinEndpoints |= BUILTIN_ENDPOINT_TYPELOOKUP_SERVICE_REQUEST_DATA_WRITER; participant_data->m_availableBuiltinEndpoints |= BUILTIN_ENDPOINT_TYPELOOKUP_SERVICE_REPLY_DATA_READER; } #if HAVE_SECURITY if (mp_RTPSParticipant->is_secure()) { participant_data->m_availableBuiltinEndpoints |= mp_RTPSParticipant->security_manager().builtin_endpoints(); } #endif // if HAVE_SECURITY /* 这里是-> PID_DEFAULT_UNICAST_LOCATOR和PID_DEFAULT_MULTICAST_LOCATOR */ if (announce_locators) { participant_data->m_networkConfiguration = attributes.builtin.network_configuration; for (const Locator_t& loc : attributes.defaultUnicastLocatorList) { participant_data->default_locators.add_unicast_locator(loc); } for (const Locator_t& loc : attributes.defaultMulticastLocatorList) { participant_data->default_locators.add_multicast_locator(loc); } } /* 这个在fastdds中是写死的,也就是说QOS永远不会内联 */ participant_data->m_expectsInlineQos = false; /* 设置GUID */ participant_data->m_guid = mp_RTPSParticipant->getGuid(); memcpy( participant_data->m_key.value, participant_data->m_guid.guidPrefix.value, 12); memcpy( participant_data->m_key.value + 12, participant_data->m_guid.entityId.value, 4); // Keep persistence Guid_Prefix_t in a specific property. This info must be propagated to all builtin endpoints { // Use user set persistence guid GuidPrefix_t persistent = mp_RTPSParticipant->get_persistence_guid_prefix(); // If it has not been set, use guid if (persistent == c_GuidPrefix_Unknown) { persistent = attributes.prefix; } // If persistent is set, set it into the participant proxy if (persistent != c_GuidPrefix_Unknown) { participant_data->set_persistence_guid( GUID_t( persistent, c_EntityId_RTPSParticipant)); } } /* 元流量相关的单播和组播 */ participant_data->metatraffic_locators.unicast.clear(); if (announce_locators) { for (const Locator_t& loc : this->mp_builtin->m_metatrafficUnicastLocatorList) { participant_data->metatraffic_locators.add_unicast_locator(loc); } } participant_data->metatraffic_locators.multicast.clear(); if (announce_locators) { if (!m_discovery.avoid_builtin_multicast || participant_data->metatraffic_locators.unicast.empty()) { for (const Locator_t& loc: this->mp_builtin->m_metatrafficMulticastLocatorList) { participant_data->metatraffic_locators.add_multicast_locator(loc); } } fastdds::rtps::network::external_locators::add_external_locators(*participant_data, attributes.builtin.metatraffic_external_unicast_locators, attributes.default_external_unicast_locators); } /* 参与者名称 */ participant_data->m_participantName = std::string(attributes.getName()); /* 用户数据 */ participant_data->m_userData = attributes.userData; #if HAVE_SECURITY if (mp_RTPSParticipant->is_secure()) { IdentityToken* identity_token = nullptr; if (mp_RTPSParticipant->security_manager().get_identity_token(&identity_token) && identity_token != nullptr) { participant_data->identity_token_ = std::move(*identity_token); mp_RTPSParticipant->security_manager().return_identity_token(identity_token); } PermissionsToken* permissions_token = nullptr; if (mp_RTPSParticipant->security_manager().get_permissions_token(&permissions_token) && permissions_token != nullptr) { participant_data->permissions_token_ = std::move(*permissions_token); mp_RTPSParticipant->security_manager().return_permissions_token(permissions_token); } const security::ParticipantSecurityAttributes& sec_attrs = mp_RTPSParticipant->security_attributes(); participant_data->security_attributes_ = sec_attrs.mask(); participant_data->plugin_security_attributes_ = sec_attrs.plugin_participant_attributes; } else { participant_data->security_attributes_ = 0UL; participant_data->plugin_security_attributes_ = 0UL; } #endif // if HAVE_SECURITY // Set properties that will be sent to Proxy Data set_external_participant_properties_(participant_data); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
void PDPSimple::initializeParticipantProxyData( ParticipantProxyData* participant_data) { PDP::initializeParticipantProxyData(participant_data); if (getRTPSParticipant()->getAttributes().builtin.discovery_config. use_SIMPLE_EndpointDiscoveryProtocol) { if (getRTPSParticipant()->getAttributes().builtin.discovery_config.m_simpleEDP. use_PublicationWriterANDSubscriptionReader) { participant_data->m_availableBuiltinEndpoints |= DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER; participant_data->m_availableBuiltinEndpoints |= DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR; } if (getRTPSParticipant()->getAttributes().builtin.discovery_config.m_simpleEDP. use_PublicationReaderANDSubscriptionWriter) { participant_data->m_availableBuiltinEndpoints |= DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR; participant_data->m_availableBuiltinEndpoints |= DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER; } #if HAVE_SECURITY if (getRTPSParticipant()->getAttributes().builtin.discovery_config.m_simpleEDP. enable_builtin_secure_publications_writer_and_subscriptions_reader) { participant_data->m_availableBuiltinEndpoints |= DISC_BUILTIN_ENDPOINT_PUBLICATION_SECURE_ANNOUNCER; participant_data->m_availableBuiltinEndpoints |= DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_SECURE_DETECTOR; } if (getRTPSParticipant()->getAttributes().builtin.discovery_config.m_simpleEDP. enable_builtin_secure_subscriptions_writer_and_publications_reader) { participant_data->m_availableBuiltinEndpoints |= DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_SECURE_ANNOUNCER; participant_data->m_availableBuiltinEndpoints |= DISC_BUILTIN_ENDPOINT_PUBLICATION_SECURE_DETECTOR; } #endif // if HAVE_SECURITY } else if (!getRTPSParticipant()->getAttributes().builtin.discovery_config. use_STATIC_EndpointDiscoveryProtocol) { EPROSIMA_LOG_ERROR(RTPS_PDP, "Neither EDP simple nor EDP static enabled. Endpoints will not be discovered."); } }
创建EDP
初始化EDP 这里将内置topic
DCPSSubscription
和DCPSPublication
创建出来了
到此处准备工作就基本完成了,相关的实例创建啥的就已经完成了
PDP接收
然后通过RTPSParticipant的enable函数的调用,会调用到 BuiltinProtocols::enable(),然后开始服务发现阶段
- 首先mp_PDP->enable()会创建两个定时器,第一个是判断参与者是否掉线的,掉线就删除 这个时间控制是leaseDuration 控制的 租赁时间默认是20秒,也就是说20秒内未收到公告数据就判定为掉线。
这里定时器用的都是RTPSParticipantImpl中创建的定时器资源 注意这里的时间传的都是0,是回掉函数内部又对定时器重新赋值过的
这里调用set_initial_announcement_interval时会掉用以下函数,这里面count的初始值是5 period是100ms 也就是说pdp的前五次公告数据会很频繁,然就会变为leaseDuration_announcementperiod设置的时间默认为3秒
最后调用enable_pdp_readers,这里最终会调用到
RTPSParticipantImpl::enableReader
,这里会将端点分配给接受资源assignEndpoint2LocatorList中继续调用 MessageReceiver::associateEndpoint函数,会将PDP对象的EntityID添加到接收消息的readers中:
PDP::announceParticipantState函数中,发布本端participant的信息
上面就是发送pdp的流程了
关于数据接收,这个比较绕,以下是大致的数据流向示意图
这个processCDRMsg收到消息后解析处理后,会通过findAllReaders函数将信息进行分发, 下面代码大概意思就是,如果实体ID不为空那就直接查找实体ID对应的读取器,然后将消息回掉出去,否则就遍历所有的读取器,找到能接收未知消息的读取器将消息回掉出去
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
template<typename Functor>
void MessageReceiver::findAllReaders(
const EntityId_t& readerID,
const Functor& callback) const
{
if (readerID != c_EntityId_Unknown)
{
const auto readers = associated_readers_.find(readerID);
if (readers != associated_readers_.end())
{
for (const auto& it : readers->second)
{
callback(it);
}
}
}
else
{
for (const auto& readers : associated_readers_)
{
for (const auto& it : readers.second)
{
if (it->m_acceptMessagesToUnknownReaders)
{
callback(it);
}
}
}
}
}
在processCDRMsg中主要都是在对协议的解析, 下面是子消息相关的
这里关注一个函数 proc_Submsg_Data 这是子消息DATA的解析函数,会调用到findAllReaders,然后callback会调用StatelessReader::processDataMsg函数,调用到PDP的监听器PDPListener的onNewCacheChangeAdded函数,接着调用到PDPSimple的assignRemoteEndpoints函数
assignRemoteEndpoints
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void PDPSimple::assignRemoteEndpoints(
ParticipantProxyData* pdata)
{
bool ignored = false;
notify_and_maybe_ignore_new_participant(pdata, ignored);
if (!ignored)
{
#if HAVE_SECURITY
auto endpoints = dynamic_cast<fastdds::rtps::SimplePDPEndpointsSecure*>(builtin_endpoints_.get());
if (nullptr != endpoints)
{
// This participant is secure.
// PDP should have been matched inside notifyAboveRemoteEndpoints after completing the authentication process.
// We now match the other builtin endpoints.
GUID_t remote_guid = pdata->m_guid;
remote_guid.entityId = c_EntityId_spdp_reliable_participant_secure_writer;
bool notify_secure = endpoints->secure_reader.reader_->matched_writer_is_matched(remote_guid);
assign_low_level_remote_endpoints(*pdata, notify_secure);
}
else
#endif // if HAVE_SECURITY
{
// This participant is not secure.
// Match PDP and other builtin endpoints.
match_pdp_remote_endpoints(*pdata, false);
assign_low_level_remote_endpoints(*pdata, false);
}
}
}
这里在assign_low_level_remote_endpoints 中调用了 mp_EDP->assignRemoteEndpoints(pdata, notify_secure_endpoints);
注意在代码中一般会看到 reader->matched_writer_add(writer_data)或者 writer->matched_reader_add(reader_data) 因为端点匹配本来就是read和write配对的
此外,服务发现的EntityID是标准固定,回调函数中通过固定EntityID来调用不同的监听对象。这些固定的Entity ID定义在EntityId_t.hpp中,主要有以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
const EntityId_t c_EntityId_Unknown = ENTITYID_UNKNOWN;
const EntityId_t c_EntityId_SPDPReader = ENTITYID_SPDP_BUILTIN_RTPSParticipant_READER;
const EntityId_t c_EntityId_SPDPWriter = ENTITYID_SPDP_BUILTIN_RTPSParticipant_WRITER;
const EntityId_t c_EntityId_SEDPPubWriter = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER;
const EntityId_t c_EntityId_SEDPPubReader = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER;
const EntityId_t c_EntityId_SEDPSubWriter = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER;
const EntityId_t c_EntityId_SEDPSubReader = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER;
const EntityId_t c_EntityId_RTPSParticipant = ENTITYID_RTPSParticipant;
const EntityId_t c_EntityId_WriterLiveliness = ENTITYID_P2P_BUILTIN_RTPSParticipant_MESSAGE_WRITER;
const EntityId_t c_EntityId_ReaderLiveliness = ENTITYID_P2P_BUILTIN_RTPSParticipant_MESSAGE_READER;
const EntityId_t participant_stateless_message_writer_entity_id = ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER;
const EntityId_t participant_stateless_message_reader_entity_id = ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER;
const EntityId_t c_EntityId_TypeLookup_request_writer = ENTITYID_TL_SVC_REQ_WRITER;
const EntityId_t c_EntityId_TypeLookup_request_reader = ENTITYID_TL_SVC_REQ_READER;
const EntityId_t c_EntityId_TypeLookup_reply_writer = ENTITYID_TL_SVC_REPLY_WRITER;
const EntityId_t c_EntityId_TypeLookup_reply_reader = ENTITYID_TL_SVC_REPLY_READER;
暂时没解决
这个报文来源未找到, 现在已经知道是PDPSimple::match_pdp_remote_endpoints
中处理的此报文
EDP阶段
当收到PDP的数据后,进入EDPSimple::assignRemoteEndpoints
函数
这个阶段主要是reader和writer的配置
根据其数据标志位PID_VENDOR_BUILTIN_ENDPOINT_SET 如果存在对应的端点就添加
在StatefulWriter::matched_reader_add
中调用了send_heartbeat_nts_
填充数据,然后flush_and_reset
发送HEARTBEAT
注意这判断是否处于同一进程内,进行了不同的HEARTBEAT发送 这里理论上会发送三次,因为有subscriptions和publications 然后还有个WLP
这里的大致时序是( StatefulWriter/StatefulReader)
所以这里对应的StatefulReader::processHeartbeatMsg
就会收到一个HEARTBEAT消息,最终会执行到WriterProxy::process_heartbeat
在这里会根据报文判断是否需要响应ACK
如果需要的话这个ack,会由WriterProxy::perform_heartbeat_response
发送
这里发送的ACK会在 对端的StatefulWriter::process_acknack
中处理
这里抓包产生的数据如下
UserWriter和UserReader建立关联
以上PDP EDP截止到之前都是dds内部端点建立关联, 用户端点就是你自己建立的用来收发数据的端点
UserWriter在被创建时,它的属性信息被封装成数据,存放在发布端EDP的SEDPPubWriter的历史缓存中。UserWriter被创建时调用堆栈如下:
UserReader在被创建时,它的属性信息也被封装成数据,存放在订阅端EDP的SEDPSubWriter的历史缓存中。UserReader被创建时调用堆栈如下:
UserWriter是在BuiltinProtocols::addLocalWriter函数中调用EDP::newLocalWriterProxyData函数,在processLocalWriterProxyData中发送SEDP数据
这个是pub接受到sub发过来的DATA(p)报文进行匹配
注意这个接收是EDPSimpleSUBListener处理的
补充
Simple和Static时序
FastDDS 服务发现流程总结
这里忽略了wlp