diff options
author | 2016-12-22 11:35:11 +0200 | |
---|---|---|
committer | 2016-12-22 17:32:14 +0200 | |
commit | 89309695626ba47a7e9f4c908a4d46c8b059df0f (patch) | |
tree | e316c2ec05d42558dde0c0a7d43c864bb91c1ad7 /src | |
parent | 7b8e05fc8996827a99d5da74b30138a15c89a96e (diff) |
stateful scheduler cleanup
Signed-off-by: Hanoh Haim <hhaim@cisco.com>
Diffstat (limited to 'src')
-rwxr-xr-x | src/bp_gtest.cpp | 16 | ||||
-rwxr-xr-x | src/bp_sim.cpp | 77 | ||||
-rwxr-xr-x | src/bp_sim.h | 22 | ||||
-rw-r--r-- | src/gtest/bp_timer_gtest.cpp | 61 | ||||
-rw-r--r-- | src/h_timer.cpp | 44 | ||||
-rw-r--r-- | src/h_timer.h | 68 | ||||
-rw-r--r-- | src/h_timer_w.h | 533 |
7 files changed, 255 insertions, 566 deletions
diff --git a/src/bp_gtest.cpp b/src/bp_gtest.cpp index 87f3f7ca..4c04dde9 100755 --- a/src/bp_gtest.cpp +++ b/src/bp_gtest.cpp @@ -263,7 +263,7 @@ TEST_F(basic, limit_single_pkt) { EXPECT_EQ_UINT32(1, res?1:0)<< "pass"; } -/*TEST_F(basic, limit_multi_pkt) { +TEST_F(basic, limit_multi_pkt) { CTestBasic t1; CParserOption * po =&CGlobalInfo::m_options; @@ -273,7 +273,7 @@ TEST_F(basic, limit_single_pkt) { po->out_file ="exp/limit_multi_pkt"; bool res=t1.init(); EXPECT_EQ_UINT32(1, res?1:0)<< "pass"; -} */ +} TEST_F(basic, imix) { @@ -287,6 +287,18 @@ TEST_F(basic, imix) { EXPECT_EQ_UINT32(1, res?1:0)<< "pass"; } +TEST_F(basic, imix_fast) { + + CTestBasic t1; + CParserOption * po =&CGlobalInfo::m_options; + po->preview.setVMode(3); + po->preview.setFileWrite(true); + po->cfg_file ="cap2/imix_64_fast.yaml"; + po->out_file ="exp/imix_64_fast"; + bool res=t1.init(); + EXPECT_EQ_UINT32(1, res?1:0)<< "pass"; +} + TEST_F(basic, dns) { diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp index 0725a437..72eb91ba 100755 --- a/src/bp_sim.cpp +++ b/src/bp_sim.cpp @@ -3475,8 +3475,12 @@ bool CFlowGenListPerThread::Create(uint32_t thread_id, 0 , socket_id); - m_tw.Create(TW_BUCKETS,3); - + RC_HTW_t tw_res=m_tw.Create(TW_BUCKETS,3); + if (tw_res != RC_HTW_OK){ + CHTimerWheelErrorStr err(tw_res); + printf("ERROR %-30s - %s \n",err.get_str(),err.get_help_str()); + exit(1); + } m_node_gen.Create(this); m_flow_id_to_node_lookup.Create(); @@ -3542,19 +3546,6 @@ FORCE_NO_INLINE void CFlowGenListPerThread::handler_defer_job(CGenNode *p){ } } -FORCE_NO_INLINE void CFlowGenListPerThread::handler_defer_job_flush(void){ - /* flush the pending job of free ports */ - if (m_tcp_dpc) { - handler_defer_job((CGenNode *)m_tcp_dpc); - free_node((CGenNode *)m_tcp_dpc); - m_tcp_dpc=0; - } - if (m_udp_dpc) { - handler_defer_job((CGenNode *)m_udp_dpc); - free_node((CGenNode *)m_udp_dpc); - m_udp_dpc=0; - } -} void CFlowGenListPerThread::defer_client_port_free(bool is_tcp, @@ -3767,7 +3758,10 @@ template<bool TEARDOWN> inline void CFlowGenListPerThread::on_flow_tick(CGenNode *node){ #ifdef TREX_SIM - node->m_time=m_cur_time_sec; + /* for sim, we need to update the node from tick time*/ + if ( likely (!node->is_repeat_flow() ) ){ + node->m_time=m_cur_time_sec; + } #endif #ifdef _DEBUG m_node_gen.update_stats(node); @@ -3777,22 +3771,22 @@ inline void CFlowGenListPerThread::on_flow_tick(CGenNode *node){ if ( likely (!node->is_repeat_flow()) ) { if ( likely (!node->is_last_in_flow()) ) { m_tw.timer_start(&node->m_tmr,node->update_next_pkt_in_flow_tw() ); - }else{ - free_last_flow_node( node); + }else{ + free_last_flow_node(node); } }else{ /* repeatable flow, we need to stop it in case of repeat */ if ( node->is_last_in_flow() ) { if ( TEARDOWN == false ){ - node->m_time=m_cur_time_sec; /* update the node time as we schedule it */ - reschedule_flow(node); + reschedule_flow(node); }else{ - free_last_flow_node( node); + free_last_flow_node(node); } }else{ - m_tw.timer_start(&node->m_tmr,node->update_next_pkt_in_flow_tw() ); + /* need to update both accurate time and bucket time in this case update_next_pkt_in_flow_both() for reschedule to be accurate */ + m_tw.timer_start(&node->m_tmr,node->update_next_pkt_in_flow_both() ); } } } @@ -3809,6 +3803,19 @@ inline void CFlowGenListPerThread::on_flow_tick(CGenNode *node){ #define UNSAFE_CONTAINER_OF_POP GCC_DIAG_ON() +static void tw_free_node(void *userdata, + CHTimerObj *tmr){ + CFlowGenListPerThread * thread=(CFlowGenListPerThread * )userdata; + UNSAFE_CONTAINER_OF_PUSH; + CGenNode * node=unsafe_container_of(node,tmr,CGenNode,m_tmr); + UNSAFE_CONTAINER_OF_POP; + if (node->is_flow_node()) { + /* this is a flow*/ + thread->free_last_flow_node(node); + }else{ + thread->free_node(node); + } +} static void tw_on_tick_per_thread_cb_always(void *userdata, @@ -3834,6 +3841,26 @@ void tw_on_tick_per_thread_cb(void *userdata, } +FORCE_NO_INLINE void CFlowGenListPerThread::handler_defer_job_flush(void){ + + /* free the objects letf in TW */ + m_tw.detach_all((void *)this,tw_free_node); + + /* flush the pending job of free ports */ + if (m_tcp_dpc) { + handler_defer_job((CGenNode *)m_tcp_dpc); + free_node((CGenNode *)m_tcp_dpc); + m_tcp_dpc=0; + } + if (m_udp_dpc) { + handler_defer_job((CGenNode *)m_udp_dpc); + free_node((CGenNode *)m_udp_dpc); + m_udp_dpc=0; + } +} + + + inline bool CNodeGenerator::do_work_stl(CGenNode * node, CFlowGenListPerThread * thread, bool on_terminate){ @@ -3883,7 +3910,6 @@ inline bool CNodeGenerator::do_work_both(CGenNode * node, /* PKT */ m_p_queue.pop(); thread->on_flow_tick<ON_TERMINATE>(node); - //printf(" MOVE from PKT->TW\n"); }else{ if ((type == CGenNode::FLOW_FIF)) { /* callback to our method */ @@ -3968,6 +3994,7 @@ inline int CNodeGenerator::teardown(CFlowGenListPerThread * thread, }else{ // free the left other thread->handler_defer_job_flush(); + } return (0); } @@ -4211,7 +4238,7 @@ void CNodeGenerator::handle_flow_pkt(CGenNode *node, CFlowGenListPerThread *thre } m_p_queue.pop(); if ( node->is_last_in_flow() ) { - thread->free_last_flow_node( node); + thread->free_last_flow_node(node); } else { node->update_next_pkt_in_flow_as(); m_p_queue.push(node); @@ -4510,7 +4537,7 @@ void CFlowGenListPerThread::terminate_nat_flows(CGenNode *p){ } else { m_flow_id_to_node_lookup.remove_no_lookup(p->get_short_fid() & NAT_FLOW_ID_MASK_IP_ID); } - free_last_flow_node( p); + free_last_flow_node(p); } diff --git a/src/bp_sim.h b/src/bp_sim.h index e8a37f6f..a41349d0 100755 --- a/src/bp_sim.h +++ b/src/bp_sim.h @@ -1498,6 +1498,14 @@ public: } void free_base(); + + bool is_flow_node(){ + if ((m_type == FLOW_PKT) || (m_type == FLOW_PKT_NAT)) { + return (true); + }else{ + return (false); + } + } }; @@ -1557,6 +1565,9 @@ public: /* update the node time for accurate scheduler */ inline void update_next_pkt_in_flow_as(void); + inline uint32_t update_next_pkt_in_flow_both(void); + + inline void reset_pkt_in_flow(void); inline uint8_t get_plugin_id(void){ return ( m_template_info->m_plugin_id); @@ -4108,6 +4119,17 @@ inline void CGenNode::update_next_pkt_in_flow_as(void){ m_pkt_info = m_flow_info->GetPacket((pkt_index-1)); } + +inline uint32_t CGenNode::update_next_pkt_in_flow_both(void){ + m_time += m_pkt_info->m_pkt_indication.m_cap_ipg; + uint32_t dticks = m_pkt_info->m_pkt_indication.m_ticks; + uint32_t pkt_index = m_pkt_info->m_pkt_indication.m_packet->pkt_cnt; + pkt_index++; + m_pkt_info = m_flow_info->GetPacket((pkt_index-1)); + return (dticks); +} + + inline uint32_t CGenNode::update_next_pkt_in_flow_tw(void){ uint32_t dticks = m_pkt_info->m_pkt_indication.m_ticks; diff --git a/src/gtest/bp_timer_gtest.cpp b/src/gtest/bp_timer_gtest.cpp index c0779868..5185da87 100644 --- a/src/gtest/bp_timer_gtest.cpp +++ b/src/gtest/bp_timer_gtest.cpp @@ -189,6 +189,17 @@ void my_test_on_tick_cb7(void *userdata,CHTimerObj *tmr){ lp->timer_start(tmr,9); } +void my_test_on_tick_cb_free(void *userdata,CHTimerObj *tmr){ + + + //CHTimerWheel * lp=(CHTimerWheel *)userdata; + #pragma GCC diagnostic ignored "-Winvalid-offsetof" + CMyTestObject *lpobj=(CMyTestObject *)((uint8_t*)tmr-offsetof (CMyTestObject,m_timer)); + #pragma GCC diagnostic pop + printf(" [event %d ]",lpobj->m_id); + delete lpobj; +} + TEST_F(gt_r_timer, timer7) { @@ -593,3 +604,53 @@ TEST_F(gt_r_timer, timer16) { test.Delete(); } + +/* test free of iterator */ +TEST_F(gt_r_timer, timer17) { + + CHTimerWheel timer; + + CMyTestObject * tmr1; + tmr1 = new CMyTestObject(); + tmr1->m_id=12; + + CMyTestObject * tmr2; + tmr2 = new CMyTestObject(); + tmr2->m_id=13; + + CMyTestObject * tmr3; + tmr3 = new CMyTestObject(); + tmr3->m_id=14; + + + EXPECT_EQ( timer.Create(4,4),RC_HTW_OK); + timer.timer_start(&tmr1->m_timer,80); + timer.timer_start(&tmr2->m_timer,1); + timer.timer_start(&tmr3->m_timer,0); + + timer.detach_all((void *)&timer,my_test_on_tick_cb_free); + EXPECT_EQ( timer.Delete(),RC_HTW_OK); +} + + +TEST_F(gt_r_timer, timer18) { + + RC_HTW_t res[]={ + RC_HTW_OK, + RC_HTW_ERR_NO_RESOURCES, + RC_HTW_ERR_TIMER_IS_ON, + RC_HTW_ERR_NO_LOG2, + RC_HTW_ERR_MAX_WHEELS, + RC_HTW_ERR_NOT_ENOUGH_BITS + }; + + int i; + for (i=0; i<sizeof(res)/sizeof(RC_HTW_t); i++) { + CHTimerWheelErrorStr err(res[i]); + printf(" %-30s - %s \n",err.get_str(),err.get_help_str()); + } +} + + + + diff --git a/src/h_timer.cpp b/src/h_timer.cpp index baaaca31..b3d86d46 100644 --- a/src/h_timer.cpp +++ b/src/h_timer.cpp @@ -27,6 +27,10 @@ void CHTimerObj::Dump(FILE *fd){ } + +void detach_all(void *userdata,htw_on_tick_cb_t cb); + + RC_HTW_t CHTimerOneWheel::Create(uint32_t wheel_size){ CHTimerWheelLink *bucket; @@ -67,6 +71,26 @@ RC_HTW_t CHTimerOneWheel::Delete(){ } + +uint32_t CHTimerOneWheel::detach_all(void *userdata,htw_on_tick_cb_t cb){ + + uint32_t m_total_events=0; + int i; + for (i = 0; i < m_wheel_size; i++) { + CHTimerWheelLink * lp=&m_buckets[i]; + CHTimerWheelLink * first; + + while (!lp->is_self()) { + first = lp->m_next; + first->detach(); + m_total_events++; + assert(cb); + cb(userdata,(CHTimerObj *)first); + } + } + return (m_total_events); +} + RC_HTW_t CHTimerOneWheel::timer_stop(CHTimerObj *tmr){ if ( tmr->is_running() ) { tmr->detach(); @@ -107,11 +131,27 @@ void CHTimerOneWheel::dump_link_list(uint32_t bucket_index, } -void CHTimerWheel::on_tick(void *userdata,htw_on_tick_cb_t cb){ - +void CHTimerWheel::detach_all(void *userdata,htw_on_tick_cb_t cb){ + #ifndef _DEBUG + if (m_total_events==0) { + return; + } + #endif int i; + uint32_t res=0; + for (i=0;i<m_num_wheels; i++) { + CHTimerOneWheel * lp=&m_timer_w[i]; + res=lp->detach_all(userdata,cb); + assert(m_total_events>=res); + m_total_events -=res; + } + assert(m_total_events==0); +} +void CHTimerWheel::on_tick(void *userdata,htw_on_tick_cb_t cb){ + + int i; for (i=0;i<m_num_wheels; i++) { CHTimerOneWheel * lp=&m_timer_w[i]; CHTimerObj * event; diff --git a/src/h_timer.h b/src/h_timer.h index 1d57640b..a937d548 100644 --- a/src/h_timer.h +++ b/src/h_timer.h @@ -40,9 +40,64 @@ typedef enum { } RC_HTW_t; +class CHTimerWheelErrorStr { +public: + CHTimerWheelErrorStr(RC_HTW_t val){ + m_err=val; + } + const char * get_str(void){ + switch (m_err) { + case RC_HTW_OK: + return ("RC_HTW_OK"); + break; + case RC_HTW_ERR_NO_RESOURCES : + return ("RC_HTW_ERR_NO_RESOURCES"); + break; + case RC_HTW_ERR_TIMER_IS_ON : + return ("RC_HTW_ERR_TIMER_IS_ON"); + break; + case RC_HTW_ERR_NO_LOG2 : + return ("RC_HTW_ERR_NO_LOG2"); + break; + case RC_HTW_ERR_MAX_WHEELS : + return ("RC_HTW_ERR_MAX_WHEELS"); + break; + case RC_HTW_ERR_NOT_ENOUGH_BITS : + return ("RC_HTW_ERR_NOT_ENOUGH_BITS"); + break; + default: + assert(0); + } + } + const char * get_help_str(void){ + switch (m_err) { + case RC_HTW_OK: + return ("ok"); + break; + case RC_HTW_ERR_NO_RESOURCES : + return ("not enough memory"); + break; + case RC_HTW_ERR_TIMER_IS_ON : + return ("timer is already on, you should stop before start"); + break; + case RC_HTW_ERR_NO_LOG2 : + return ("number of buckets should be log2"); + break; + case RC_HTW_ERR_MAX_WHEELS : + return ("maximum number of wheels is limited to 4"); + break; + case RC_HTW_ERR_NOT_ENOUGH_BITS : + return ("(log2(buckets) * number of wheels) should be less than 32, try to reduce the number of wheels"); + break; + default: + assert(0); + } + } - +private: + RC_HTW_t m_err; +}; class CHTimerWheelLink { @@ -76,7 +131,7 @@ public: } void detach(void){ - #ifdef HTW_DEBUG + #ifdef _DEBUG assert(m_next); #endif CHTimerWheelLink *next; @@ -134,7 +189,7 @@ public: inline RC_HTW_t timer_start(CHTimerObj *tmr, htw_ticks_t ticks){ - #ifdef HTW_DEBUG + #ifdef _DEBUG if ( tmr->is_running() ){ return( RC_HTW_ERR_TIMER_IS_ON); } @@ -146,6 +201,8 @@ public: RC_HTW_t timer_stop (CHTimerObj *tmr); + uint32_t detach_all(void *userdata,htw_on_tick_cb_t cb); + inline bool check_timer_tick_cycle(){ return (m_tick_done); } @@ -247,6 +304,7 @@ public: RC_HTW_t Delete(); + inline RC_HTW_t timer_start(CHTimerObj *tmr, htw_ticks_t ticks){ m_total_events++; @@ -265,7 +323,9 @@ public: bool is_any_events_left(){ return(m_total_events>0?true:false); } - + + /* iterate all, detach and call the callback */ + void detach_all(void *userdata,htw_on_tick_cb_t cb); private: diff --git a/src/h_timer_w.h b/src/h_timer_w.h deleted file mode 100644 index 552c7423..00000000 --- a/src/h_timer_w.h +++ /dev/null @@ -1,533 +0,0 @@ -// -*- mode: c++; c-basic-offset: 4 indent-tabs-mode: nil -*- */ -// -// Copyright 2016 Juho Snellman, released under a MIT license (see -// LICENSE). -// -// A timer queue which allows events to be scheduled for execution -// at some later point. Reasons you might want to use this implementation -// instead of some other are: -// -// - A single-file C++11 implementation with no external dependencies. -// - Optimized for high occupancy rates, on the assumption that the -// utilization of the timer queue is proportional to the utilization -// of the system as a whole. When a tradeoff needs to be made -// between efficiency of one operation at a low occupancy rate and -// another operation at a high rate, we choose the latter. -// - Tries to minimize the cost of event rescheduling or cancelation, -// on the assumption that a large percentage of events will never -// be triggered. The implementation avoids unnecessary work when an -// event is rescheduled, and provides a way for the user specify a -// range of acceptable execution times instead of just an exact one. -// - Facility for limiting the number of events to execute on a -// single invocation, to allow fine grained interleaving of timer -// processing and application logic. -// - An interface that at least the author finds convenient. -// -// The exact implementation strategy is a hierarchical timer -// wheel. A timer wheel is effectively a ring buffer of linked lists -// of events, and a pointer to the ring buffer. As the time advances, -// the pointer moves forward, and any events in the ring buffer slots -// that the pointer passed will get executed. -// -// A hierarchical timer wheel layers multiple timer wheels running at -// different resolutions on top of each other. When an event is -// scheduled so far in the future than it does not fit the innermost -// (core) wheel, it instead gets scheduled on one of the outer -// wheels. On each rotation of the inner wheel, one slot's worth of -// events are promoted from the second wheel to the core. On each -// rotation of the second wheel, one slot's worth of events is -// promoted from the third wheel to the second, and so on. -// -// The basic usage is to create a single TimerWheel object and -// multiple TimerEvent or MemberTimerEvent objects. The events are -// scheduled for execution using TimerWheel::schedule() or -// TimerWheel::schedule_in_range(), or unscheduled using the event's -// cancel() method. -// -// Example usage: -// -// typedef std::function<void()> Callback; -// TimerWheel timers; -// int count = 0; -// TimerEvent<Callback> timer([&count] () { ++count; }); -// -// timers.schedule(&timer, 5); -// timers.advance(4); -// assert(count == 0); -// timers.advance(1); -// assert(count == 1); -// -// timers.schedule(&timer, 5); -// timer.cancel(); -// timers.advance(4); -// assert(count == 1); -// -// To tie events to specific member functions of an object instead of -// a callback function, use MemberTimerEvent instead of TimerEvent. -// For example: -// -// class Test { -// public: -// Test() : inc_timer_(this) { -// } -// void start(TimerWheel* timers) { -// timers->schedule(&inc_timer_, 10); -// } -// void on_inc() { -// count_++; -// } -// int count() { return count_; } -// private: -// MemberTimerEvent<Test, &Test::on_inc> inc_timer_; -// int count_ = 0; -// }; - -#ifndef RATAS_TIMER_WHEEL_H -#define RATAS_TIMER_WHEEL_H - -#include <cassert> -#include <cstdlib> -#include <cstdint> -#include <cstdio> -#include <limits> -#include <memory> - -typedef uint64_t tick_t; - -class TimerWheelSlot; -class TimerWheel; - -// An abstract class representing an event that can be scheduled to -// happen at some later time. -struct TimerEventInterface { -public: - // Unschedule this event. It's safe to cancel an event that is inactive. - inline void cancel(); - - // Return true iff the event is currently scheduled for execution. - bool active() const { - return slot_ != NULL; - } - - // Return the absolute tick this event is scheduled to be executed on. - tick_t scheduled_at() const { return scheduled_at_; } - -private: - TimerEventInterface(const TimerEventInterface& other) = delete; - TimerEventInterface& operator=(const TimerEventInterface& other) = delete; - friend TimerWheelSlot; - friend TimerWheel; - - - void set_scheduled_at(tick_t ts) { scheduled_at_ = ts; } - // Move the event to another slot. (It's safe for either the current - // or new slot to be NULL). - inline void relink(TimerWheelSlot* slot); - -private: - /* one CACHELINE 32 byte in 64bit processor */ - - tick_t scheduled_at_; - // The slot this event is currently in (NULL if not currently scheduled). - TimerWheelSlot* slot_ = NULL; - // The events are linked together in the slot using an internal - // doubly-linked list; this iterator does double duty as the - // linked list node for this event. - TimerEventInterface* next_ = NULL; - TimerEventInterface* prev_ = NULL; -}; - -#if 0 -// An event that takes the callback (of type CBType) to execute as -// a constructor parameter. -template<typename CBType> -class TimerEvent : public TimerEventInterface { -public: - explicit TimerEvent<CBType>(const CBType& callback) - : callback_(callback) { - } - - void execute() { - callback_(); - } - -private: - TimerEvent<CBType>(const TimerEvent<CBType>& other) = delete; - TimerEvent<CBType>& operator=(const TimerEvent<CBType>& other) = delete; - CBType callback_; -}; - -// An event that's specialized with a (static) member function of class T, -// and a dynamic instance of T. Event execution causes an invocation of the -// member function on the instance. -template<typename T, void(T::*MFun)() > -class MemberTimerEvent : public TimerEventInterface { -public: - MemberTimerEvent(T* obj) : obj_(obj) { - } - - virtual void execute () { - (obj_->*MFun)(); - } - -private: - T* obj_; -}; - -#endif - -// Purely an implementation detail. -class TimerWheelSlot { -public: - TimerWheelSlot() { - } - -private: - // Return the first event queued in this slot. - const TimerEventInterface* events() const { return events_; } - // Deque the first event from the slot, and return it. - TimerEventInterface* pop_event() { - auto event = events_; - events_ = event->next_; - if (events_) { - events_->prev_ = NULL; - } - event->next_ = NULL; - event->slot_ = NULL; - return event; - } - - TimerWheelSlot(const TimerWheelSlot& other) = delete; - TimerWheelSlot& operator=(const TimerWheelSlot& other) = delete; - friend TimerEventInterface; - friend TimerWheel; - - // Doubly linked (inferior) list of events. - TimerEventInterface* events_ = NULL; -}; - -// A TimerWheel is the entity that TimerEvents can be scheduled on -// for execution (with schedule() or schedule_in_range()), and will -// eventually be executed once the time advances far enough with the -// advance() method. - -class TimerWheel { -public: - bool Create(tick_t now = 0){ - for (int i = 0; i < NUM_LEVELS; ++i) { - now_[i] = now >> (WIDTH_BITS * i); - } - ticks_pending_ = 0; - } - void Delete(){ - } - - // Advance the TimerWheel by the specified number of ticks, and execute - // any events scheduled for execution at or before that time. The - // number of events executed can be restricted using the max_execute - // parameter. If that limit is reached, the function will return false, - // and the excess events will be processed on a subsequent call. - // - // - It is safe to cancel or schedule events from within event callbacks. - // - During the execution of the callback the observable event tick will - // be the tick it was scheduled to run on; not the tick the clock will - // be advanced to. - // - Events will happen in order; all events scheduled for tick X will - // be executed before any event scheduled for tick X+1. - // - // Delta should be non-0. The only exception is if the previous - // call to advance() returned false. - // - // advance() should not be called from an event callback. - inline bool advance(tick_t delta, - size_t max_execute=std::numeric_limits<size_t>::max(), - int level = 0); - - // Schedule the event to be executed delta ticks from the current time. - // The delta must be non-0. - inline void schedule(TimerEventInterface* event, tick_t delta); - - // Schedule the event to happen at some time between start and end - // ticks from the current time. The actual time will be determined - // by the TimerWheel to minimize rescheduling and promotion overhead. - // Both start and end must be non-0, and the end must be greater than - // the start. - inline void schedule_in_range(TimerEventInterface* event, - tick_t start, tick_t end); - - // Return the current tick value. Note that if the time increases - // by multiple ticks during a single call to advance(), during the - // execution of the event callback now() will return the tick that - // the event was scheduled to run on. - tick_t now() const { return now_[0]; } - - // Return the number of ticks remaining until the next event will get - // executed. If the max parameter is passed, that will be the maximum - // tick value that gets returned. The max parameter's value will also - // be returned if no events have been scheduled. - // - // Will return 0 if the wheel still has unprocessed events from the - // previous call to advance(). - inline tick_t ticks_to_next_event(tick_t max = std::numeric_limits<tick_t>::max(), - int level = 0); - -private: - TimerWheel(const TimerWheel& other) = delete; - TimerWheel& operator=(const TimerWheel& other) = delete; - - // This handles the actual work of executing event callbacks and - // recursing to the outer wheels. - inline bool process_current_slot(tick_t now, size_t max_execute, int level); - - static const int WIDTH_BITS = 8; - static const int NUM_LEVELS = (64 + WIDTH_BITS - 1) / WIDTH_BITS; - static const int MAX_LEVEL = NUM_LEVELS - 1; - static const int NUM_SLOTS = 1 << WIDTH_BITS; - // A bitmask for looking at just the bits in the timestamp relevant to - // this wheel. - static const int MASK = (NUM_SLOTS - 1); - - // The current timestamp for this wheel. This will be right-shifted - // such that each slot is separated by exactly one tick even on - // the outermost wheels. - tick_t now_[NUM_LEVELS]; - // We've done a partial tick advance. This is how many ticks remain - // unprocessed. - tick_t ticks_pending_; - TimerWheelSlot slots_[NUM_LEVELS][NUM_SLOTS]; -}; - -// Implementation - -inline void TimerEventInterface::relink(TimerWheelSlot* new_slot) { - if (new_slot == slot_) { - return; - } - - // Unlink from old location. - if (slot_) { - auto prev = prev_; - auto next = next_; - if (next) { - next->prev_ = prev; - } - if (prev) { - prev->next_ = next; - } else { - // Must be at head of slot. Move the next item to the head. - slot_->events_ = next; - } - } - - // Insert in new slot. - { - if (new_slot) { - auto old = new_slot->events_; - next_ = old; - if (old) { - old->prev_ = this; - } - new_slot->events_ = this; - } else { - next_ = NULL; - } - prev_ = NULL; - } - slot_ = new_slot; -} - -inline void TimerEventInterface::cancel() { - // It's ok to cancel a event that's not scheduled. - if (!slot_) { - return; - } - - relink(NULL); -} - -inline bool TimerWheel::advance(tick_t delta, size_t max_events, int level) { - if (ticks_pending_) { - if (level == 0) { - // Continue collecting a backlog of ticks to process if - // we're called with non-zero deltas. - ticks_pending_ += delta; - } - // We only partially processed the last tick. Process the - // current slot, rather incrementing like advance() normally - // does. - tick_t now = now_[level]; - if (!process_current_slot(now, max_events, level)) { - // Outer layers are still not done, propagate that information - // back up. - return false; - } - if (level == 0) { - // The core wheel has been fully processed. We can now close - // down the partial tick and pretend that we've just been - // called with a delta containing both the new and original - // amounts. - delta = (ticks_pending_ - 1); - ticks_pending_ = 0; - } else { - return true; - } - } else { - // Zero deltas are only ok when in the middle of a partially - // processed tick. - assert(delta > 0); - } - - while (delta--) { - tick_t now = ++now_[level]; - if (!process_current_slot(now, max_events, level)) { - ticks_pending_ = (delta + 1); - return false; - } - } - return true; -} - -inline bool TimerWheel::process_current_slot(tick_t now, size_t max_events, int level) { - size_t slot_index = now & MASK; - auto slot = &slots_[level][slot_index]; - if (slot_index == 0 && level < MAX_LEVEL) { - if (!advance(1, max_events, level + 1)) { - return false; - } - } - while (slot->events()) { - auto event = slot->pop_event(); - if (level > 0) { - assert((now_[0] & MASK) == 0); - if (now_[0] >= event->scheduled_at()) { - event->execute(); - if (!--max_events) { - return false; - } - } else { - // There's a case to be made that promotion should - // also count as work done. And that would simplify - // this code since the max_events manipulation could - // move to the top of the loop. But it's an order of - // magnitude more expensive to execute a typical - // callback, and promotions will naturally clump while - // events triggering won't. - schedule(event, - event->scheduled_at() - now_[0]); - } - } else { - event->execute(); - if (!--max_events) { - return false; - } - } - } - return true; -} - -inline void TimerWheel::schedule(TimerEventInterface* event, tick_t delta) { - event->set_scheduled_at(now_[0] + delta); - - int level = 0; - while (delta >= NUM_SLOTS) { - delta = (delta + (now_[level] & MASK)) >> WIDTH_BITS; - ++level; - } - - size_t slot_index = (now_[level] + delta) & MASK; - auto slot = &slots_[level][slot_index]; - event->relink(slot); -} - -inline void TimerWheel::schedule_in_range(TimerEventInterface* event, - tick_t start, tick_t end) { - assert(end > start); - if (event->active()) { - auto current = event->scheduled_at() - now_[0]; - // Event is already scheduled to happen in this range. Instead - // of always using the old slot, we could check compute the - // new slot and switch iff it's aligned better than the old one. - // But it seems hard to believe that could be worthwhile. - if (current >= start && current <= end) { - return; - } - } - - // Zero as many bits (in WIDTH_BITS chunks) as possible - // from "end" while still keeping the output in the - // right range. - tick_t mask = ~0; - while ((start & mask) != (end & mask)) { - mask = (mask << WIDTH_BITS); - } - - tick_t delta = end & (mask >> WIDTH_BITS); - - schedule(event, delta); -} - -inline tick_t TimerWheel::ticks_to_next_event(tick_t max, int level) { - if (ticks_pending_) { - return 0; - } - // The actual current time (not the bitshifted time) - tick_t now = now_[0]; - - // Smallest tick (relative to now) we've found. - tick_t min = max; - for (int i = 0; i < NUM_SLOTS; ++i) { - // Note: Unlike the uses of "now", slot index calculations really - // need to use now_. - auto slot_index = (now_[level] + 1 + i) & MASK; - // We've reached slot 0. In normal scheduling this would - // mean advancing the next wheel and promoting or executing - // those events. So we need to look in that slot too - // before proceeding with the rest of this wheel. But we - // can't just accept those results outright, we need to - // check the best result there against the next slot on - // this wheel. - if (slot_index == 0 && level < MAX_LEVEL) { - // Exception: If we're in the core wheel, and slot 0 is - // not empty, there's no point in looking in the outer wheel. - // It's guaranteed that the events actually in slot 0 will be - // executed no later than anything in the outer wheel. - if (level > 0 || !slots_[level][slot_index].events()) { - auto up_slot_index = (now_[level + 1] + 1) & MASK; - const auto& slot = slots_[level + 1][up_slot_index]; - for (auto event = slot.events(); event != NULL; - event = event->next_) { - min = std::min(min, event->scheduled_at() - now); - } - } - } - bool found = false; - const auto& slot = slots_[level][slot_index]; - for (auto event = slot.events(); event != NULL; - event = event->next_) { - min = std::min(min, event->scheduled_at() - now); - // In the core wheel all the events in a slot are guaranteed to - // run at the same time, so it's enough to just look at the first - // one. - if (level == 0) { - return min; - } else { - found = true; - } - } - if (found) { - return min; - } - } - - // Nothing found on this wheel, try the next one (unless the wheel can't - // possibly contain an event scheduled earlier than "max"). - if (level < MAX_LEVEL && - (max >> (WIDTH_BITS * level + 1)) > 0) { - return ticks_to_next_event(max, level + 1); - } - - return max; -} - -#endif // RATAS_TIMER_WHEEL_H - |