forked from wirenboard/wb-mqtt-serial
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpoll_plan.cpp
146 lines (133 loc) · 4.78 KB
/
poll_plan.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#include "poll_plan.h"
#undef POLL_PLAN_DEBUG
#ifdef POLL_PLAN_DEBUG
#include <iostream>
#endif
void TPollPlan::TQueueItem::Update(const std::chrono::milliseconds& new_interval,
const std::chrono::milliseconds& request_duration)
{
// http://www.daycounter.com/LabBook/Moving-Average.phtml
PollCountAtLeast++;
if (PollCountAtLeast > 1) {
PollIntervalSum += new_interval;
if (PollCountAtLeast > PollIntervalAveragingWindow) {
PollIntervalSum -= AvgPollInterval;
PollCountAtLeast = PollIntervalAveragingWindow;
}
AvgPollInterval = PollIntervalSum / PollCountAtLeast;
}
RequestDuration = request_duration;
#ifdef POLL_PLAN_DEBUG
std::cout << "Poll interval " << PollInterval.count() << ": CurrentTime is " <<
std::chrono::duration_cast<std::chrono::milliseconds>(CurrentTime->time_since_epoch()).count() <<
". Was scheduled at " <<
std::chrono::duration_cast<std::chrono::milliseconds>(DueAt.time_since_epoch()).count() <<
". Rescheduling to " <<
std::chrono::duration_cast<std::chrono::milliseconds>((*CurrentTime + PollInterval).time_since_epoch()).count() <<
std::endl;
#endif
DueAt = *CurrentTime + PollInterval;
}
int TPollPlan::TQueueItem::Importance() const
{
long long v = std::chrono::duration_cast<std::chrono::milliseconds>(*CurrentTime - DueAt).count() * 1000;
if (PollInterval != std::chrono::milliseconds::zero()) {
// the longer is poll interval, the lower is priority
v /= PollInterval.count();
if (AvgPollInterval != std::chrono::milliseconds::zero()) {
// If average poll interval is considerably greater than the requested poll
// interval, give the entry more priority to compensate for this problem
v *= AvgPollInterval.count();
v /= PollInterval.count();
}
}
if (RequestDuration != std::chrono::milliseconds::zero() &&
*AvgRequestDuration != std::chrono::milliseconds::zero()) {
// requests that require more time have lower priority
v *= AvgRequestDuration->count();
v /= RequestDuration.count();
}
return v;
}
TPollPlan::TPollPlan(TClockFunc clock_func): ClockFunc(clock_func)
{
CurrentTime = ClockFunc();
}
void TPollPlan::AddEntry(const PPollEntry& entry)
{
Queue.emplace(std::make_shared<TQueueItem>(&CurrentTime, &AvgRequestDuration, entry, Queue.size()));
}
void TPollPlan::ProcessPending(const TCallback& callback)
{
CurrentTime = ClockFunc();
while (!PendingItems.empty())
PendingItems.pop();
#ifdef POLL_PLAN_DEBUG
if (!Queue.empty())
std::cout << "top due at " <<
std::chrono::duration_cast<std::chrono::milliseconds>(Queue.top()->DueAt.time_since_epoch()).count() <<
"; now is " <<
std::chrono::duration_cast<std::chrono::milliseconds>(CurrentTime.time_since_epoch()).count() <<
std::endl;
#endif
while (!Queue.empty() && Queue.top()->DueAt <= CurrentTime) {
PendingItems.push(Queue.top());
Queue.pop();
}
int n = 0;
std::chrono::milliseconds avg_duration = std::chrono::milliseconds::zero();
while (!PendingItems.empty()) {
auto item = PendingItems.top();
auto start = ClockFunc();
callback(item->Entry);
auto request_duration = std::chrono::duration_cast<std::chrono::milliseconds>(ClockFunc() - start);
item->Update(item->PollCountAtLeast > 1 ?
std::chrono::duration_cast<std::chrono::milliseconds>(start - item->LastPollAt) :
std::chrono::milliseconds(0),
request_duration);
avg_duration += request_duration;
++n;
Queue.push(item);
PendingItems.pop();
}
if (n > 0)
AvgRequestDuration = std::chrono::milliseconds(avg_duration.count() / n);
}
bool TPollPlan::PollIsDue()
{
CurrentTime = ClockFunc();
return Queue.top()->DueAt <= CurrentTime;
}
TPollPlan::TTimePoint TPollPlan::GetNextPollTimePoint()
{
if (Queue.empty())
return TTimePoint::max();
if (PollIsDue())
return TTimePoint(CurrentTime);
return Queue.top()->DueAt;
}
void TPollPlan::Reset()
{
AvgRequestDuration = std::chrono::milliseconds::zero();
while (!PendingItems.empty())
PendingItems.pop();
while (!Queue.empty())
Queue.pop();
}
void TPollPlan::Modify(std::function<bool(const PPollEntry & entry)> && thunk)
{
std::queue<PQueueItem> items;
while (!Queue.empty()) {
auto item = Queue.top();
if (!thunk(item->Entry)) {
items.push(item);
Queue.pop();
} else {
break;
}
}
while (!items.empty()) {
Queue.push(items.front());
items.pop();
}
}