-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathcopy_buckets.c
343 lines (266 loc) · 10.1 KB
/
copy_buckets.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
/* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*/
#include <apr_pools.h>
#include "serf.h"
#include "serf_private.h"
#include "serf_bucket_util.h"
/* Presumably the number of iovec entries to use when gathering data from
   the wrapped bucket.
   NOTE(review): this macro is not referenced anywhere in the visible code;
   serf_copy_read() sizes its local iovec array with a literal 16 instead.
   Confirm whether the two are meant to be the same value. */
#define IOVEC_HOLD_COUNT 16
/* Per-bucket state of a copy bucket. An instance is allocated and
   initialized by serf_bucket_copy_create() and released by
   serf_copy_destroy(). */
typedef struct copy_context_t {
/* The bucket that every operation is delegated to. It is destroyed
   together with the copy bucket. */
serf_bucket_t *wrapped;
/* When reading, this defines the amount of data that we should try to
   grab from the wrapped bucket. It is also the allocated size of
   HOLD_BUF. */
apr_size_t min_size;
/* In order to reach MIN_SIZE, we may sometimes have to make copies of
   the data to reach that size. HOLD_BUF (if not NULL) is a buffer of
   MIN_SIZE length used to hold/concatenate that data.
   HOLD_BUF remains NULL until the buffer is actually required. */
char *hold_buf;
} copy_context_t;
/* Create a copy bucket around WRAPPED.

   MIN_SIZE is the amount of data that reads on the new bucket try to
   gather before returning. The bucket and its state are allocated from
   ALLOCATOR. The hold buffer is not allocated here; it is created on
   first use. */
serf_bucket_t *serf_bucket_copy_create(
    serf_bucket_t *wrapped,
    apr_size_t min_size,
    serf_bucket_alloc_t *allocator)
{
    copy_context_t *state = serf_bucket_mem_alloc(allocator,
                                                  sizeof(copy_context_t));

    state->hold_buf = NULL;
    state->min_size = min_size;
    state->wrapped = wrapped;

    return serf_bucket_create(&serf_bucket_type_copy, allocator, state);
}
/* Concatenate the contents of the first VECS_USED entries of VECS into
   the buffer starting at DATA.

   The caller must make sure DATA has room for the combined length of all
   entries. If COPIED is not NULL, it receives the number of bytes
   written. */
void serf__copy_iovec(char *data,
                      apr_size_t *copied,
                      struct iovec *vecs,
                      int vecs_used)
{
    apr_size_t written = 0;
    int idx = 0;

    while (idx < vecs_used) {
        const struct iovec *cur = &vecs[idx++];

        /* Append this entry right behind what was written so far. */
        memcpy(data + written, cur->iov_base, cur->iov_len);
        written += cur->iov_len;
    }

    if (copied != NULL) {
        *copied = written;
    }
}
/* Read up to REQUESTED bytes from the copy bucket.

   When the wrapped bucket can already hand out enough contiguous data
   (or cannot provide more right now), the read is passed straight
   through. Otherwise pieces are read from the wrapped bucket and
   concatenated into HOLD_BUF until REQUESTED bytes (capped at MIN_SIZE)
   are available or the wrapped bucket reports a non-success status.

   On return, *DATA and *LEN describe the data read and the result is the
   status to report to the caller. */
static apr_status_t serf_copy_read(serf_bucket_t *bucket,
                                   apr_size_t requested,
                                   const char **data, apr_size_t *len)
{
    copy_context_t *state = bucket->data;
    apr_status_t status;
    const char *peeked;
    apr_size_t avail;
    apr_size_t gathered = 0;

    status = serf_bucket_peek(state->wrapped, &peeked, &avail);
    if (SERF_BUCKET_READ_ERROR(status)) {
        *len = 0;
        return status;
    }

    /* A plain read is good enough when the peek returned a non-success
       status, or when the peeked data already covers REQUESTED or
       MIN_SIZE. */
    if (status || requested <= avail || state->min_size <= avail) {
        return serf_bucket_read(state->wrapped, requested, data, len);
    }

    /* Never gather more than HOLD_BUF can contain. */
    if (requested > state->min_size) {
        requested = state->min_size;
    }

    while (gathered < requested) {
        struct iovec vecs[16];
        int count;
        apr_size_t chunk;

        status = serf_bucket_read_iovec(state->wrapped,
                                        requested - gathered,
                                        (int)(sizeof(vecs) / sizeof(vecs[0])),
                                        vecs, &count);

        if (SERF_BUCKET_READ_ERROR(status)) {
            /* Once something has been stashed, the error is replaced by
               APR_EAGAIN and the stashed data is returned. */
            if (gathered > 0) {
                status = APR_EAGAIN;
            }
            break;
        }

        /* Shortcut: nothing is stashed yet, there is exactly one buffer,
           and either no more can be read at this time or that buffer
           already satisfies the request. Return it without copying. */
        if (gathered == 0 && count == 1
            && (status || vecs[0].iov_len == requested)) {
            *data = vecs[0].iov_base;
            *len = vecs[0].iov_len;
            return status;
        }

        /* Create the hold buffer the first time there is data to keep. */
        if (state->hold_buf == NULL && count > 0) {
            state->hold_buf = serf_bucket_mem_alloc(bucket->allocator,
                                                    state->min_size);
        }

        serf__copy_iovec(state->hold_buf + gathered, &chunk, vecs, count);
        gathered += chunk;

        if (status) {
            break;
        }
    }

    *data = state->hold_buf;
    *len = gathered;
    return status;
}
/* Read a line from the copy bucket.

   MIN_SIZE is deliberately ignored here: a "line" may well be shorter
   than that, so the call is simply forwarded to the wrapped bucket. */
static apr_status_t serf_copy_readline(serf_bucket_t *bucket,
                                       int acceptable, int *found,
                                       const char **data, apr_size_t *len)
{
    serf_bucket_t *inner = ((copy_context_t *)bucket->data)->wrapped;

    return serf_bucket_readline(inner, acceptable, found, data, len);
}
/* Read up to REQUESTED bytes from the copy bucket into at most VECS_SIZE
   entries of VECS, storing the number of entries filled in *VECS_USED.

   If the first read from the wrapped bucket yields fewer than MIN_SIZE
   bytes (and fewer than REQUESTED), the data is copied into HOLD_BUF,
   VECS[0] is pointed at HOLD_BUF, and more data is read into VECS[1..]
   until MIN_SIZE or REQUESTED bytes are available or the wrapped bucket
   reports a non-success status.

   Returns the status of the last read from the wrapped bucket, except
   that a read error occurring after data has been stashed in HOLD_BUF is
   reported as APR_EAGAIN together with the stashed data. */
static apr_status_t serf_copy_read_iovec(serf_bucket_t *bucket,
                                         apr_size_t requested,
                                         int vecs_size,
                                         struct iovec *vecs,
                                         int *vecs_used)
{
    copy_context_t *ctx = bucket->data;
    apr_status_t status;
    apr_size_t total;
    apr_size_t fetched;
    int i;

    /* If somebody really wants to call us for 1 iovec, call the function
       that already implements the copying for this. */
    if (vecs_size == 1) {
        const char *data;
        apr_size_t len;

        status = serf_copy_read(bucket, requested, &data, &len);

        vecs[0].iov_base = (void*)data;
        vecs[0].iov_len = len;
        *vecs_used = 1;

        return status;
    }

    status = serf_bucket_read_iovec(ctx->wrapped, requested,
                                    vecs_size, vecs, vecs_used);

    /* There are four possible results:

       EOF: if the wrapped bucket is done, then we must be done, too. It is
            quite possible we'll return less than MIN_SIZE, but with EOF,
            there is no way we'll be able to return that.
       EAGAIN: we cannot possibly read more (right now), so return. Whatever
               was read, it is all we have, whether we met MIN_SIZE or not.
       error: any other error will prevent us from further work; return it.
       SUCCESS: we read a portion, and the bucket says we can read more.

       For all but SUCCESS, we simply return the status. We're done now. */
    if (status)
        return status;

    /* How much was read on this pass? */
    for (total = 0, i = *vecs_used; i-- > 0; )
        total += vecs[i].iov_len;

    /* The IOVEC holds at least MIN_SIZE data, so we're good. Or, it
       holds precisely the amount requested, so we shouldn't try to
       gather/accumulate more data. */
    if (total >= ctx->min_size || total == requested)
        return APR_SUCCESS;

    /* TOTAL < REQUESTED. TOTAL < MIN_SIZE. We should try and fetch more. */

    /* Copy what we have into our buffer. Then continue reading to get at
       least MIN_SIZE or REQUESTED bytes of data. */
    if (! ctx->hold_buf)
        ctx->hold_buf = serf_bucket_mem_alloc(bucket->allocator,
                                              ctx->min_size);

    /* TOTAL < MIN_SIZE here, so everything read so far fits in HOLD_BUF. */
    fetched = total;
    serf__copy_iovec(ctx->hold_buf, NULL, vecs, *vecs_used);

    /* From now on VECS[0] describes the data accumulated in HOLD_BUF and
       new data is read into the remaining entries. */
    vecs[0].iov_base = ctx->hold_buf;
    vecs[0].iov_len = fetched;

    while (TRUE) {
        int v_used;

        status = serf_bucket_read_iovec(ctx->wrapped, requested - fetched,
                                        vecs_size - 1, &vecs[1], &v_used);

        if (SERF_BUCKET_READ_ERROR(status)) {
            /* Hand out what is already held in HOLD_BUF; the error itself
               is reported as APR_EAGAIN. */
            *vecs_used = 1;
            return APR_EAGAIN;
        }

        /* TOTAL keeps running: held bytes plus everything just read. */
        for (i = 1; i <= v_used; i++)
            total += vecs[i].iov_len;

        if (status || total >= ctx->min_size || total == requested) {
            *vecs_used = v_used + 1;
            return status;
        }

        /* Still short of MIN_SIZE: append the new pieces to HOLD_BUF and
           try again. */
        serf__copy_iovec(ctx->hold_buf + fetched, NULL, &vecs[1], v_used);

        /* TOTAL is cumulative and already includes the bytes held before
           this pass, so it is the new held amount. Adding it to FETCHED
           would double-count and push later copies past HOLD_BUF. */
        fetched = total;
        vecs[0].iov_len = fetched;
    }
}
/* Sendfile-style read: forwarded unchanged to the wrapped bucket. */
static apr_status_t serf_copy_read_for_sendfile(
    serf_bucket_t *bucket,
    apr_size_t requested,
    apr_hdtr_t *hdtr,
    apr_file_t **file,
    apr_off_t *offset,
    apr_size_t *len)
{
    serf_bucket_t *inner = ((copy_context_t *)bucket->data)->wrapped;

    return serf_bucket_read_for_sendfile(inner, requested,
                                         hdtr, file, offset, len);
}
/* Ask for a bucket of the given TYPE: forwarded unchanged to the wrapped
   bucket. */
static serf_bucket_t *serf_copy_read_bucket(
    serf_bucket_t *bucket,
    const serf_bucket_type_t *type)
{
    serf_bucket_t *inner = ((copy_context_t *)bucket->data)->wrapped;

    return serf_bucket_read_bucket(inner, type);
}
/* Peek at the pending data: forwarded unchanged to the wrapped bucket. */
static apr_status_t serf_copy_peek(serf_bucket_t *bucket,
                                   const char **data,
                                   apr_size_t *len)
{
    serf_bucket_t *inner = ((copy_context_t *)bucket->data)->wrapped;

    return serf_bucket_peek(inner, data, len);
}
/* Report the remaining length as answered by the wrapped bucket. */
static apr_uint64_t serf_copy_get_remaining(serf_bucket_t *bucket)
{
    serf_bucket_t *inner = ((copy_context_t *)bucket->data)->wrapped;

    return serf_bucket_get_remaining(inner);
}
/* Destroy the copy bucket: release the hold buffer (if one was ever
   allocated), destroy the wrapped bucket, then free the bucket itself
   along with its context. */
static void serf_copy_destroy(serf_bucket_t *bucket)
{
    copy_context_t *state = bucket->data;

    /* The hold buffer is created on demand, so it may not exist. */
    if (state->hold_buf != NULL) {
        serf_bucket_mem_free(bucket->allocator, state->hold_buf);
    }

    serf_bucket_destroy(state->wrapped);

    serf_default_destroy_and_data(bucket);
}
/* Apply CONFIG to this bucket.

   The copy bucket itself neither needs nor updates any shared config,
   but the wrapped bucket may, so CONFIG is handed on to it. */
static apr_status_t serf_copy_set_config(serf_bucket_t *bucket,
                                         serf_config_t *config)
{
    serf_bucket_t *inner = ((copy_context_t *)bucket->data)->wrapped;

    return serf_bucket_set_config(inner, config);
}
/* Bucket type descriptor for the copy bucket: the string "COPY" followed
   by the handlers implemented in this file, listed positionally in the
   slot order of serf_bucket_type_t (declared outside this file).
   NOTE(review): serf_buckets_are_v2 is not defined in this file; it
   presumably marks the entries after it as version-2 extensions of the
   bucket type -- confirm against the serf_bucket_type_t declaration. */
const serf_bucket_type_t serf_bucket_type_copy = {
"COPY",
serf_copy_read,
serf_copy_readline,
serf_copy_read_iovec,
serf_copy_read_for_sendfile,
serf_buckets_are_v2,
serf_copy_peek,
serf_copy_destroy,
serf_copy_read_bucket,
serf_copy_get_remaining,
serf_copy_set_config,
};