forked from kangjianwei/LearningJDK
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathFlow.java
338 lines (323 loc) · 14.5 KB
/
Flow.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* This file is available under and governed by the GNU General Public
* License version 2 only, as published by the Free Software Foundation.
* However, the following notice accompanied the original version of this
* file:
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent;
/**
* Interrelated interfaces and static methods for establishing
* flow-controlled components in which {@link Publisher Publishers}
* produce items consumed by one or more {@link Subscriber
* Subscribers}, each managed by a {@link Subscription
* Subscription}.
*
* <p>These interfaces correspond to the <a
* href="http://www.reactive-streams.org/"> reactive-streams</a>
* specification. They apply in both concurrent and distributed
* asynchronous settings: All (seven) methods are defined in {@code
* void} "one-way" message style. Communication relies on a simple form
* of flow control (method {@link Subscription#request}) that can be
* used to avoid resource management problems that may otherwise occur
* in "push" based systems.
*
* <p><b>Examples.</b> A {@link Publisher} usually defines its own
* {@link Subscription} implementation; constructing one in method
* {@code subscribe} and issuing it to the calling {@link
* Subscriber}. It publishes items to the subscriber asynchronously,
* normally using an {@link Executor}. For example, here is a very
* simple publisher that only issues (when requested) a single {@code
* TRUE} item to a single subscriber. Because the subscriber receives
* only a single item, this class does not use buffering and ordering
* control required in most implementations (for example {@link
* SubmissionPublisher}).
*
* <pre> {@code
* class OneShotPublisher implements Publisher<Boolean> {
* private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
* private boolean subscribed; // true after first subscribe
* public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
* if (subscribed)
* subscriber.onError(new IllegalStateException()); // only one allowed
* else {
* subscribed = true;
* subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
* }
* }
* static class OneShotSubscription implements Subscription {
* private final Subscriber<? super Boolean> subscriber;
* private final ExecutorService executor;
* private Future<?> future; // to allow cancellation
* private boolean completed;
* OneShotSubscription(Subscriber<? super Boolean> subscriber,
* ExecutorService executor) {
* this.subscriber = subscriber;
* this.executor = executor;
* }
* public synchronized void request(long n) {
* if (!completed) {
* completed = true;
* if (n <= 0) {
* IllegalArgumentException ex = new IllegalArgumentException();
* executor.execute(() -> subscriber.onError(ex));
* } else {
* future = executor.submit(() -> {
* subscriber.onNext(Boolean.TRUE);
* subscriber.onComplete();
* });
* }
* }
* }
* public synchronized void cancel() {
* completed = true;
* if (future != null) future.cancel(false);
* }
* }
* }}</pre>
*
* <p>A {@link Subscriber} arranges that items be requested and
* processed. Items (invocations of {@link Subscriber#onNext}) are
* not issued unless requested, but multiple items may be requested.
* Many Subscriber implementations can arrange this in the style of
* the following example, where a buffer size of 1 single-steps, and
* larger sizes usually allow for more efficient overlapped processing
* with less communication; for example with a value of 64, this keeps
* total outstanding requests between 32 and 64.
* Because Subscriber method invocations for a given {@link
* Subscription} are strictly ordered, there is no need for these
* methods to use locks or volatiles unless a Subscriber maintains
* multiple Subscriptions (in which case it is better to instead
* define multiple Subscribers, each with its own Subscription).
*
* <pre> {@code
* class SampleSubscriber<T> implements Subscriber<T> {
* final Consumer<? super T> consumer;
* Subscription subscription;
* final long bufferSize;
* long count;
* SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
* this.bufferSize = bufferSize;
* this.consumer = consumer;
* }
* public void onSubscribe(Subscription subscription) {
* long initialRequestSize = bufferSize;
* count = bufferSize - bufferSize / 2; // re-request when half consumed
* (this.subscription = subscription).request(initialRequestSize);
* }
* public void onNext(T item) {
* if (--count <= 0)
* subscription.request(count = bufferSize - bufferSize / 2);
* consumer.accept(item);
* }
* public void onError(Throwable ex) { ex.printStackTrace(); }
* public void onComplete() {}
* }}</pre>
*
* <p>The default value of {@link #defaultBufferSize} may provide a
* useful starting point for choosing request sizes and capacities in
* Flow components based on expected rates, resources, and usages.
* Or, when flow control is never needed, a subscriber may initially
* request an effectively unbounded number of items, as in:
*
* <pre> {@code
* class UnboundedSubscriber<T> implements Subscriber<T> {
* public void onSubscribe(Subscription subscription) {
* subscription.request(Long.MAX_VALUE); // effectively unbounded
* }
* public void onNext(T item) { use(item); }
* public void onError(Throwable ex) { ex.printStackTrace(); }
* public void onComplete() {}
* void use(T item) { ... }
* }}</pre>
*
* @author Doug Lea
* @since 9
*/
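/*
* Reactive-programming interfaces implementing the
* producer-consumer / publish-subscribe / push-receive pattern.
*
* Note: below, "publish" is uniformly called "produce" and "subscribe" is called "consume".
*
* Publisher - produces items and control signals; called the "producer" below
* Subscriber - consumes items and signals; called the "consumer" below
* Subscription - links a Publisher to a Subscriber; called the "mediator" below
* Processor - performs higher-level operations, such as transforming items on the way from a Publisher to a Subscriber
*/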
public final class Flow {

    /** Value handed out by {@link #defaultBufferSize()}. */
    static final int DEFAULT_BUFFER_SIZE = 256;

    /** Private so that this holder of nested interfaces can never be instantiated. */
    private Flow() {
    }

    /**
     * Supplies a buffer size that Publishers or Subscribers can fall back on
     * when nothing else constrains the choice.
     *
     * @return the default buffer size
     *
     * @implNote The value currently returned is 256.
     */
    public static int defaultBufferSize() {
        return DEFAULT_BUFFER_SIZE;
    }

    /**
     * The producing side: a source of items (and related control messages)
     * that are delivered to Subscribers.
     *
     * <p>Every current {@link Subscriber} is given the same items, in the same
     * order, through {@code onNext}, unless drops or errors are encountered.
     * When an error prevents the Publisher from issuing items to a Subscriber,
     * that Subscriber gets {@code onError} and nothing after it. Otherwise,
     * once it is known that no further messages will be issued to it, the
     * Subscriber gets {@code onComplete}. For each subscription, a Publisher
     * ensures that Subscriber method invocations are strictly ordered in
     * <a href="package-summary.html#MemoryVisibility"><i>happens-before</i></a>
     * order.
     *
     * <p>Policies differ between Publishers on two points: whether a drop
     * (failing to issue an item because of resource limitations) counts as an
     * unrecoverable error, and whether a Subscriber is given items that were
     * produced or available before it subscribed.
     *
     * @param <T> the published item type
     */
    @FunctionalInterface
    public interface Publisher<T> {

        /**
         * Registers the given Subscriber, if that is possible.
         *
         * <p>On success the Subscriber's {@code onSubscribe} method is invoked
         * with a new {@link Subscription}; the Subscriber then enables
         * delivery by calling that Subscription's {@code request} method and
         * may unsubscribe by calling its {@code cancel} method. If the
         * Subscriber is already subscribed, or subscribing fails because of
         * policy violations or errors, its {@code onError} method is invoked
         * with an {@link IllegalStateException} instead.
         *
         * @param subscriber the subscriber
         *
         * @throws NullPointerException if subscriber is null
         */
        void subscribe(Subscriber<? super T> subscriber);
    }

    /**
     * The consuming side: a receiver of messages. For each
     * {@link Subscription}, the methods of this interface are invoked in
     * strict sequential order.
     *
     * @param <T> the subscribed item type
     */
    public interface Subscriber<T> {

        /**
         * First callback for a given Subscription; it is invoked before any
         * other Subscriber method. Implementations typically keep the
         * subscription reference and call {@code subscription.request} here
         * so that items start to arrive.
         *
         * <p>If this method throws an exception, the resulting behavior is
         * not guaranteed, but the Subscription may end up not being
         * established, or being cancelled.
         *
         * @param subscription a new subscription
         */
        // NOTE(review): the original annotation says the subscription has just
        // set its OPEN flag when this runs; that describes an implementation
        // outside this file -- confirm there.
        void onSubscribe(Subscription subscription);

        /**
         * Delivers the next item of a Subscription. After handling it, a
         * subscriber commonly asks for more through the subscription.
         *
         * <p>If this method throws an exception, the resulting behavior is
         * not guaranteed, but the Subscription may be cancelled.
         *
         * @param item the item
         */
        void onNext(T item);

        /**
         * Signals normal termination: it is known that no additional
         * Subscriber method invocations will occur for a Subscription that is
         * not already terminated by error. The Subscription invokes no other
         * Subscriber methods afterwards. Behavior is undefined if this method
         * throws an exception.
         */
        // NOTE(review): per the original annotation this is called from inside
        // the subscription once its queue is drained and it has been closed --
        // implementation detail not visible here.
        void onComplete();

        /**
         * Signals an unrecoverable error encountered by a Publisher or
         * Subscription. The Subscription invokes no other Subscriber methods
         * afterwards. Behavior is undefined if this method itself throws an
         * exception.
         *
         * @param throwable the exception
         */
        // NOTE(review): per the original annotation this is called from inside
        // the subscription when onSubscribe() or onNext() threw -- confirm
        // against the publisher implementation.
        void onError(Throwable throwable);
    }

    /**
     * The message-control link between a {@link Publisher} and a
     * {@link Subscriber}. Items reach the Subscriber only when requested, and
     * the Subscriber may cancel at any time. Only the owning Subscriber is
     * meant to call these methods; using them from other contexts has
     * undefined effects.
     */
    public interface Subscription {

        /**
         * Raises this subscription's current unfulfilled demand by {@code n}
         * items. This is how a subscriber asks for more.
         *
         * <p>For a positive {@code n} the Subscriber will receive up to
         * {@code n} additional {@code onNext} invocations (or fewer if
         * terminated). For {@code n} less than or equal to zero the
         * Subscriber will instead receive an {@code onError} signal carrying
         * an {@link IllegalArgumentException}.
         *
         * @param n the increment of demand; a value of {@code
         * Long.MAX_VALUE} may be considered as effectively unbounded
         */
        void request(long n);

        /**
         * Makes the Subscriber (eventually) stop receiving messages. This is
         * best-effort: additional messages may still be received after the
         * call. A cancelled subscription need not ever receive an
         * {@code onComplete} or {@code onError} signal.
         */
        void cancel();
    }

    /**
     * A component that is a Subscriber and a Publisher at once, for example
     * to transform items on their way from an upstream Publisher to a
     * downstream Subscriber.
     *
     * @param <T> the subscribed item type
     * @param <R> the published item type
     */
    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }
}