diff options
Diffstat (limited to 'apps/http-proxy/src')
-rw-r--r-- | apps/http-proxy/src/IcnReceiver.cc | 54 | ||||
-rw-r--r-- | apps/http-proxy/src/IcnReceiver.h | 14 |
2 files changed, 49 insertions, 19 deletions
diff --git a/apps/http-proxy/src/IcnReceiver.cc b/apps/http-proxy/src/IcnReceiver.cc index fc6837a2c..18553d84b 100644 --- a/apps/http-proxy/src/IcnReceiver.cc +++ b/apps/http-proxy/src/IcnReceiver.cc @@ -24,7 +24,8 @@ namespace transport { -core::Prefix generatePrefix(const std::string& prefix_url) { +core::Prefix generatePrefix(const std::string& prefix_url, + std::string& first_ipv6_word) { const char* str = prefix_url.c_str(); uint16_t pos = 0; @@ -39,7 +40,7 @@ core::Prefix generatePrefix(const std::string& prefix_url) { uint32_t locator_hash = utils::hash::fnv32_buf(str, strlen(str)); std::stringstream stream; - stream << std::hex << http::default_values::ipv6_first_word << ":0"; + stream << first_ipv6_word << ":0"; for (uint16_t* word = (uint16_t*)&locator_hash; std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash)); @@ -52,12 +53,10 @@ core::Prefix generatePrefix(const std::string& prefix_url) { return core::Prefix(stream.str(), 64); } -AsyncConsumerProducer::AsyncConsumerProducer(const std::string& prefix, - std::string& ip_address, - std::string& port, - std::string& cache_size, - std::string& mtu) - : prefix_(generatePrefix(prefix)), +AsyncConsumerProducer::AsyncConsumerProducer( + const std::string& prefix, std::string& ip_address, std::string& port, + std::string& cache_size, std::string& mtu, std::string& first_ipv6_word) + : prefix_(generatePrefix(prefix, first_ipv6_word)), producer_socket_(), ip_address_(ip_address), port_(port), @@ -126,20 +125,37 @@ void AsyncConsumerProducer::doReceive() { void AsyncConsumerProducer::manageIncomingInterest( core::Name& name, core::Packet::MemBufPtr& packet, utils::MemBuf* payload) { - // auto seg = name.getSuffix(); + auto seg = name.getSuffix(); name.setSuffix(0); auto _it = chunk_number_map_.find(name); auto _end = chunk_number_map_.end(); if (_it != _end) { - return; + if (_it->second.second) { + // Content is in production + return; + } + + if (seg >= _it->second.first) { + TRANSPORT_LOGD( + "Ignoring interest with name %s for a content object which does not " + "exist. (Request: %u, max: %u)", + name.toString().c_str(), (uint32_t)seg, (uint32_t)_it->second.first); + return; + } } bool is_mpd = HTTPMessageFastParser::isMpdRequest(payload->data(), payload->length()); - chunk_number_map_.emplace(name, 0); - response_name_queue_.emplace(std::move(name), is_mpd ? 1000 : 10000); + auto pair = chunk_number_map_.emplace(name, std::pair<uint32_t, bool>(0, 0)); + if (!pair.second) { + pair.first->second.first = 0; + } + + pair.first->second.second = true; + + response_name_queue_.emplace(std::move(name), is_mpd ? 1000 : 100000); connector_.send(payload, [packet = std::move(packet)]() {}); } @@ -166,17 +182,23 @@ void AsyncConsumerProducer::publishContent(const uint8_t* data, const interface::Name& name = options.getName(); - start_suffix = chunk_number_map_[name]; + auto it = chunk_number_map_.find(name); + if (it == chunk_number_map_.end()) { + std::cerr << "Aborting due to response not found in ResposeInfo map." + << std::endl; + abort(); + } + + start_suffix = it->second.first; if (headers) { request_counter_++; } - - chunk_number_map_[name] += + it->second.first += producer_socket_.produce(name, data, size, is_last, start_suffix); if (is_last) { - chunk_number_map_.erase(name); + it->second.second = false; response_name_queue_.pop(); } } diff --git a/apps/http-proxy/src/IcnReceiver.h b/apps/http-proxy/src/IcnReceiver.h index 61ca4333a..67f615ad7 100644 --- a/apps/http-proxy/src/IcnReceiver.h +++ b/apps/http-proxy/src/IcnReceiver.h @@ -28,10 +28,15 @@ namespace transport { class AsyncConsumerProducer { + using SegmentProductionPair = std::pair<uint32_t, bool>; + using ResponseInfoMap = std::unordered_map<core::Name, SegmentProductionPair>; + using RequestQueue = std::queue<interface::PublicationOptions>; + public: explicit AsyncConsumerProducer(const std::string& prefix, std::string& ip_address, std::string& port, - std::string& cache_size, std::string& mtu); + std::string& cache_size, std::string& mtu, + std::string& first_ipv6_word); void start(); @@ -63,8 +68,11 @@ class AsyncConsumerProducer { // std::unordered_map<core::Name, std::shared_ptr<ATSConnector>> // connection_map_; ATSConnector connector_; - std::unordered_map<core::Name, uint32_t> chunk_number_map_; - std::queue<interface::PublicationOptions> response_name_queue_; + + // ResponseInfoMap --> max_seq_number + bool indicating whether request is in + // production + ResponseInfoMap chunk_number_map_; + RequestQueue response_name_queue_; }; } // namespace transport |