summaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/prod_protocol_rtc.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/prod_protocol_rtc.cc')
-rw-r--r--libtransport/src/protocols/prod_protocol_rtc.cc426
1 files changed, 121 insertions, 305 deletions
diff --git a/libtransport/src/protocols/prod_protocol_rtc.cc b/libtransport/src/protocols/prod_protocol_rtc.cc
index e49f58167..cb8dff6e4 100644
--- a/libtransport/src/protocols/prod_protocol_rtc.cc
+++ b/libtransport/src/protocols/prod_protocol_rtc.cc
@@ -43,9 +43,6 @@ RTCProductionProtocol::RTCProductionProtocol(
last_produced_data_ts_(0),
last_round_(utils::SteadyTime::nowMs().count()),
allow_delayed_nacks_(false),
- queue_timer_on_(false),
- consumer_in_sync_(false),
- on_consumer_in_sync_(nullptr),
pending_fec_pace_(false),
max_len_(0),
queue_len_(0),
@@ -54,8 +51,6 @@ RTCProductionProtocol::RTCProductionProtocol(
std::uniform_int_distribution<> dis(0, 255);
prod_label_ = dis(gen_);
cache_label_ = (prod_label_ + 1) % 256;
- interests_queue_timer_ =
- std::make_unique<asio::steady_timer>(portal_->getThread().getIoService());
round_timer_ =
std::make_unique<asio::steady_timer>(portal_->getThread().getIoService());
fec_pacing_timer_ =
@@ -69,16 +64,7 @@ RTCProductionProtocol::~RTCProductionProtocol() {}
void RTCProductionProtocol::setProducerParam() {
// Flow name: here we assume there is only one prefix registered in the portal
- flow_name_ = portal_->getServedNamespaces().begin()->getName();
-
- // Manifest
- uint32_t making_manifest;
- socket_->getSocketOption(interface::GeneralTransportOptions::MAKE_MANIFEST,
- making_manifest);
-
- // Signer
- std::shared_ptr<auth::Signer> signer;
- socket_->getSocketOption(interface::GeneralTransportOptions::SIGNER, signer);
+ flow_name_ = portal_->getServedNamespaces().begin()->makeName();
// Default format
core::Packet::Format default_format;
@@ -94,15 +80,22 @@ void RTCProductionProtocol::setProducerParam() {
socket_->getSocketOption(interface::RtcTransportOptions::AGGREGATED_DATA,
data_aggregation_);
- size_t signature_size = signer->getSignatureFieldSize();
- data_header_format_ = {
- !making_manifest ? Packet::toAHFormat(default_format) : default_format,
- !making_manifest ? signature_size : 0};
+ size_t signature_size = signer_->getSignatureFieldSize();
+ data_header_format_ = {!manifest_max_capacity_
+ ? Packet::toAHFormat(default_format)
+ : default_format,
+ !manifest_max_capacity_ ? signature_size : 0};
manifest_header_format_ = {Packet::toAHFormat(default_format),
signature_size};
nack_header_format_ = {Packet::toAHFormat(default_format), signature_size};
fec_header_format_ = {Packet::toAHFormat(default_format), signature_size};
+ // Initialize verifier for aggregated interests
+ std::shared_ptr<auth::Verifier> verifier;
+ socket_->getSocketOption(implementation::GeneralTransportOptions::VERIFIER,
+ verifier);
+ verifier_ = std::make_shared<rtc::RTCVerifier>(verifier, 0, 0);
+
// Schedule round timer
scheduleRoundTimer();
}
@@ -143,15 +136,17 @@ void RTCProductionProtocol::updateStats(bool new_round) {
packets_production_rate_ =
ceil((double)(produced_packets_ + prev_produced_packets_) * per_second);
- // add fec packets looking at the fec code. we don't use directly the number
- // of fec packets produced in 1 round because it may happen that different
- // numbers of blocks are generated during the rounds and this creates
- // inconsistencies in the estimation of the production rate
- uint32_t k = fec::FECUtils::getSourceSymbols(fec_type_);
- uint32_t n = fec::FECUtils::getBlockSymbols(fec_type_);
+ if (fec_encoder_ && fec_type_ != fec::FECType::UNKNOWN) {
+ // add fec packets looking at the fec code. we don't use directly the number
+ // of fec packets produced in 1 round because it may happen that different
+ // numbers of blocks are generated during the rounds and this creates
+ // inconsistencies in the estimation of the production rate
+ uint32_t k = fec::FECUtils::getSourceSymbols(fec_type_);
+ uint32_t n = fec::FECUtils::getBlockSymbols(fec_type_);
- packets_production_rate_ +=
- ceil((double)packets_production_rate_ / (double)k) * (n - k);
+ packets_production_rate_ +=
+ ceil((double)packets_production_rate_ / (double)k) * (n - k);
+ }
// update the production rate as soon as it increases by 10% with respect to
// the last round
@@ -168,11 +163,6 @@ void RTCProductionProtocol::updateStats(bool new_round) {
allow_delayed_nacks_ = false;
}
- // check if the production rate is decreased. if yes send nacks if needed
- if (prev_packets_production_rate < packets_production_rate_) {
- sendNacksForPendingInterests();
- }
-
if (new_round) {
prev_produced_bytes_ = produced_bytes_;
prev_produced_packets_ = produced_packets_;
@@ -203,16 +193,25 @@ void RTCProductionProtocol::produce(ContentObject &content_object) {
uint32_t RTCProductionProtocol::produceDatagram(
const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer) {
std::size_t buffer_size = buffer->length();
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Maybe Sending content object: " << content_name;
+
if (TRANSPORT_EXPECT_FALSE(buffer_size == 0)) return 0;
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Sending content object: " << content_name;
+
uint32_t data_packet_size;
socket_->getSocketOption(interface::GeneralTransportOptions::DATA_PACKET_SIZE,
data_packet_size);
-
- if (TRANSPORT_EXPECT_FALSE(
- (Packet::getHeaderSizeFromFormat(data_header_format_.first,
- data_header_format_.second) +
- rtc::DATA_HEADER_SIZE + buffer_size) > data_packet_size)) {
+ // this is a source packet but we check the fec header size of FEC packet in
+ // order to leave room for the header when FEC packets will be generated
+ uint32_t fec_header = 0;
+ if (fec_encoder_) fec_encoder_->getFecHeaderSize(true);
+ uint32_t headers_size =
+ (uint32_t)Packet::getHeaderSizeFromFormat(data_header_format_.first,
+ data_header_format_.second) +
+ rtc::DATA_HEADER_SIZE + fec_header;
+ if (TRANSPORT_EXPECT_FALSE((headers_size + buffer_size) > data_packet_size)) {
return 0;
}
@@ -338,47 +337,42 @@ void RTCProductionProtocol::emptyQueue() {
}
void RTCProductionProtocol::sendManifest(const Name &name) {
- if (!making_manifest_) {
+ if (!manifest_max_capacity_) {
return;
}
- Name manifest_name(name);
-
- uint32_t data_packet_size;
- socket_->getSocketOption(interface::GeneralTransportOptions::DATA_PACKET_SIZE,
- data_packet_size);
-
- // The maximum number of entries a manifest can hold
- uint32_t manifest_capacity = making_manifest_;
+ Name manifest_name = name;
// If there is not enough hashes to fill a manifest, return early
- if (manifest_entries_.size() < manifest_capacity) {
+ if (manifest_entries_.size() < manifest_max_capacity_) {
return;
}
// Create a new manifest
std::shared_ptr<core::ContentObjectManifest> manifest =
createManifest(manifest_name.setSuffix(current_seg_));
+ auto manifest_co =
+ std::dynamic_pointer_cast<ContentObject>(manifest->getPacket());
// Fill the manifest with packet hashes that were previously saved
uint32_t nb_entries;
- for (nb_entries = 0; nb_entries < manifest_capacity; ++nb_entries) {
+ for (nb_entries = 0; nb_entries < manifest_max_capacity_; ++nb_entries) {
if (manifest_entries_.empty()) {
break;
}
std::pair<uint32_t, auth::CryptoHash> front = manifest_entries_.front();
- manifest->addSuffixHash(front.first, front.second);
+ manifest->addEntry(front.first, front.second);
manifest_entries_.pop();
}
DLOG_IF(INFO, VLOG_IS_ON(3))
- << "Sending manifest " << manifest->getName().getSuffix() << " of size "
- << nb_entries;
+ << "Sending manifest " << manifest_co->getName().getSuffix()
+ << " of size " << nb_entries;
// Encode and send the manifest
manifest->encode();
portal_->getThread().tryRunHandlerNow(
- [this, content_object{std::move(manifest)}, manifest_name]() mutable {
+ [this, content_object{std::move(manifest_co)}, manifest_name]() mutable {
produceInternal(std::move(content_object), manifest_name);
});
}
@@ -394,11 +388,12 @@ RTCProductionProtocol::createManifest(const Name &content_name) const {
uint64_t now = utils::SteadyTime::nowMs().count();
// Create a new manifest
- std::shared_ptr<core::ContentObjectManifest> manifest(
- ContentObjectManifest::createManifest(
- manifest_header_format_.first, name, core::ManifestVersion::VERSION_1,
- core::ManifestType::INLINE_MANIFEST, false, name, hash_algo,
- manifest_header_format_.second));
+ std::shared_ptr<core::ContentObjectManifest> manifest =
+ ContentObjectManifest::createContentManifest(
+ manifest_header_format_.first, name, manifest_header_format_.second);
+ manifest->setHeaders(core::ManifestType::INLINE_MANIFEST,
+ manifest_max_capacity_, hash_algo, false /* is_last */,
+ name);
// Set connection parameters
manifest->setParamsRTC(ParamsRTC{
@@ -444,7 +439,15 @@ void RTCProductionProtocol::producePktInternal(
// set hicn stuff
Name n(content_name);
content_object->setName(n.setSuffix(current_seg_));
- content_object->setLifetime(500); // XXX this should be set by the APP
+
+ uint32_t expiry_time = 0;
+ socket_->getSocketOption(
+ interface::GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
+ expiry_time);
+ if (expiry_time == interface::default_values::content_object_expiry_time)
+ expiry_time = 500; // the data expiration time should be set by the App. if
+ // the App does not specify it the default is 500ms
+ content_object->setLifetime(expiry_time);
content_object->setPathLabel(prod_label_);
// update stats
@@ -466,9 +469,9 @@ void RTCProductionProtocol::producePktInternal(
// pass packet to FEC encoder
if (fec_encoder_ && !fec) {
- uint32_t offset =
- is_manifest ? content_object->headerSize()
- : content_object->headerSize() + rtc::DATA_HEADER_SIZE;
+ uint32_t offset = is_manifest ? (uint32_t)content_object->headerSize()
+ : (uint32_t)content_object->headerSize() +
+ rtc::DATA_HEADER_SIZE;
uint32_t metadata = static_cast<uint32_t>(content_object->getPayloadType());
fec_encoder_->onPacketProduced(*content_object, offset, metadata);
@@ -481,19 +484,14 @@ void RTCProductionProtocol::producePktInternal(
*content_object);
}
- auto seq_it = seqs_map_.find(current_seg_);
- if (seq_it != seqs_map_.end()) {
- sendContentObject(content_object, false, fec);
- }
+ // TODO we may want to send FEC only if an interest is pending in the pit in
+ sendContentObject(content_object, false, fec);
if (*on_content_object_output_) {
on_content_object_output_->operator()(*socket_->getInterface(),
*content_object);
}
- // remove interests from the interest cache if it exists
- removeFromInterestQueue(current_seg_);
-
if (!fec) last_produced_data_ts_ = now;
// Update current segment
@@ -563,59 +561,65 @@ void RTCProductionProtocol::onInterest(Interest &interest) {
on_interest_input_->operator()(*socket_->getInterface(), interest);
}
- auto suffix = interest.firstSuffix();
- // numberOfSuffixes returns only the prefixes in the payalod
- // we add + 1 to count also the seq in the name
- auto n_suffixes = interest.numberOfSuffixes() + 1;
- Name name = interest.getName();
- bool prev_consumer_state = consumer_in_sync_;
-
- for (uint32_t i = 0; i < n_suffixes; i++) {
- if (i > 0) {
- name.setSuffix(*(suffix + (i - 1)));
- }
+ if (!interest.isValid()) throw std::runtime_error("Bad interest format");
+ if (interest.hasManifest() &&
+ verifier_->verify(interest) != auth::VerificationPolicy::ACCEPT)
+ throw std::runtime_error("Interset manifest verification failed");
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received interest " << name;
+ uint32_t *suffix = interest.firstSuffix();
+ uint32_t n_suffixes_in_manifest = interest.numberOfSuffixes();
+ uint32_t *request_bitmap = interest.getRequestBitmap();
- const std::shared_ptr<ContentObject> content_object =
- output_buffer_.find(name);
+ Name name = interest.getName();
+ uint32_t pos = 0; // Position of current suffix in manifest
- if (content_object) {
- if (*on_interest_satisfied_output_buffer_) {
- on_interest_satisfied_output_buffer_->operator()(
- *socket_->getInterface(), interest);
- }
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Received interest " << name << " (" << n_suffixes_in_manifest
+ << " suffixes in manifest)";
+
+ // Process the suffix in the interest header
+ // (first loop iteration), then suffixes in the manifest
+ do {
+ if (!interest.hasManifest() || is_bit_set(request_bitmap, pos)) {
+ const std::shared_ptr<ContentObject> content_object =
+ output_buffer_.find(name);
+
+ if (content_object) {
+ if (*on_interest_satisfied_output_buffer_) {
+ on_interest_satisfied_output_buffer_->operator()(
+ *socket_->getInterface(), interest);
+ }
- if (*on_content_object_output_) {
- on_content_object_output_->operator()(*socket_->getInterface(),
- *content_object);
- }
+ if (*on_content_object_output_) {
+ on_content_object_output_->operator()(*socket_->getInterface(),
+ *content_object);
+ }
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "Send content %u (onInterest) " << content_object->getName();
- content_object->setPathLabel(cache_label_);
- sendContentObject(content_object);
- } else {
- if (*on_interest_process_) {
- on_interest_process_->operator()(*socket_->getInterface(), interest);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Send content %u (onInterest) " << content_object->getName();
+ content_object->setPathLabel(cache_label_);
+ sendContentObject(content_object);
+ } else {
+ if (*on_interest_process_) {
+ on_interest_process_->operator()(*socket_->getInterface(), interest);
+ }
+ processInterest(name.getSuffix(), interest.getLifetime());
}
- processInterest(name.getSuffix(), interest.getLifetime());
}
- }
- if (prev_consumer_state != consumer_in_sync_ && consumer_in_sync_)
- on_consumer_in_sync_(*socket_->getInterface(), interest);
+ // Retrieve next suffix in the manifest
+ if (interest.hasManifest()) {
+ uint32_t seq = *suffix;
+ suffix++;
+
+ name.setSuffix(seq);
+ interest.setName(name);
+ }
+ } while (pos++ < n_suffixes_in_manifest);
}
void RTCProductionProtocol::processInterest(uint32_t interest_seg,
uint32_t lifetime) {
- if (interest_seg == 0) {
- // first packet from the consumer, reset sync state
- consumer_in_sync_ = false;
- }
-
- uint64_t now = utils::SteadyTime::nowMs().count();
-
switch (rtc::ProbeHandler::getProbeType(interest_seg)) {
case rtc::ProbeType::INIT:
DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received init probe " << interest_seg;
@@ -629,183 +633,7 @@ void RTCProductionProtocol::processInterest(uint32_t interest_seg,
break;
}
- // if the production rate 0 use delayed nacks
- if (allow_delayed_nacks_ && interest_seg >= current_seg_) {
- uint64_t next_timer = UINT64_MAX;
- if (!timers_map_.empty()) {
- next_timer = timers_map_.begin()->first;
- }
-
- uint64_t expiration = now + rtc::NACK_DELAY;
- addToInterestQueue(interest_seg, expiration);
-
- // here we have at least one interest in the queue, we need to start or
- // update the timer
- if (!queue_timer_on_) {
- // set timeout
- queue_timer_on_ = true;
- scheduleQueueTimer(timers_map_.begin()->first - now);
- } else {
- // re-schedule the timer because a new interest will expires sooner
- if (next_timer > timers_map_.begin()->first) {
- interests_queue_timer_->cancel();
- scheduleQueueTimer(timers_map_.begin()->first - now);
- }
- }
- return;
- }
-
- if (queue_timer_on_) {
- // the producer is producing. Send nacks to packets that will expire
- // before the data production and remove the timer
- queue_timer_on_ = false;
- interests_queue_timer_->cancel();
- sendNacksForPendingInterests();
- }
-
- uint32_t max_gap = (uint32_t)floor(
- (double)((double)((double)lifetime *
- rtc::INTEREST_LIFETIME_REDUCTION_FACTOR /
- rtc::MILLI_IN_A_SEC) *
- (double)(packets_production_rate_)));
-
- if (interest_seg < current_seg_ || interest_seg > (max_gap + current_seg_)) {
- sendNack(interest_seg);
- } else {
- if (!consumer_in_sync_ && on_consumer_in_sync_) {
- // we consider the remote consumer to be in sync as soon as it covers
- // 70% of the production window with interests
- uint32_t perc = ceil((double)max_gap * 0.7);
- if (interest_seg > (perc + current_seg_)) {
- consumer_in_sync_ = true;
- // on_consumer_in_sync_(*socket_->getInterface(), interest);
- }
- }
- uint64_t expiration =
- now + floor((double)lifetime * rtc::INTEREST_LIFETIME_REDUCTION_FACTOR);
- addToInterestQueue(interest_seg, expiration);
- }
-}
-
-void RTCProductionProtocol::scheduleQueueTimer(uint64_t wait) {
- interests_queue_timer_->expires_from_now(std::chrono::milliseconds(wait));
- std::weak_ptr<RTCProductionProtocol> self = shared_from_this();
- interests_queue_timer_->async_wait([self](const std::error_code &ec) {
- if (ec) {
- return;
- }
-
- auto sp = self.lock();
- if (sp && sp->isRunning()) {
- sp->interestQueueTimer();
- }
- });
-}
-
-void RTCProductionProtocol::addToInterestQueue(uint32_t interest_seg,
- uint64_t expiration) {
- // check if the seq number exists already
- auto it_seqs = seqs_map_.find(interest_seg);
- if (it_seqs != seqs_map_.end()) {
- // the seq already exists
- if (expiration < it_seqs->second) {
- // we need to update the timer becasue we got a smaller one
- // 1) remove the entry from the multimap
- // 2) update this entry
- auto range = timers_map_.equal_range(it_seqs->second);
- for (auto it_timers = range.first; it_timers != range.second;
- it_timers++) {
- if (it_timers->second == it_seqs->first) {
- timers_map_.erase(it_timers);
- break;
- }
- }
- timers_map_.insert(
- std::pair<uint64_t, uint32_t>(expiration, interest_seg));
- it_seqs->second = expiration;
- } else {
- // nothing to do here
- return;
- }
- } else {
- // add the new seq
- timers_map_.insert(std::pair<uint64_t, uint32_t>(expiration, interest_seg));
- seqs_map_.insert(std::pair<uint32_t, uint64_t>(interest_seg, expiration));
- }
-}
-
-void RTCProductionProtocol::sendNacksForPendingInterests() {
- std::unordered_set<uint32_t> to_remove;
-
- uint32_t pps = ceil((double)(packets_production_rate_)*rtc::
- INTEREST_LIFETIME_REDUCTION_FACTOR);
-
- uint64_t now = utils::SteadyTime::nowMs().count();
- for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) {
- if (it->first > current_seg_ && it->second > now) {
- double exp_time_in_sec =
- (double)(it->second - now) / (double)rtc::MILLI_IN_A_SEC;
- uint32_t packets_prod_before_expire = ceil((double)pps * exp_time_in_sec);
-
- if (it->first > (current_seg_ + packets_prod_before_expire)) {
- sendNack(it->first);
- to_remove.insert(it->first);
- }
- } else if (TRANSPORT_EXPECT_FALSE(it->first < current_seg_ ||
- it->second <= now)) {
- // this branch should never be execcuted
- // first condition: the packet was already prdocued and we have and old
- // interest pending. send a nack to notify the consumer if needed. the
- // case it->first = current_seg_ is not handled because
- // the interest will be satified by the next data packet.
- // second condition: the interest is expired.
- sendNack(it->first);
- to_remove.insert(it->first);
- }
- }
-
- // delete nacked interests
- for (auto it = to_remove.begin(); it != to_remove.end(); it++) {
- removeFromInterestQueue(*it);
- }
-}
-
-void RTCProductionProtocol::removeFromInterestQueue(uint32_t interest_seg) {
- auto seq_it = seqs_map_.find(interest_seg);
- if (seq_it != seqs_map_.end()) {
- auto range = timers_map_.equal_range(seq_it->second);
- for (auto it_timers = range.first; it_timers != range.second; it_timers++) {
- if (it_timers->second == seq_it->first) {
- timers_map_.erase(it_timers);
- break;
- }
- }
- seqs_map_.erase(seq_it);
- }
-}
-
-void RTCProductionProtocol::interestQueueTimer() {
- uint64_t now = utils::SteadyTime::nowMs().count();
-
- for (auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();) {
- uint64_t expire = it_timers->first;
- if (expire <= now) {
- uint32_t seq = it_timers->second;
- sendNack(seq);
- // remove the interest from the other map
- seqs_map_.erase(seq);
- it_timers = timers_map_.erase(it_timers);
- } else {
- // stop, we are done!
- break;
- }
- }
- if (timers_map_.empty()) {
- queue_timer_on_ = false;
- } else {
- queue_timer_on_ = true;
- scheduleQueueTimer(timers_map_.begin()->first - now);
- }
+ if (interest_seg < current_seg_) sendNack(interest_seg);
}
void RTCProductionProtocol::sendManifestProbe(uint32_t sequence) {
@@ -814,18 +642,20 @@ void RTCProductionProtocol::sendManifestProbe(uint32_t sequence) {
std::shared_ptr<core::ContentObjectManifest> manifest_probe =
createManifest(manifest_name);
+ auto manifest_probe_co =
+ std::dynamic_pointer_cast<ContentObject>(manifest_probe->getPacket());
- manifest_probe->setLifetime(0);
- manifest_probe->setPathLabel(prod_label_);
+ manifest_probe_co->setLifetime(0);
+ manifest_probe_co->setPathLabel(prod_label_);
manifest_probe->encode();
if (*on_content_object_output_) {
on_content_object_output_->operator()(*socket_->getInterface(),
- *manifest_probe);
+ *manifest_probe_co);
}
DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send init probe " << sequence;
- sendContentObject(manifest_probe, true, false);
+ sendContentObject(manifest_probe_co, true, false);
}
void RTCProductionProtocol::sendNack(uint32_t sequence) {
@@ -847,20 +677,6 @@ void RTCProductionProtocol::sendNack(uint32_t sequence) {
nack->setLifetime(0);
nack->setPathLabel(prod_label_);
- if (!consumer_in_sync_ && on_consumer_in_sync_ &&
- rtc::ProbeHandler::getProbeType(sequence) == rtc::ProbeType::NOT_PROBE &&
- sequence > next_packet) {
- consumer_in_sync_ = true;
- Packet::Format format;
- socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT,
- format);
-
- auto interest =
- core::PacketManager<>::getInstance().getPacket<Interest>(format);
- interest->setName(n);
- on_consumer_in_sync_(*socket_->getInterface(), *interest);
- }
-
if (*on_content_object_output_) {
on_content_object_output_->operator()(*socket_->getInterface(), *nack);
}
@@ -881,7 +697,7 @@ void RTCProductionProtocol::sendContentObject(
portal_->sendContentObject(*content_object);
// Compute and save data packet digest
- if (making_manifest_ && !is_ah) {
+ if (manifest_max_capacity_ && !is_ah) {
auth::CryptoHashType hash_algo;
socket_->getSocketOption(interface::GeneralTransportOptions::HASH_ALGORITHM,
hash_algo);