diff -Npur -X ex.file a/cmake/dev/check_configuration.cmake b/cmake/dev/check_configuration.cmake --- a/cmake/dev/check_configuration.cmake 2019-01-30 16:06:46.345544075 +0800 +++ b/cmake/dev/check_configuration.cmake 2019-01-30 16:12:27.917537958 +0800 @@ -14,6 +14,7 @@ macro(check_stdcxx) # Check C++11 + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g") include(CheckCXXCompilerFlag) if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_COMPILER_IS_CLANG OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") diff -Npur -X ex.file a/include/fastrtps/Domain.h b/include/fastrtps/Domain.h --- a/include/fastrtps/Domain.h 2019-01-30 16:06:46.353544075 +0800 +++ b/include/fastrtps/Domain.h 2019-01-30 16:12:27.917537958 +0800 @@ -19,7 +19,7 @@ #ifndef DOMAIN_H_ #define DOMAIN_H_ - +#include #include "attributes/ParticipantAttributes.h" namespace eprosima{ @@ -164,7 +164,7 @@ private: static std::vector m_participants; static bool default_xml_profiles_loaded; - + static std::mutex m_lock; }; } /* namespace */ diff -Npur -X ex.file a/include/fastrtps/rtps/builtin/discovery/participant/PDPSimpleListener.h b/include/fastrtps/rtps/builtin/discovery/participant/PDPSimpleListener.h --- a/include/fastrtps/rtps/builtin/discovery/participant/PDPSimpleListener.h 2019-01-30 16:06:46.353544075 +0800 +++ b/include/fastrtps/rtps/builtin/discovery/participant/PDPSimpleListener.h 2019-01-30 16:12:27.921537958 +0800 @@ -70,9 +70,9 @@ public: */ bool getKey(CacheChange_t* change); //!Temporal RTPSParticipantProxyData object used to read the messages. - ParticipantProxyData m_ParticipantProxyData; + //ParticipantProxyData m_ParticipantProxyData; //!Auxiliary message. - CDRMessage_t aux_msg; + //CDRMessage_t aux_msg; }; diff -Npur -X ex.file a/include/fastrtps/rtps/messages/RTPS_messages.h b/include/fastrtps/rtps/messages/RTPS_messages.h --- a/include/fastrtps/rtps/messages/RTPS_messages.h 2019-01-30 16:06:46.357544075 +0800 +++ b/include/fastrtps/rtps/messages/RTPS_messages.h 2019-01-30 16:12:27.921537958 +0800 @@ -29,22 +29,22 @@ namespace fastrtps{ namespace rtps{ // //!@brief Enumeration of the different Submessages types -#define PAD 0x01 -#define ACKNACK 0x06 -#define HEARTBEAT 0x07 -#define GAP 0x08 -#define INFO_TS 0x09 -#define INFO_SRC 0x0c -#define INFO_REPLY_IP4 0x0d -#define INFO_DST 0x0e -#define INFO_REPLY 0x0f -#define NACK_FRAG 0x12 -#define HEARTBEAT_FRAG 0x13 -#define DATA 0x15 -#define DATA_FRAG 0x16 -#define SEC_PREFIX 0x31 -#define SRTPS_PREFIX 0x33 +const int PAD = 0x01; +const int ACKNACK = 0x06; +const int HEARTBEAT = 0x07; +const int GAP = 0x08; +const int INFO_TS = 0x09; +const int INFO_SRC = 0x0c; +const int INFO_REPLY_IP4 = 0x0d; +const int INFO_DST = 0x0e; +const int INFO_REPLY = 0x0f; +const int NACK_FRAG = 0x12; +const int HEARTBEAT_FRAG = 0x13; +const int DATA = 0x15; +const int DATA_FRAG = 0x16; +const int SEC_PREFIX = 0x31; +const int SRTPS_PREFIX = 0x33; //!@brief Structure Header_t, RTPS Message Header Structure. //!@ingroup COMMON_MODULE struct Header_t{ diff -Npur -X ex.file a/include/fastrtps/rtps/reader/StatefulReader.h b/include/fastrtps/rtps/reader/StatefulReader.h --- a/include/fastrtps/rtps/reader/StatefulReader.h 2019-01-30 16:06:46.357544075 +0800 +++ b/include/fastrtps/rtps/reader/StatefulReader.h 2019-01-30 16:12:27.921537958 +0800 @@ -165,6 +165,7 @@ class StatefulReader:public RTPSReader */ inline size_t getMatchedWritersSize() const { return matched_writers.size(); } + void sendFragAck(WriterProxy *mp_WP, CacheChange_t *cit); /*! * @brief Returns there is a clean state with all Writers. * It occurs when the Reader received all samples sent by Writers. In other words, diff -Npur -X ex.file a/include/fastrtps/utils/IPFinder.h b/include/fastrtps/utils/IPFinder.h --- a/include/fastrtps/utils/IPFinder.h 2019-01-30 16:06:46.357544075 +0800 +++ b/include/fastrtps/utils/IPFinder.h 2019-01-30 16:12:27.921537958 +0800 @@ -61,7 +61,7 @@ public: IPFinder(); virtual ~IPFinder(); - RTPS_DllAPI static bool getIPs(std::vector* vec_name, bool return_loopback = false); + RTPS_DllAPI static bool getIPs(std::vector* vec_name, bool return_loopback = true); /** * Get the IP4Adresses in all interfaces. diff -Npur -X ex.file a/src/cpp/Domain.cpp b/src/cpp/Domain.cpp --- a/src/cpp/Domain.cpp 2019-01-30 16:06:46.357544075 +0800 +++ b/src/cpp/Domain.cpp 2019-01-30 16:12:27.873537959 +0800 @@ -40,6 +40,7 @@ namespace fastrtps { std::vector Domain::m_participants; bool Domain::default_xml_profiles_loaded = false; +std::mutex Domain::m_lock; Domain::Domain() @@ -55,6 +56,7 @@ Domain::~Domain() void Domain::stopAll() { + std::lock_guard guard(m_lock); while(m_participants.size()>0) { Domain::removeParticipant(m_participants.begin()->first); @@ -65,6 +67,7 @@ void Domain::stopAll() bool Domain::removeParticipant(Participant* part) { + std::lock_guard guard(m_lock); if(part!=nullptr) { for(auto it = m_participants.begin();it!= m_participants.end();++it) @@ -83,6 +86,7 @@ bool Domain::removeParticipant(Participa bool Domain::removePublisher(Publisher* pub) { + std::lock_guard guard(m_lock); if(pub!=nullptr) { for(auto it = m_participants.begin();it!= m_participants.end();++it) @@ -99,6 +103,7 @@ bool Domain::removePublisher(Publisher* bool Domain::removeSubscriber(Subscriber* sub) { + std::lock_guard guard(m_lock); if(sub!=nullptr) { for(auto it = m_participants.begin();it!= m_participants.end();++it) @@ -132,6 +137,7 @@ Participant* Domain::createParticipant(c Participant* Domain::createParticipant(ParticipantAttributes& att,ParticipantListener* listen) { + std::lock_guard guard(m_lock); Participant* pubsubpar = new Participant(); ParticipantImpl* pspartimpl = new ParticipantImpl(att,pubsubpar,listen); RTPSParticipant* part = RTPSDomain::createParticipant(att.rtps,&pspartimpl->m_rtps_listener); @@ -165,6 +171,7 @@ Publisher* Domain::createPublisher(Parti Publisher* Domain::createPublisher(Participant *part, PublisherAttributes &att, PublisherListener *listen) { + std::lock_guard guard(m_lock); for (auto it = m_participants.begin(); it != m_participants.end(); ++it) { if(it->second->getGuid() == part->getGuid()) @@ -189,6 +196,7 @@ Subscriber* Domain::createSubscriber(Par Subscriber* Domain::createSubscriber(Participant *part, SubscriberAttributes &att, SubscriberListener *listen) { + std::lock_guard guard(m_lock); for (auto it = m_participants.begin(); it != m_participants.end(); ++it) { if(it->second->getGuid() == part->getGuid()) @@ -201,6 +209,7 @@ Subscriber* Domain::createSubscriber(Par bool Domain::getRegisteredType(Participant* part, const char* typeName, TopicDataType** type) { + std::lock_guard guard(m_lock); for (auto it = m_participants.begin(); it != m_participants.end();++it) { if(it->second->getGuid() == part->getGuid()) @@ -213,6 +222,7 @@ bool Domain::getRegisteredType(Participa bool Domain::registerType(Participant* part, TopicDataType* type) { + std::lock_guard guard(m_lock); //TODO El registro debería hacerse de manera que no tengamos un objeto del usuario sino que tengamos un objeto TopicDataTYpe propio para que no //haya problemas si el usuario lo destruye antes de tiempo. for (auto it = m_participants.begin(); it != m_participants.end();++it) @@ -227,6 +237,7 @@ bool Domain::registerType(Participant* p bool Domain::unregisterType(Participant* part, const char* typeName) { + std::lock_guard guard(m_lock); //TODO El registro debería hacerse de manera que no tengamos un objeto del usuario sino que tengamos un objeto TopicDataTYpe propio para que no //haya problemas si el usuario lo destruye antes de tiempo. for (auto it = m_participants.begin(); it != m_participants.end();++it) diff -Npur -X ex.file a/src/cpp/publisher/PublisherImpl.cpp b/src/cpp/publisher/PublisherImpl.cpp --- a/src/cpp/publisher/PublisherImpl.cpp 2019-01-30 16:06:46.357544075 +0800 +++ b/src/cpp/publisher/PublisherImpl.cpp 2019-01-30 16:12:27.877537959 +0800 @@ -117,6 +117,7 @@ bool PublisherImpl::create_new_change_wi } //TODO(Ricardo) This logic in a class. Then a user of rtps layer can use it. + high_mark_for_frag_ = 16 << 10; if(high_mark_for_frag_ == 0) { uint32_t max_data_size = mp_writer->getMaxDataSize(); diff -Npur -X ex.file a/src/cpp/rtps/builtin/BuiltinProtocols.cpp b/src/cpp/rtps/builtin/BuiltinProtocols.cpp --- a/src/cpp/rtps/builtin/BuiltinProtocols.cpp 2019-01-30 16:06:46.361544074 +0800 +++ b/src/cpp/rtps/builtin/BuiltinProtocols.cpp 2019-01-30 16:12:27.877537959 +0800 @@ -53,10 +53,12 @@ BuiltinProtocols::~BuiltinProtocols() { if(mp_PDP != nullptr) mp_PDP->announceParticipantState(true, true); // TODO Auto-generated destructor stub - if(mp_WLP!=nullptr) - delete(mp_WLP); if(mp_PDP!=nullptr) delete(mp_PDP); + + // last destroy, since PDP proxy data will use + if(mp_WLP!=nullptr) + delete(mp_WLP); } diff -Npur -X ex.file a/src/cpp/rtps/builtin/discovery/endpoint/EDP.cpp b/src/cpp/rtps/builtin/discovery/endpoint/EDP.cpp --- a/src/cpp/rtps/builtin/discovery/endpoint/EDP.cpp 2019-01-30 16:06:46.361544074 +0800 +++ b/src/cpp/rtps/builtin/discovery/endpoint/EDP.cpp 2019-01-30 16:12:27.877537959 +0800 @@ -75,6 +75,7 @@ bool EDP::newLocalReaderProxyData(RTPSRe reader->m_acceptMessagesFromUnkownWriters = false; //ADD IT TO THE LIST OF READERPROXYDATA ParticipantProxyData* pdata = nullptr; + std::lock_guard pguard(*mp_PDP->getMutex()); if(!this->mp_PDP->addReaderProxyData(rpd, false, nullptr, &pdata)) { delete(rpd); @@ -106,6 +107,7 @@ bool EDP::newLocalWriterProxyData(RTPSWr wpd->userDefinedId(writer->getAttributes()->getUserDefinedID()); //ADD IT TO THE LIST OF READERPROXYDATA ParticipantProxyData* pdata = nullptr; + std::lock_guard pguard(*mp_PDP->getMutex()); if(!this->mp_PDP->addWriterProxyData(wpd, false, nullptr, &pdata)) { delete(wpd); @@ -123,6 +125,7 @@ bool EDP::updatedLocalReader(RTPSReader* { ParticipantProxyData* pdata = nullptr; ReaderProxyData* rdata = nullptr; + std::lock_guard pguard(*mp_PDP->getMutex()); if(this->mp_PDP->lookupReaderProxyData(R->getGuid(),&rdata, &pdata)) { rdata->m_qos.setQos(rqos,false); @@ -140,6 +143,7 @@ bool EDP::updatedLocalWriter(RTPSWriter* { ParticipantProxyData* pdata = nullptr; WriterProxyData* wdata = nullptr; + std::lock_guard pguard(*mp_PDP->getMutex()); if(this->mp_PDP->lookupWriterProxyData(W->getGuid(),&wdata, &pdata)) { wdata->m_qos.setQos(wqos,false); @@ -159,6 +163,7 @@ bool EDP::removeWriterProxy(const GUID_t ParticipantProxyData* pdata = nullptr; WriterProxyData* wdata = nullptr; // Block because other thread can be removing the participant. + //std::lock_guard guard(*mp_RTPSParticipant->getParticipantMutex()); std::lock_guard pguard(*mp_PDP->getMutex()); if(this->mp_PDP->lookupWriterProxyData(writer,&wdata, &pdata)) { @@ -176,6 +181,7 @@ bool EDP::removeReaderProxy(const GUID_t ParticipantProxyData* pdata = nullptr; ReaderProxyData* rdata = nullptr; // Block because other thread can be removing the participant. + //std::lock_guard guard(*mp_RTPSParticipant->getParticipantMutex()); std::lock_guard pguard(*mp_PDP->getMutex()); if(this->mp_PDP->lookupReaderProxyData(reader,&rdata, &pdata)) { @@ -432,10 +438,10 @@ bool EDP::pairingReader(RTPSReader* R) { ParticipantProxyData* pdata = nullptr; ReaderProxyData* rdata = nullptr; + std::lock_guard pguard(*mp_PDP->getMutex()); if(this->mp_PDP->lookupReaderProxyData(R->getGuid(),&rdata, &pdata)) { logInfo(RTPS_EDP,R->getGuid()<<" in topic: \"" << rdata->topicName() <<"\""); - std::lock_guard pguard(*mp_PDP->getMutex()); for(std::vector::const_iterator pit = mp_PDP->ParticipantProxiesBegin(); pit!=mp_PDP->ParticipantProxiesEnd();++pit) { @@ -515,10 +521,10 @@ bool EDP::pairingWriter(RTPSWriter* W) { ParticipantProxyData* pdata = nullptr; WriterProxyData* wdata = nullptr; + std::lock_guard pguard(*mp_PDP->getMutex()); if(this->mp_PDP->lookupWriterProxyData(W->getGuid(),&wdata, &pdata)) { logInfo(RTPS_EDP, W->getGuid() << " in topic: \"" << wdata->topicName() <<"\""); - std::lock_guard pguard(*mp_PDP->getMutex()); for(std::vector::const_iterator pit = mp_PDP->ParticipantProxiesBegin(); pit!=mp_PDP->ParticipantProxiesEnd();++pit) { @@ -609,6 +615,7 @@ bool EDP::pairingReaderProxy(Participant lock.unlock(); ParticipantProxyData* wpdata = nullptr; WriterProxyData* wdata = nullptr; + std::lock_guard pguard(*mp_PDP->getMutex()); if(mp_PDP->lookupWriterProxyData(writerGUID,&wdata, &wpdata)) { std::unique_lock plock(*pdata->mp_mutex); @@ -769,6 +776,7 @@ bool EDP::pairingWriterProxy(Participant lock.unlock(); ParticipantProxyData* rpdata = nullptr; ReaderProxyData* rdata = nullptr; + std::lock_guard pguard(*mp_PDP->getMutex()); if(mp_PDP->lookupReaderProxyData(readerGUID, &rdata, &rpdata)) { std::unique_lock plock(*pdata->mp_mutex); diff -Npur -X ex.file a/src/cpp/rtps/builtin/discovery/endpoint/EDPSimple.cpp b/src/cpp/rtps/builtin/discovery/endpoint/EDPSimple.cpp --- a/src/cpp/rtps/builtin/discovery/endpoint/EDPSimple.cpp 2019-01-30 16:06:46.361544074 +0800 +++ b/src/cpp/rtps/builtin/discovery/endpoint/EDPSimple.cpp 2019-01-30 16:12:27.877537959 +0800 @@ -129,6 +129,8 @@ bool EDPSimple::createSEDPEndpoints() watt.times.nackResponseDelay.fraction = 0; watt.times.initialHeartbeatDelay.seconds = 0; watt.times.initialHeartbeatDelay.fraction = 0; + watt.times.heartbeatPeriod.seconds = 1; + watt.times.heartbeatPeriod.fraction = 0; if(mp_RTPSParticipant->getRTPSParticipantAttributes().throughputController.bytesPerPeriod != UINT32_MAX && mp_RTPSParticipant->getRTPSParticipantAttributes().throughputController.periodMillisecs != 0) watt.mode = ASYNCHRONOUS_WRITER; @@ -219,6 +221,8 @@ bool EDPSimple::createSEDPEndpoints() watt.times.nackResponseDelay.fraction = 0; watt.times.initialHeartbeatDelay.seconds = 0; watt.times.initialHeartbeatDelay.fraction = 0; + watt.times.heartbeatPeriod.seconds = 1; + watt.times.heartbeatPeriod.fraction = 0; if(mp_RTPSParticipant->getRTPSParticipantAttributes().throughputController.bytesPerPeriod != UINT32_MAX && mp_RTPSParticipant->getRTPSParticipantAttributes().throughputController.periodMillisecs != 0) watt.mode = ASYNCHRONOUS_WRITER; @@ -422,7 +426,7 @@ void EDPSimple::assignRemoteEndpoints(Pa watt.guid.guidPrefix = pdata->m_guid.guidPrefix; watt.guid.entityId = c_EntityId_SEDPPubWriter; watt.endpoint.unicastLocatorList = pdata->m_metatrafficUnicastLocatorList; - watt.endpoint.multicastLocatorList = pdata->m_metatrafficMulticastLocatorList; + //watt.endpoint.multicastLocatorList = pdata->m_metatrafficMulticastLocatorList; watt.endpoint.reliabilityKind = RELIABLE; watt.endpoint.durabilityKind = TRANSIENT_LOCAL; pdata->m_builtinWriters.push_back(watt); @@ -440,7 +444,7 @@ void EDPSimple::assignRemoteEndpoints(Pa ratt.guid.guidPrefix = pdata->m_guid.guidPrefix; ratt.guid.entityId = c_EntityId_SEDPPubReader; ratt.endpoint.unicastLocatorList = pdata->m_metatrafficUnicastLocatorList; - ratt.endpoint.multicastLocatorList = pdata->m_metatrafficMulticastLocatorList; + //ratt.endpoint.multicastLocatorList = pdata->m_metatrafficMulticastLocatorList; ratt.endpoint.durabilityKind = TRANSIENT_LOCAL; ratt.endpoint.reliabilityKind = RELIABLE; pdata->m_builtinReaders.push_back(ratt); @@ -457,7 +461,7 @@ void EDPSimple::assignRemoteEndpoints(Pa watt.guid.guidPrefix = pdata->m_guid.guidPrefix; watt.guid.entityId = c_EntityId_SEDPSubWriter; watt.endpoint.unicastLocatorList = pdata->m_metatrafficUnicastLocatorList; - watt.endpoint.multicastLocatorList = pdata->m_metatrafficMulticastLocatorList; + //watt.endpoint.multicastLocatorList = pdata->m_metatrafficMulticastLocatorList; watt.endpoint.reliabilityKind = RELIABLE; watt.endpoint.durabilityKind = TRANSIENT_LOCAL; pdata->m_builtinWriters.push_back(watt); @@ -475,7 +479,7 @@ void EDPSimple::assignRemoteEndpoints(Pa ratt.guid.guidPrefix = pdata->m_guid.guidPrefix; ratt.guid.entityId = c_EntityId_SEDPSubReader; ratt.endpoint.unicastLocatorList = pdata->m_metatrafficUnicastLocatorList; - ratt.endpoint.multicastLocatorList = pdata->m_metatrafficMulticastLocatorList; + //ratt.endpoint.multicastLocatorList = pdata->m_metatrafficMulticastLocatorList; ratt.endpoint.durabilityKind = TRANSIENT_LOCAL; ratt.endpoint.reliabilityKind = RELIABLE; pdata->m_builtinReaders.push_back(ratt); diff -Npur -X ex.file a/src/cpp/rtps/builtin/discovery/endpoint/EDPSimpleListeners.cpp b/src/cpp/rtps/builtin/discovery/endpoint/EDPSimpleListeners.cpp --- a/src/cpp/rtps/builtin/discovery/endpoint/EDPSimpleListeners.cpp 2019-01-30 16:06:46.361544074 +0800 +++ b/src/cpp/rtps/builtin/discovery/endpoint/EDPSimpleListeners.cpp 2019-01-30 16:12:27.877537959 +0800 @@ -72,6 +72,7 @@ void EDPSimplePUBListener::onNewCacheCha //LOOK IF IS AN UPDATED INFORMATION WriterProxyData* wdata = nullptr; ParticipantProxyData* pdata = nullptr; + std::lock_guard pguard(*mp_SEDP->mp_PDP->getMutex()); if(this->mp_SEDP->mp_PDP->addWriterProxyData(&writerProxyData,true,&wdata,&pdata)) //ADDED NEW DATA { //CHECK the locators: @@ -207,6 +208,7 @@ void EDPSimpleSUBListener::onNewCacheCha //LOOK IF IS AN UPDATED INFORMATION ReaderProxyData* rdata = nullptr; ParticipantProxyData* pdata = nullptr; + std::lock_guard pguard(*mp_SEDP->mp_PDP->getMutex()); if(this->mp_SEDP->mp_PDP->addReaderProxyData(&readerProxyData,true,&rdata,&pdata)) //ADDED NEW DATA { pdata->mp_mutex->lock(); diff -Npur -X ex.file a/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp --- a/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp 2019-01-30 16:06:46.361544074 +0800 +++ b/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp 2019-01-30 16:12:27.877537959 +0800 @@ -221,7 +221,8 @@ void PDPSimple::announceParticipantState { logInfo(RTPS_PDP,"Announcing RTPSParticipant State (new change: "<< new_change <<")"); CacheChange_t* change = nullptr; - + bool unsent_change = false; + { std::lock_guard guardPDP(*this->mp_mutex); if(!dispose) @@ -257,7 +258,8 @@ void PDPSimple::announceParticipantState } else { - mp_SPDPWriter->unsent_changes_reset(); + unsent_change = true; + //mp_SPDPWriter->unsent_changes_reset(); } } else @@ -286,7 +288,10 @@ void PDPSimple::announceParticipantState mp_SPDPWriterHistory->add_change(change); } } - + } + if (unsent_change) { + mp_SPDPWriter->unsent_changes_reset(); + } } bool PDPSimple::lookupReaderProxyData(const GUID_t& reader, ReaderProxyData** rdata, ParticipantProxyData** pdata) @@ -549,6 +554,7 @@ void PDPSimple::assignRemoteEndpoints(Pa uint32_t auxendp = endp; auxendp &=DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER; // TODO Review because the mutex is already take in PDPSimpleListener. + std::lock_guard guard_pdpsimple(*mp_mutex); std::lock_guard guard(*pdata->mp_mutex); if(auxendp!=0) { @@ -637,6 +643,7 @@ bool PDPSimple::removeRemoteParticipant( break; } } + guardPDP.unlock(); if(pdata !=nullptr) { @@ -672,7 +679,7 @@ bool PDPSimple::removeRemoteParticipant( } pdata->mp_mutex->unlock(); - guardPDP.unlock(); + //guardPDP.unlock(); guardW.unlock(); guardR.unlock(); diff -Npur -X ex.file a/src/cpp/rtps/builtin/discovery/participant/PDPSimpleListener.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPSimpleListener.cpp --- a/src/cpp/rtps/builtin/discovery/participant/PDPSimpleListener.cpp 2019-01-30 16:06:46.361544074 +0800 +++ b/src/cpp/rtps/builtin/discovery/participant/PDPSimpleListener.cpp 2019-01-30 16:12:27.877537959 +0800 @@ -45,10 +45,12 @@ namespace rtps { -void PDPSimpleListener::onNewCacheChangeAdded(RTPSReader* reader, const CacheChange_t* const change_in) +void PDPSimpleListener::onNewCacheChangeAdded(RTPSReader* /*reader*/, const CacheChange_t* const change_in) { + ParticipantProxyData m_ParticipantProxyData; CacheChange_t* change = (CacheChange_t*)(change_in); - std::lock_guard rguard(*reader->getMutex()); + //std::lock_guard rguard(*reader->getMutex()); + logInfo(RTPS_PDP,"SPDP Message received"); if(change->instanceHandle == c_InstanceHandle_Unknown) { @@ -81,6 +83,10 @@ void PDPSimpleListener::onNewCacheChange //LOOK IF IS AN UPDATED INFORMATION ParticipantProxyData* pdata_ptr = nullptr; bool found = false; + RTPSParticipantDiscoveryInfo info; + + //lock mp_SPDP mutex. + { std::lock_guard guard(*mp_SPDP->getMutex()); for (auto it = mp_SPDP->m_participantProxies.begin(); it != mp_SPDP->m_participantProxies.end();++it) @@ -92,17 +98,25 @@ void PDPSimpleListener::onNewCacheChange break; } } - RTPSParticipantDiscoveryInfo info; info.m_guid = m_ParticipantProxyData.m_guid; info.m_RTPSParticipantName = m_ParticipantProxyData.m_participantName; info.m_propertyList = m_ParticipantProxyData.m_properties.properties; info.m_userData = m_ParticipantProxyData.m_userData; + if (found) + { + info.m_status = CHANGED_QOS_RTPSPARTICIPANT; + std::lock_guard pguard(*pdata_ptr->mp_mutex); + pdata_ptr->updateData(m_ParticipantProxyData); + if(mp_SPDP->m_discovery.use_STATIC_EndpointDiscoveryProtocol) + mp_SPDP->mp_EDP->assignRemoteEndpoints(&m_ParticipantProxyData); + } + } // mp_SPDP lock guard end. + if(!found) { info.m_status = DISCOVERED_RTPSPARTICIPANT; //IF WE DIDNT FOUND IT WE MUST CREATE A NEW ONE ParticipantProxyData* pdata = new ParticipantProxyData(); - std::lock_guard pguard(*pdata->mp_mutex); pdata->copy(m_ParticipantProxyData); pdata_ptr = pdata; pdata_ptr->isAlive = true; @@ -110,18 +124,14 @@ void PDPSimpleListener::onNewCacheChange pdata_ptr, TimeConv::Time_t2MilliSecondsDouble(pdata_ptr->m_leaseDuration)); pdata_ptr->mp_leaseDurationTimer->restart_timer(); + { + std::lock_guard guard(*mp_SPDP->getMutex()); this->mp_SPDP->m_participantProxies.push_back(pdata_ptr); mp_SPDP->assignRemoteEndpoints(pdata_ptr); + } mp_SPDP->announceParticipantState(false); } - else - { - info.m_status = CHANGED_QOS_RTPSPARTICIPANT; - std::lock_guard pguard(*pdata_ptr->mp_mutex); - pdata_ptr->updateData(m_ParticipantProxyData); - if(mp_SPDP->m_discovery.use_STATIC_EndpointDiscoveryProtocol) - mp_SPDP->mp_EDP->assignRemoteEndpoints(&m_ParticipantProxyData); - } + if(this->mp_SPDP->getRTPSParticipant()->getListener()!=nullptr) this->mp_SPDP->getRTPSParticipant()->getListener()->onRTPSParticipantDiscovery( this->mp_SPDP->getRTPSParticipant()->getUserRTPSParticipant(), @@ -133,7 +143,11 @@ void PDPSimpleListener::onNewCacheChange { GUID_t guid; iHandle2GUID(guid,change->instanceHandle); - this->mp_SPDP->removeRemoteParticipant(guid); + { + std::lock_guard guard(*mp_SPDP->getMutex()); + this->mp_SPDP->removeRemoteParticipant(guid); + } + RTPSParticipantDiscoveryInfo info; info.m_status = REMOVED_RTPSPARTICIPANT; info.m_guid = guid; @@ -151,10 +165,14 @@ void PDPSimpleListener::onNewCacheChange bool PDPSimpleListener::getKey(CacheChange_t* change) { + CDRMessage_t aux_msg; SerializedPayload_t* pl = &change->serializedPayload; CDRMessage::initCDRMsg(&aux_msg); - // TODO CHange because it create a buffer to remove after. - free(aux_msg.buffer); + + if (aux_msg.buffer) { + // TODO CHange because it create a buffer to remove after. + free(aux_msg.buffer); + } aux_msg.buffer = pl->data; aux_msg.length = pl->length; aux_msg.max_size = pl->max_size; diff -Npur -X ex.file a/src/cpp/rtps/builtin/discovery/participant/timedevent/RemoteParticipantLeaseDuration.cpp b/src/cpp/rtps/builtin/discovery/participant/timedevent/RemoteParticipantLeaseDuration.cpp --- a/src/cpp/rtps/builtin/discovery/participant/timedevent/RemoteParticipantLeaseDuration.cpp 2019-01-30 16:06:46.361544074 +0800 +++ b/src/cpp/rtps/builtin/discovery/participant/timedevent/RemoteParticipantLeaseDuration.cpp 2019-01-30 16:12:27.877537959 +0800 @@ -73,7 +73,11 @@ void RemoteParticipantLeaseDuration::eve mp_participantProxyData->mp_mutex->lock(); mp_participantProxyData->mp_leaseDurationTimer = nullptr; mp_participantProxyData->mp_mutex->unlock(); - mp_PDP->removeRemoteParticipant(mp_participantProxyData->m_guid); + + { + std::lock_guard guard(*mp_PDP->getMutex()); + mp_PDP->removeRemoteParticipant(mp_participantProxyData->m_guid); + } if(mp_PDP->getRTPSParticipant()->getListener()!=nullptr) mp_PDP->getRTPSParticipant()->getListener()->onRTPSParticipantDiscovery( diff -Npur -X ex.file a/src/cpp/rtps/messages/CDRMessagePool.cpp b/src/cpp/rtps/messages/CDRMessagePool.cpp --- a/src/cpp/rtps/messages/CDRMessagePool.cpp 2019-01-30 16:06:46.361544074 +0800 +++ b/src/cpp/rtps/messages/CDRMessagePool.cpp 2019-01-30 16:12:27.877537959 +0800 @@ -57,12 +57,13 @@ void CDRMessagePool::allocateGroup(uint1 CDRMessagePool::~CDRMessagePool() { +#if 0 for(std::vector::iterator it=m_all_objects.begin(); it!=m_all_objects.end();++it) { delete(*it); } - +#endif if(mutex_ != nullptr) delete mutex_; } diff -Npur -X ex.file a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp 2019-01-30 16:06:46.361544074 +0800 +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp 2019-01-30 16:12:27.877537959 +0800 @@ -124,6 +124,7 @@ RTPSParticipantImpl::RTPSParticipantImpl for (const auto& transportDescriptor : PParam.userTransports) m_network_Factory.RegisterTransport(transportDescriptor.get()); +{ std::lock_guard guard(*mp_mutex); mp_userParticipant->mp_impl = this; Locator_t loc; @@ -327,6 +328,7 @@ RTPSParticipantImpl::RTPSParticipantImpl // Start security m_security_manager.init(); #endif +} //START BUILTIN PROTOCOLS mp_builtinProtocols = new BuiltinProtocols(); @@ -787,7 +789,7 @@ bool RTPSParticipantImpl::assignEndpoint LocatorList_t finalList; for(auto lit = list.begin();lit != list.end();++lit){ //Iteration of all Locators within the Locator list passed down as argument - std::lock_guard guard(*mp_mutex); + std::lock_guard guard(m_receive_resources_mutex); //Check among ReceiverResources whether the locator is supported or not for (auto it = m_receiverResourcelist.begin(); it != m_receiverResourcelist.end(); ++it){ //Take mutex for the resource since we are going to interact with shared resources @@ -851,6 +853,7 @@ void RTPSParticipantImpl::createReceiver for(auto it_buffer = newItemsBuffer.begin(); it_buffer != newItemsBuffer.end(); ++it_buffer) { + std::lock_guard guard(m_receive_resources_mutex); //Push the new items into the ReceiverResource buffer m_receiverResourcelist.push_back(ReceiverControlBlock(std::move(*it_buffer))); //Create and init the MessageReceiver @@ -868,9 +871,11 @@ void RTPSParticipantImpl::createReceiver bool RTPSParticipantImpl::deleteUserEndpoint(Endpoint* p_endpoint) { + m_receive_resources_mutex.lock(); for(auto it=m_receiverResourcelist.begin();it!=m_receiverResourcelist.end();++it){ (*it).mp_receiver->removeEndpoint(p_endpoint); } + m_receive_resources_mutex.unlock(); bool found = false; { if(p_endpoint->getAttributes()->endpointKind == WRITER) @@ -924,12 +929,13 @@ bool RTPSParticipantImpl::deleteUserEndp if(!found) return false; //REMOVE FOR BUILTINPROTOCOLS + //std::lock_guard guardParticipant(*mp_mutex); if(p_endpoint->getAttributes()->endpointKind == WRITER) mp_builtinProtocols->removeLocalWriter((RTPSWriter*)p_endpoint); else mp_builtinProtocols->removeLocalReader((RTPSReader*)p_endpoint); //BUILTINPROTOCOLS - std::lock_guard guardParticipant(*mp_mutex); + // std::lock_guard guardParticipant(*mp_mutex); } // std::lock_guard guardEndpoint(*p_endpoint->getMutex()); delete(p_endpoint); diff -Npur -X ex.file a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h 2019-01-30 16:06:46.361544074 +0800 +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h 2019-01-30 16:12:27.877537959 +0800 @@ -261,6 +261,7 @@ class RTPSParticipantImpl #endif //!ReceiverControlBlock list - encapsulates all associated resources on a Receiving element + std::mutex m_receive_resources_mutex; std::list m_receiverResourcelist; //!SenderResource List std::mutex m_send_resources_mutex; diff -Npur -X ex.file a/src/cpp/rtps/reader/FragmentedChangePitStop.cpp b/src/cpp/rtps/reader/FragmentedChangePitStop.cpp --- a/src/cpp/rtps/reader/FragmentedChangePitStop.cpp 2019-01-30 16:06:25.529544448 +0800 +++ b/src/cpp/rtps/reader/FragmentedChangePitStop.cpp 2019-01-30 16:12:27.877537959 +0800 @@ -18,7 +18,7 @@ using namespace eprosima::fastrtps::rtps; -CacheChange_t* FragmentedChangePitStop::process(CacheChange_t* incoming_change, uint32_t sampleSize, uint32_t fragmentStartingNum) +CacheChange_t* FragmentedChangePitStop::process(CacheChange_t* incoming_change, uint32_t sampleSize, uint32_t fragmentStartingNum, bool &has_hole, CacheChange_t * & r_change) { CacheChange_t* returnedValue = nullptr; @@ -88,13 +88,23 @@ CacheChange_t* FragmentedChangePitStop:: } // If was updated, check if it is completed. + uint32_t hole_index = 0; if(was_updated) { auto fit = original_change_cit->getChange()->getDataFragments()->begin(); for(; fit != original_change_cit->getChange()->getDataFragments()->end(); ++fit) { - if(*fit == ChangeFragmentStatus_t::NOT_PRESENT) + ++hole_index; + if(*fit == ChangeFragmentStatus_t::NOT_PRESENT) { + if (hole_index < fragmentStartingNum) { + if (hole_index != _hole_last) { + _hole_last = hole_index; + r_change = original_change_cit->getChange(); + has_hole = true; + } + } break; + } } // If it is completed, return CacheChange_t and remove information. diff -Npur -X ex.file a/src/cpp/rtps/reader/FragmentedChangePitStop.h b/src/cpp/rtps/reader/FragmentedChangePitStop.h --- a/src/cpp/rtps/reader/FragmentedChangePitStop.h 2019-01-30 16:06:46.361544074 +0800 +++ b/src/cpp/rtps/reader/FragmentedChangePitStop.h 2019-01-30 16:12:27.877537959 +0800 @@ -105,7 +105,7 @@ namespace eprosima * @return If a CacheChange_t is completed with new incomming fragments, this will be returned. * In other case nullptr is returned. */ - CacheChange_t* process(CacheChange_t* incoming_change, uint32_t sampleSize, uint32_t fragmentStartingNum); + CacheChange_t* process(CacheChange_t* incoming_change, uint32_t sampleSize, uint32_t fragmentStartingNum, bool & has_hole,CacheChange_t* & r_change); /*! * @brief Search if there is a CacheChange_t, giving SequenceNumber_t and writer GUID_t, @@ -139,7 +139,7 @@ namespace eprosima private: std::unordered_multiset changes_; - + uint32_t _hole_last = 0; RTPSReader* parent_; FragmentedChangePitStop(const FragmentedChangePitStop&) NON_COPYABLE_CXX11; diff -Npur -X ex.file a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp --- a/src/cpp/rtps/reader/StatefulReader.cpp 2019-01-30 16:06:46.365544074 +0800 +++ b/src/cpp/rtps/reader/StatefulReader.cpp 2019-01-30 16:12:27.877537959 +0800 @@ -29,6 +29,8 @@ #include "FragmentedChangePitStop.h" #include +#include + #include #include @@ -70,7 +72,7 @@ bool StatefulReader::matched_writer_add( if((*it)->m_att.guid == wdata.guid) { logInfo(RTPS_READER,"Attempting to add existing writer"); - return false; + return true; } } WriterProxy* wp = new WriterProxy(wdata, this); @@ -296,11 +298,14 @@ bool StatefulReader::processDataFragMsg( } } #endif - // Fragments manager has to process incomming fragments. // If CacheChange_t is completed, it will be returned; - CacheChange_t* change_completed = fragmentedChangePitStop_->process(change_to_add, sampleSize, fragmentStartingNum); - + bool has_hole = false; + CacheChange_t *orig_change; + CacheChange_t* change_completed = fragmentedChangePitStop_->process(change_to_add, sampleSize, fragmentStartingNum, has_hole, orig_change); + //if (has_hole) { + //sendFragAck(pWP, orig_change); + //} #if HAVE_SECURITY if(is_payload_protected()) releaseCache(change_to_add); @@ -707,3 +712,48 @@ bool StatefulReader::isInCleanState() co return cleanState; } + +void StatefulReader::sendFragAck(WriterProxy *mp_WP, CacheChange_t * cit) { + RTPSMessageGroup_t m_cdrmessages(mp_WP->mp_SFR->getRTPSParticipant()->getMaxMessageSize(), + mp_WP->mp_SFR->getRTPSParticipant()->getGuid().guidPrefix); + + std::lock_guard guard(*mp_WP->mp_SFR->getMutex()); + RTPSMessageGroup group(mp_WP->mp_SFR->getRTPSParticipant(), mp_WP->mp_SFR, RTPSMessageGroup::READER, m_cdrmessages); + LocatorList_t locators(mp_WP->m_att.endpoint.unicastLocatorList); + + { + FragmentNumberSet_t frag_sns; + + // Search first fragment not present. + uint32_t frag_num = 0; + auto fit = cit->getDataFragments()->begin(); + for(; fit != cit->getDataFragments()->end(); ++fit) + { + ++frag_num; + if(*fit == ChangeFragmentStatus_t::NOT_PRESENT) + break; + } + + // Never should happend. + assert(frag_num != 0); + assert(fit != cit->getDataFragments()->end()); + + // Store FragmentNumberSet_t base. + frag_sns.base = frag_num; + + // Fill the FragmentNumberSet_t bitmap. + for(; fit != cit->getDataFragments()->end(); ++fit) + { + if(*fit == ChangeFragmentStatus_t::NOT_PRESENT) { + frag_sns.add(frag_num); + } else { + break; + } + ++frag_num; + } + + ++mp_WP->m_nackfragCount; + group.add_nackfrag(mp_WP->m_att.guid, cit->sequenceNumber, frag_sns, mp_WP->m_nackfragCount, locators); + } + +} diff -Npur -X ex.file a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp --- a/src/cpp/rtps/reader/StatelessReader.cpp 2019-01-30 16:06:46.365544074 +0800 +++ b/src/cpp/rtps/reader/StatelessReader.cpp 2019-01-30 16:12:27.877537959 +0800 @@ -209,7 +209,9 @@ bool StatelessReader::processDataMsg(Cac if(getGuid().entityId == c_EntityId_SPDPReader) { + lock.unlock(); mp_RTPSParticipant->assertRemoteRTPSParticipantLiveliness(change->writerGUID.guidPrefix); + lock.lock(); } } } @@ -232,7 +234,9 @@ bool StatelessReader::processDataFragMsg // Fragments manager has to process incomming fragments. // If CacheChange_t is completed, it will be returned; - CacheChange_t* change_completed = fragmentedChangePitStop_->process(incomingChange, sampleSize, fragmentStartingNum); + bool has_hole = false; + CacheChange_t* tmp_cache; + CacheChange_t* change_completed = fragmentedChangePitStop_->process(incomingChange, sampleSize, fragmentStartingNum, has_hole, tmp_cache); // Try to remove previous CacheChange_t from PitStop. fragmentedChangePitStop_->try_to_remove_until(incomingChange->sequenceNumber, incomingChange->writerGUID); @@ -276,7 +280,9 @@ bool StatelessReader::processDataFragMsg // Assert liveliness because if it is a participant discovery info. if (getGuid().entityId == c_EntityId_SPDPReader) { + lock.unlock(); mp_RTPSParticipant->assertRemoteRTPSParticipantLiveliness(incomingChange->writerGUID.guidPrefix); + lock.lock(); } // Release CacheChange_t. diff -Npur -X ex.file a/src/cpp/rtps/reader/timedevent/HeartbeatResponseDelay.cpp b/src/cpp/rtps/reader/timedevent/HeartbeatResponseDelay.cpp --- a/src/cpp/rtps/reader/timedevent/HeartbeatResponseDelay.cpp 2019-01-30 16:06:46.365544074 +0800 +++ b/src/cpp/rtps/reader/timedevent/HeartbeatResponseDelay.cpp 2019-01-30 16:12:27.877537959 +0800 @@ -133,9 +133,9 @@ void HeartbeatResponseDelay::event(Event // Fill the FragmentNumberSet_t bitmap. for(; fit != cit->getDataFragments()->end(); ++fit) { - if(*fit == ChangeFragmentStatus_t::NOT_PRESENT) + if(*fit == ChangeFragmentStatus_t::NOT_PRESENT) { frag_sns.add(frag_num); - + } ++frag_num; } diff -Npur -X ex.file a/src/cpp/rtps/writer/ReaderProxy.cpp b/src/cpp/rtps/writer/ReaderProxy.cpp --- a/src/cpp/rtps/writer/ReaderProxy.cpp 2019-01-30 16:06:46.365544074 +0800 +++ b/src/cpp/rtps/writer/ReaderProxy.cpp 2019-01-30 16:12:27.877537959 +0800 @@ -198,6 +198,13 @@ void ReaderProxy::set_change_to_status(c auto it = m_changesForReader.find(ChangeForReader_t(seq_num)); bool mustWakeUpAsyncThread = false; + if ((m_att.endpoint.reliabilityKind != RELIABLE) + && (status == ACKNOWLEDGED)) { + if (it != m_changesForReader.end()) { + m_changesForReader.erase(m_changesForReader.begin(), it); + } + } + if(it != m_changesForReader.end()) { if(status == ACKNOWLEDGED && it == m_changesForReader.begin()) @@ -300,6 +307,7 @@ void ReaderProxy::setNotValid(CacheChang // Element must be in the container. In other case, bug. assert(chit != m_changesForReader.end()); +#if 0 if(chit == m_changesForReader.begin()) { assert(chit->getStatus() != ACKNOWLEDGED); @@ -327,6 +335,11 @@ void ReaderProxy::setNotValid(CacheChang m_changesForReader.insert(hint, newch); } +#endif + + m_changesForReader.erase(chit); + if (changesFromRLowMark_ < change->sequenceNumber) + changesFromRLowMark_ = change->sequenceNumber; } bool ReaderProxy::thereIsUnacknowledged() const diff -Npur -X ex.file a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp --- a/src/cpp/rtps/writer/StatefulWriter.cpp 2019-01-30 16:06:46.365544074 +0800 +++ b/src/cpp/rtps/writer/StatefulWriter.cpp 2019-01-30 16:12:27.877537959 +0800 @@ -54,7 +54,8 @@ StatefulWriter::StatefulWriter(RTPSParti mp_periodicHB(nullptr), m_times(att.times), all_acked_mutex_(nullptr), all_acked_(false), all_acked_cond_(nullptr), disableHeartbeatPiggyback_(att.disableHeartbeatPiggyback), - sendBufferSize_(pimpl->get_min_network_send_buffer_size()), + //sendBufferSize_(pimpl->get_min_network_send_buffer_size()), + sendBufferSize_(32 << 10), currentUsageSendBufferSize_(static_cast(pimpl->get_min_network_send_buffer_size())) { m_heartbeatCount = 0; @@ -145,7 +146,7 @@ void StatefulWriter::unsent_change_added // Heartbeat piggyback. if(!disableHeartbeatPiggyback_) { - if(mp_history->isFull()) + if (0) //(mp_history->isFull()) { send_heartbeat_nts_(mAllRemoteReaders, mAllShrinkedLocatorList, group); } @@ -279,8 +280,9 @@ void StatefulWriter::send_any_unsent_cha { activateHeartbeatPeriod = true; assert(remoteReader->mp_nackSupression != nullptr); - if(allFragmentsSent) + if(allFragmentsSent) { remoteReader->mp_nackSupression->restart_timer(); + } } } } @@ -319,7 +321,7 @@ void StatefulWriter::send_any_unsent_cha // Heartbeat piggyback. if(!disableHeartbeatPiggyback_) { - if(mp_history->isFull()) + if(0) //(mp_history->isFull()) { send_heartbeat_nts_(mAllRemoteReaders, mAllShrinkedLocatorList, group); } @@ -386,7 +388,7 @@ bool StatefulWriter::matched_reader_add( if((*it)->m_att.guid == rdata.guid) { logInfo(RTPS_WRITER, "Attempting to add existing reader" << endl); - return false; + return true; } allRemoteReaders.push_back((*it)->m_att.guid); diff -Npur -X ex.file a/src/cpp/transport/UDPv4Transport.cpp b/src/cpp/transport/UDPv4Transport.cpp --- a/src/cpp/transport/UDPv4Transport.cpp 2019-01-30 16:06:46.365544074 +0800 +++ b/src/cpp/transport/UDPv4Transport.cpp 2019-01-30 16:12:27.873537959 +0800 @@ -30,7 +30,7 @@ static const uint32_t maximumMessageSize static const uint32_t minimumSocketBuffer = 65536; static const uint8_t defaultTTL = 1; -static void GetIP4s(vector& locNames, bool return_loopback = false) +static void GetIP4s(vector& locNames, bool return_loopback = true) { IPFinder::getIPs(&locNames, return_loopback); auto newEnd = remove_if(locNames.begin(), @@ -106,7 +106,6 @@ bool UDPv4Transport::init() socket_base::send_buffer_size option; socket.get_option(option); mConfiguration_.sendBufferSize = option.value(); - if(mConfiguration_.sendBufferSize < minimumSocketBuffer) { mConfiguration_.sendBufferSize = minimumSocketBuffer; @@ -210,11 +209,15 @@ bool UDPv4Transport::OpenInputChannel(co for (const auto& infoIP : locNames) { auto ip = asio::ip::address_v4::from_string(infoIP.name); + try { #if defined(ASIO_HAS_MOVE) - socket.set_option(ip::multicast::join_group(ip::address_v4::from_string(locator.to_IP4_string()), ip)); + socket.set_option(ip::multicast::join_group(ip::address_v4::from_string(locator.to_IP4_string()), ip)); #else - socket->set_option(ip::multicast::join_group(ip::address_v4::from_string(locator.to_IP4_string()), ip)); + socket->set_option(ip::multicast::join_group(ip::address_v4::from_string(locator.to_IP4_string()), ip)); #endif + } catch (std::exception &e) { + logWarning(RTPS_MSG_OUT, "Ignore nic bind mutiple ips error."); + } } } @@ -413,6 +416,10 @@ asio::ip::udp::socket UDPv4Transport::Op { ip::udp::socket socket(mService); socket.open(ip::udp::v4()); + if (mSendBufferSize < (1 << 20)) { + mSendBufferSize = 1 << 20; + } + if(mSendBufferSize != 0) socket.set_option(socket_base::send_buffer_size(mSendBufferSize)); socket.set_option(ip::multicast::hops(mConfiguration_.TTL)); @@ -448,6 +455,10 @@ asio::ip::udp::socket UDPv4Transport::Op { ip::udp::socket socket(mService); socket.open(ip::udp::v4()); + if (mReceiveBufferSize < 1 << 20) { + mReceiveBufferSize = 1 << 20; + } + if(mReceiveBufferSize != 0) socket.set_option(socket_base::receive_buffer_size(mReceiveBufferSize)); if(is_multicast) diff -Npur -X ex.file a/thirdparty/fastcdr/include/fastcdr/Cdr.h b/thirdparty/fastcdr/include/fastcdr/Cdr.h --- a/thirdparty/fastcdr/include/fastcdr/Cdr.h 2019-01-30 16:07:33.657543227 +0800 +++ b/thirdparty/fastcdr/include/fastcdr/Cdr.h 2019-01-30 16:12:27.881537959 +0800 @@ -726,6 +726,7 @@ namespace eprosima * @exception exception::NotEnoughMemoryException This exception is thrown when trying to serialize a position that exceeds the internal memory size. */ Cdr& serialize(const char *string_t); + Cdr& serialize(const char *string_t, size_t length); /*! * @brief This function serializes a string with a different endianness. @@ -735,6 +736,7 @@ namespace eprosima * @exception exception::NotEnoughMemoryException This exception is thrown when trying to serialize a position that exceeds the internal memory size. */ Cdr& serialize(const char *string_t, Endianness endianness); + Cdr& serialize(const char *string_t, size_t length, Endianness endianness); //TODO inline Cdr& serialize(char *string_t) {return serialize((const char*)string_t);} @@ -749,7 +751,7 @@ namespace eprosima * @exception exception::NotEnoughMemoryException This exception is thrown when trying to serialize a position that exceeds the internal memory size. */ inline - Cdr& serialize(const std::string &string_t) {return serialize(string_t.c_str());} + Cdr& serialize(const std::string &string_t) {return serialize(string_t.c_str(), string_t.size());} /*! * @brief This function serializes a std::string with a different endianness. @@ -759,7 +761,7 @@ namespace eprosima * @exception exception::NotEnoughMemoryException This exception is thrown when trying to serialize a position that exceeds the internal memory size. */ inline - Cdr& serialize(const std::string &string_t, Endianness endianness) {return serialize(string_t.c_str(), endianness);} + Cdr& serialize(const std::string &string_t, Endianness endianness) {return serialize(string_t.c_str(), string_t.size(), endianness);} #if HAVE_CXX0X /*! diff -Npur -X ex.file a/thirdparty/fastcdr/src/cpp/Cdr.cpp b/thirdparty/fastcdr/src/cpp/Cdr.cpp --- a/thirdparty/fastcdr/src/cpp/Cdr.cpp 2019-01-30 16:07:33.657543227 +0800 +++ b/thirdparty/fastcdr/src/cpp/Cdr.cpp 2019-01-30 16:12:27.877537959 +0800 @@ -570,6 +570,38 @@ Cdr& Cdr::serialize(const char *string_t return *this; } +Cdr& Cdr::serialize(const char *string_t, size_t str_length) +{ + uint32_t length = 0; + + if(string_t != nullptr) + length = (uint32_t)str_length + 1; + + if(length > 0) + { + Cdr::state state(*this); + serialize(length); + + if(((m_lastPosition - m_currentPosition) >= length) || resize(length)) + { + // Save last datasize. + m_lastDataSize = sizeof(uint8_t); + + m_currentPosition.memcopy(string_t, length); + m_currentPosition += length; + } + else + { + setState(state); + throw NotEnoughMemoryException(NotEnoughMemoryException::NOT_ENOUGH_MEMORY_MESSAGE_DEFAULT); + } + } + else + serialize(length); + + return *this; +} + Cdr& Cdr::serialize(const char *string_t, Endianness endianness) { bool auxSwap = m_swapBytes; @@ -581,6 +613,25 @@ Cdr& Cdr::serialize(const char *string_t m_swapBytes = auxSwap; } catch(Exception &ex) + { + m_swapBytes = auxSwap; + ex.raise(); + } + + return *this; +} + +Cdr& Cdr::serialize(const char *string_t, size_t length, Endianness endianness) +{ + bool auxSwap = m_swapBytes; + m_swapBytes = (m_swapBytes && (m_endianness == endianness)) || (!m_swapBytes && (m_endianness != endianness)); + + try + { + serialize(string_t, length); + m_swapBytes = auxSwap; + } + catch(Exception &ex) { m_swapBytes = auxSwap; ex.raise();