/* * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include #include #include #include #include extern "C" { #include #include } // XXX the main listener should be retrieve in this class at initialization, aka // when hICN becomes avialable // // XXX the main listener port will be retrieved in the forwarder // interface... everything else will be delayed until we have this // information namespace hiperf { ForwarderInterface::ForwarderInterface(asio::io_service &io_service, ICallback *callback) : external_ioservice_(io_service), forwarder_interface_callback_(callback), work_(std::make_unique(internal_ioservice_)), sock_(nullptr), thread_(std::make_unique([this]() { std::cout << "Starting Forwarder Interface thread" << std::endl; internal_ioservice_.run(); std::cout << "Stopping Forwarder Interface thread" << std::endl; })), // set_route_callback_(std::forward(setRouteCallback)), check_routes_timer_(nullptr), pending_add_route_counter_(0), hicn_listen_port_(9695), /* We start in disabled state even when a forwarder is always available */ state_(State::Disabled), timer_(io_service), num_reattempts(0) { std::cout << "Forwarder interface created... connecting to forwarder...\n"; internal_ioservice_.post([this]() { onHicnServiceAvailable(true); }); } ForwarderInterface::~ForwarderInterface() { if (thread_ && thread_->joinable()) { internal_ioservice_.dispatch([this]() { if (sock_) { hc_sock_free(sock_); sock_ = nullptr; } work_.reset(); }); thread_->join(); } std::cout << "ForwarderInterface::~ForwarderInterface" << std::endl; } void ForwarderInterface::onHicnServiceAvailable(bool flag) { if (flag) { switch (state_) { case State::Disabled: case State::Requested: state_ = State::Available; case State::Available: connectToForwarder(); /* Synchronous */ if (state_ != State::Connected) { std::cout << "ConnectToForwarder failed" << std::endl; goto REATTEMPT; } state_ = State::Ready; std::cout << "Connected to forwarder... cancelling reconnection timer" << std::endl; timer_.cancel(); num_reattempts = 0; // case State::Connected: // checkListener(); // if (state_ != State::Ready) { // std::cout << "Listener not found" << std::endl; // goto REATTEMPT; // } // state_ = State::Ready; // timer_.cancel(); // num_reattempts = 0; std::cout << "Forwarder interface is ready... communicate to controller" << std::endl; forwarder_interface_callback_->onHicnServiceReady(); case State::Ready: break; } } else { if (sock_) { hc_sock_free(sock_); sock_ = nullptr; } state_ = State::Disabled; // XXX to be checked upon callback to prevent the // state from going forward (used to manage // concurrency) } return; REATTEMPT: /* Schedule reattempt */ std::cout << "Failed to connect, scheduling reattempt" << std::endl; num_reattempts++; timer_.expires_from_now( std::chrono::milliseconds(ForwarderInterface::REATTEMPT_DELAY_MS)); // timer_.async_wait(std::bind(&ForwarderInterface::onHicnServiceAvailable, // this, flag, std::placeholders::_1)); timer_.async_wait([this, flag](std::error_code ec) { if (ec) return; onHicnServiceAvailable(flag); }); } int ForwarderInterface::connectToForwarder() { sock_ = hc_sock_create(); if (!sock_) { std::cout << "Could not create socket" << std::endl; goto ERR_SOCK; } if (hc_sock_connect(sock_) < 0) { std::cout << "Could not connect to forwarder" << std::endl; goto ERR; } std::cout << "Forwarder interface connected" << std::endl; state_ = State::Connected; return 0; ERR: hc_sock_free(sock_); sock_ = nullptr; ERR_SOCK: return -1; } int ForwarderInterface::checkListener() { if (!sock_) return -1; hc_data_t *data; if (hc_listener_list(sock_, &data) < 0) return -1; int ret = -1; foreach_listener(l, data) { std::string interface = std::string(l->interface_name); if (interface.compare("lo") != 0) { hicn_listen_port_ = l->local_port; state_ = State::Ready; ret = 0; std::cout << "Got listener port" << std::endl; break; } } hc_data_free(data); return ret; } void ForwarderInterface::close() { std::cout << "ForwarderInterface::close" << std::endl; state_ = State::Disabled; /* Cancelling eventual reattempts */ timer_.cancel(); if (sock_) { hc_sock_free(sock_); sock_ = nullptr; } internal_ioservice_.post([this]() { work_.reset(); }); if (thread_->joinable()) { thread_->join(); } } #if 0 void ForwarderInterface::enableCheckRoutesTimer() { if (check_routes_timer_ != nullptr) return; check_routes_timer_ = std::make_unique(internal_ioservice_); checkRoutesLoop(); } void ForwarderInterface::removeConnectedUserNow(ProtocolPtr protocol) { internalRemoveConnectedUser(protocol); } void ForwarderInterface::scheduleRemoveConnectedUser(ProtocolPtr protocol) { internal_ioservice_.post( [this, protocol]() { internalRemoveConnectedUser(protocol); }); } #endif void ForwarderInterface::createFaceAndRoute(const RouteInfoPtr &route_info) { std::vector routes; routes.push_back(std::move(route_info)); createFaceAndRoutes(routes); } void ForwarderInterface::createFaceAndRoutes( const std::vector &routes_info) { pending_add_route_counter_++; auto timer = new asio::steady_timer(internal_ioservice_); internal_ioservice_.post([this, routes_info, timer]() { internalCreateFaceAndRoutes(routes_info, ForwarderInterface::MAX_REATTEMPT, timer); }); } void ForwarderInterface::deleteFaceAndRoute(const RouteInfoPtr &route_info) { std::vector routes; routes.push_back(std::move(route_info)); deleteFaceAndRoutes(routes); } void ForwarderInterface::deleteFaceAndRoutes( const std::vector &routes_info) { internal_ioservice_.post([this, routes_info]() { for (auto &route : routes_info) { internalDeleteFaceAndRoute(route); } }); } void ForwarderInterface::internalDeleteFaceAndRoute( const RouteInfoPtr &route_info) { if (!sock_) return; hc_data_t *data; if (hc_route_list(sock_, &data) < 0) return; std::vector routes_to_remove; foreach_route(r, data) { char remote_addr[INET6_ADDRSTRLEN]; int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family); if (ret < 0) continue; std::string route_addr(remote_addr); if (route_addr.compare(route_info->route_addr) == 0 && r->len == route_info->route_len) { // route found routes_to_remove.push_back(r); } } if (routes_to_remove.size() == 0) { // nothing to do here hc_data_free(data); return; } std::unordered_set connids_to_remove; for (unsigned i = 0; i < routes_to_remove.size(); i++) { connids_to_remove.insert(routes_to_remove[i]->face_id); if (hc_route_delete(sock_, routes_to_remove[i]) < 0) { std::cout << "Error removing route from forwarder." << std::endl; } } // remove connection if (hc_connection_list(sock_, &data) < 0) { hc_data_free(data); return; } // collects pointerst to the connections using the conn IDs std::vector conns_to_remove; foreach_connection(c, data) { if (connids_to_remove.find(c->id) != connids_to_remove.end()) { // conn found conns_to_remove.push_back(c); } } if (conns_to_remove.size() == 0) { // nothing else to do here hc_data_free(data); return; } for (unsigned i = 0; i < conns_to_remove.size(); i++) { if (hc_connection_delete(sock_, conns_to_remove[i]) < 0) { std::cout << "Error removing connection from forwarder." << std::endl; } } hc_data_free(data); } void ForwarderInterface::internalCreateFaceAndRoutes( const std::vector &route_info, uint8_t max_try, asio::steady_timer *timer) { uint32_t face_id; std::vector failed; for (auto &route : route_info) { int ret = tryToCreateFace(route.get(), &face_id); if (ret >= 0) { auto ret = tryToCreateRoute(route.get(), face_id); if (ret < 0) { failed.push_back(route); std::cerr << "Error creating route and face" << std::endl; continue; } } } if (failed.size() > 0) { if (max_try == 0) { /* All attempts failed */ goto RESULT; } max_try--; timer->expires_from_now(std::chrono::milliseconds(500)); timer->async_wait([this, failed, max_try, timer](std::error_code ec) { if (ec) return; internalCreateFaceAndRoutes(failed, max_try, timer); }); return; } #if 0 // route_status_[protocol] = std::move(route_info); for (size_t i = 0; i < route_info.size(); i++) { route_status_.insert( std::pair(protocol, std::move(route_info[i]))); } #endif RESULT: std::cout << "Face / Route create ok, now calling back protocol" << std::endl; pending_add_route_counter_--; external_ioservice_.post([this, r = std::move(route_info)]() mutable { forwarder_interface_callback_->onRouteConfigured(r); }); delete timer; } int ForwarderInterface::tryToCreateFace(RouteInfo *route_info, uint32_t *face_id) { bool found = false; // check connection with the forwarder if (!sock_) { std::cout << "[ForwarderInterface::tryToCreateFace] socket error" << std::endl; goto ERR_SOCK; } // get listeners list hc_data_t *data; if (hc_listener_list(sock_, &data) < 0) { std::cout << "[ForwarderInterface::tryToCreateFace] cannot list listeners"; goto ERR_LIST; } char _local_address[128]; foreach_listener(l, data) { std::cout << "Processing " << l->interface_name << std::endl; std::string interface = std::string(l->interface_name); int ret = ip_address_ntop(&l->local_addr, _local_address, 128, AF_INET); if (ret < 0) { std::cerr << "Error in ip_address_ntop" << std::endl; goto ERR; } std::string local_address = std::string(_local_address); uint16_t local_port = l->local_port; if (interface.compare(route_info->interface) == 0 && local_address.compare(route_info->local_addr) == 0 && local_port == route_info->local_port) { found = true; break; } } std::cout << route_info->remote_addr << std::endl; ip_address_t local_address, remote_address; ip_address_pton(route_info->local_addr.c_str(), &local_address); ip_address_pton(route_info->remote_addr.c_str(), &remote_address); if (!found) { // Create listener hc_listener_t listener; memset(&listener, 0, sizeof(hc_listener_t)); std::string name = "l_" + route_info->name; listener.local_addr = local_address; listener.type = CONNECTION_TYPE_UDP; listener.family = AF_INET; listener.local_port = route_info->local_port; strncpy(listener.name, name.c_str(), sizeof(listener.name)); strncpy(listener.interface_name, route_info->interface.c_str(), sizeof(listener.interface_name)); std::cout << "------------> " << route_info->interface << std::endl; int ret = hc_listener_create(sock_, &listener); if (ret < 0) { std::cerr << "Error creating listener." << std::endl; return -1; } else { std::cout << "Listener " << listener.id << " created." << std::endl; } } // Create face hc_face_t face; memset(&face, 0, sizeof(hc_face_t)); // crate face with the local interest face.face.type = FACE_TYPE_UDP; face.face.family = route_info->family; face.face.local_addr = local_address; face.face.remote_addr = remote_address; face.face.local_port = route_info->local_port; face.face.remote_port = route_info->remote_port; if (netdevice_set_name(&face.face.netdevice, route_info->interface.c_str()) < 0) { std::cout << "[ForwarderInterface::tryToCreateFaceAndRoute] " "netdevice_set_name " "(" << face.face.netdevice.name << ", " << route_info->interface << ") error" << std::endl; goto ERR; } // create face if (hc_face_create(sock_, &face) < 0) { std::cout << "[ForwarderInterface::tryToCreateFace] error creating face"; goto ERR; } std::cout << "Face created successfully" << std::endl; // assing face to the return value *face_id = face.id; hc_data_free(data); return 0; ERR: hc_data_free(data); ERR_LIST: ERR_SOCK: return -1; } int ForwarderInterface::tryToCreateRoute(RouteInfo *route_info, uint32_t face_id) { std::cout << "Trying to create route" << std::endl; // check connection with the forwarder if (!sock_) { std::cout << "[ForwarderInterface::tryToCreateRoute] socket error"; return -1; } ip_address_t route_ip; hc_route_t route; if (ip_address_pton(route_info->route_addr.c_str(), &route_ip) < 0) { std::cout << "[ForwarderInterface::tryToCreateRoute] ip_address_pton error"; return -1; } route.face_id = face_id; route.family = AF_INET6; route.remote_addr = route_ip; route.len = route_info->route_len; route.cost = 1; if (hc_route_create(sock_, &route) < 0) { std::cout << "[ForwarderInterface::tryToCreateRoute] error creating route"; return -1; } std::cout << "[ForwarderInterface::tryToCreateRoute] OK" << std::endl; return 0; } #if 0 // not used void ForwarderInterface::checkRoutesLoop() { check_routes_timer_->expires_from_now(std::chrono::milliseconds(1000)); check_routes_timer_->async_wait([this](std::error_code ec) { if (ec) return; if (pending_add_route_counter_ == 0) checkRoutes(); }); } void ForwarderInterface::checkRoutes() { std::cout << "someone called the checkRoutes function" << std::endl; if (!sock_) return; hc_data_t *data; if (hc_route_list(sock_, &data) < 0) { return; } std::unordered_set routes_set; foreach_route(r, data) { char remote_addr[INET6_ADDRSTRLEN]; int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family); if (ret < 0) continue; std::string route(std::string(remote_addr) + "/" + std::to_string(r->len)); routes_set.insert(route); } for (auto it = route_status_.begin(); it != route_status_.end(); it++) { std::string route(it->second->route_addr + "/" + std::to_string(it->second->route_len)); if (routes_set.find(route) == routes_set.end()) { // the route is missing createFaceAndRoute(it->second, it->first); break; } } hc_data_free(data); } #endif #if 0 using ListenerRetrievedCallback = std::function; ListenerRetrievedCallback listener_retrieved_callback_; #ifdef __ANDROID__ hicn_listen_port_(9695), #else hicn_listen_port_(0), #endif timer_(forward_engine_.getIoService()), void initConfigurationProtocol(void) { // We need the configuration, which is different for every protocol... // so we move this step down towards the protocol implementation itself. if (!permanent_hicn) { doInitConfigurationProtocol(); } else { // XXX This should be moved somewhere else getMainListener( [this](std::error_code ec, uint32_t hicn_listen_port) { if (!ec) { hicn_listen_port_ = hicn_listen_port; doInitConfigurationProtocol(); } }); } } template void getMainListener(Callback &&callback) { listener_retrieved_callback_ = std::forward(callback); tryToConnectToForwarder(); } private: void doGetMainListener(std::error_code ec) { if (!ec) { // ec == 0 --> timer expired int ret = forwarder_interface_.getMainListenerPort(); if (ret <= 0) { // Since without the main listener of the forwarder the proxy cannot // work, we can stop the program here until we get the listener port. std::cout << "Could not retrieve main listener port from the forwarder. " "Retrying."; timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL)); timer_.async_wait(std::bind(&Protocol::doGetMainListener, this, std::placeholders::_1)); } else { timer_.cancel(); retx_count_ = 0; hicn_listen_port_ = uint16_t(ret); listener_retrieved_callback_( make_error_code(configuration_error::success), hicn_listen_port_); } } else { std::cout << "Timer for retrieving main hicn listener canceled." << std::endl; } } void tryToConnectToForwarder() { doTryToConnectToForwarder(std::make_error_code(std::errc(0))); } void doTryToConnectToForwarder(std::error_code ec) { if (!ec) { // ec == 0 --> timer expired int ret = forwarder_interface_.connect(); if (ret < 0) { // We were not able to connect to the local forwarder. Do not give up // and retry. std::cout << "Could not connect to local forwarder. Retrying." << std::endl; timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL)); timer_.async_wait(std::bind(&Protocol::doTryToConnectToForwarder, this, std::placeholders::_1)); } else { timer_.cancel(); retx_count_ = 0; doGetMainListener(std::make_error_code(std::errc(0))); } } else { std::cout << "Timer for re-trying forwarder connection canceled." << std::endl; } } template constexpr uint32_t Protocol::RETRY_INTERVAL; #endif constexpr uint32_t ForwarderInterface::REATTEMPT_DELAY_MS; constexpr uint32_t ForwarderInterface::MAX_REATTEMPT; } // namespace hiperf