-
-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathinput.d
127 lines (102 loc) · 2.74 KB
/
input.d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
module jin.go.input;
import jin.go.channel;
import jin.go.output;
import jin.go.await;
import std.range;
/// Round-robin input channel: consumes messages from several Queues in turn.
/// Implements InputRange (`empty` / `front` / `popFront`) and `opApply`.
struct Input(Message)
{
    /// Type of the opposite end of the channel.
    alias Pair = Output;

    mixin Channel!Message;

    /// Count of messages that can be taken from the current Queue right now.
    ///
    /// Scans the Queues starting at `current` and stops at the first one that
    /// reports pending messages, leaving `current` pointing at it. Queues that
    /// report a negative count are dropped through `currentUnlink`.
    ///
    /// Returns: a positive count when the current Queue has messages; `0` when
    /// some Queue is still linked but none of the scanned ones has messages;
    /// `-1` when new messages will never be provided.
    ptrdiff_t pending()
    {
        ptrdiff_t pending = -1;
        if (this.queues.length == 0)
            return pending;

        // The bound is evaluated once: one step per Queue linked on entry,
        // even though unlinking may shrink `queues` while scanning.
        foreach (_; 0 .. this.queues.length)
        {
            const queue = this.queues[this.current];

            auto pending2 = queue.pending;
            if (pending2 > 0)
            {
                return pending2;
            }

            if (pending2 < 0)
            {
                // This Queue will never get new messages: drop it and look at
                // whatever Queue becomes current afterwards.
                this.currentUnlink();
                continue;
            }

            // Queue is alive but has nothing yet: remember that and move on.
            pending = 0;
            this.current = (this.current + 1) % this.queues.length;
        }

        return pending;
    }

    /// True when no more messages will be consumed.
    /// Does not wait: stays `false` while `pending` reports `0`.
    bool empty()
    {
        return this.pending == -1;
    }

    /// Get message from current non-empty Queue, waiting through `await`
    /// when nothing is pending yet.
    /// `pending` must be checked before.
    Message front()
    {
        const pending = this.pending.await;
        assert(pending != -1, "Message will never be produced");

        return this.queues[this.current].front;
    }

    /// Consume current pending message and switch to another Queue.
    /// `pending` must be checked before.
    void popFront()
    {
        // `pending` has side effects (it rotates `current` and unlinks Queues),
        // so it must run outside of `assert`: assert expressions are removed
        // in release builds, which would make release differ from debug.
        const available = this.pending;
        assert(available > 0, "Channel is empty");

        const current = this.current;
        this.queues[current].popFront;
        this.current = (current + 1) % this.queues.length;
    }

    /// Consumes current message and returns it.
    Message next()
    {
        auto value = this.front;
        this.popFront;
        return value;
    }

    /// Iterates over all messages.
    /// Messages are taken in batches: everything pending in the current Queue
    /// is passed to `proceed`, then iteration switches to the next Queue.
    /// Example: `foreach (msg; chan) {...}`
    ///
    /// Returns: `0` when no more messages will be provided, otherwise the
    /// non-zero value returned by `proceed` (iteration was interrupted).
    int opApply(int delegate(Message) proceed)
    {
        for (;;)
        {
            const pending = this.pending.await;
            if (pending == -1)
                return 0;

            auto queue = this.queues[this.current];

            foreach (_; 0 .. pending)
            {
                auto result = proceed(queue.front);
                // The message is consumed even when `proceed` asks to stop.
                queue.popFront;

                if (result)
                    return result;
            }

            // Switch to the next Queue after a drained batch, like `popFront`
            // does; otherwise a busy producer would starve the other Queues.
            if (this.queues.length)
                this.current = (this.current + 1) % this.queues.length;
        }
    }

    /// Collects all messages to array.
    /// Example: `chan[]`
    Message[] opSlice()
    {
        Message[] list;
        foreach (msg; this)
            list ~= msg;
        return list;
    }

    /// Finalizes the consumer of every linked Queue on destroy,
    /// unless the channel is marked as `immortal`.
    ~this()
    {
        if (this.immortal)
            return;

        foreach (queue; this.queues)
            queue.consumer.finalize();
    }
}