forked from yeliqseu/streamc
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathencoder.c
276 lines (262 loc) · 9.43 KB
/
encoder.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
#include "streamcodec.h"
#include "galois.h"
#include "assert.h"
#define ENC_ALLOC 64
// pseudo-random number generator
extern void mt19937_seed(unsigned long s, unsigned long *mt);
extern unsigned long mt19937_randint(unsigned long *mt, int *mti);
static struct packet *_output_repair_packet(struct encoder *ec, int win_s, int win_e);
struct encoder *initialize_encoder(struct parameters *cp, unsigned char *buf, int nbytes)
{
struct encoder *ec = calloc(1, sizeof(struct encoder));
ec->cp = cp;
ec->count = 0;
ec->nextsid = 0;
ec->rcount = 0;
if (nbytes == 0) {
ec->bufsize = ENC_ALLOC;
ec->srcpkt = calloc(ENC_ALLOC, sizeof(GF_ELEMENT*));
ec->snum = 0;
ec->head = -1;
ec->tail = -1;
ec->headsid = -1;
ec->tailsid = -1;
} else {
int snum = ALIGN(nbytes, cp->pktsize);
ec->snum = snum;
int blocks = ALIGN(snum, ENC_ALLOC);
ec->srcpkt = calloc(blocks*ENC_ALLOC, sizeof(GF_ELEMENT*));
ec->bufsize = blocks * ENC_ALLOC;
// Load source packets
int pktsize = ec->cp->pktsize;
int hasread = 0;
for (int i=0; i<snum; i++) {
int toread = (hasread+pktsize) <= nbytes ? pktsize : nbytes-hasread;
ec->srcpkt[i] = calloc(pktsize, sizeof(GF_ELEMENT));
memcpy(ec->srcpkt[i], buf+hasread, toread*sizeof(GF_ELEMENT));
hasread += toread;
}
ec->head = 0;
ec->tail = ec->snum - 1;
ec->headsid = 0;
ec->tailsid = ec->snum - 1;
}
// Construct finite field
constructField(cp->gfpower);
return ec;
}
// Used when a random arrival model of source packets is considered
// It is assumed that the source packet id's are continuous
int enqueue_packet(struct encoder *ec, int sourceid, GF_ELEMENT *syms)
{
int pktsize = ec->cp->pktsize;
int bufsize = ec->bufsize;
int i;
// printf("[Encoder] Enqueuing source packet %d [ head_pos: %d tail_pos: %d headsid: %d tailsid: %d ]\n", sourceid, ec->head, ec->tail, ec->headsid, ec->tailsid);
// the buffer is empty
if (ec->head == -1) {
ec->srcpkt[0] = calloc(pktsize, sizeof(GF_ELEMENT));
memcpy(ec->srcpkt[0], syms, pktsize);
ec->head = 0;
ec->tail = 0;
ec->headsid = sourceid;
ec->tailsid = sourceid;
ec->snum += 1;
//printf("[Encoder] Enqueued source packet %d [ head_pos: %d tail_pos: %d headsid: %d tailsid: %d snum: %d]\n", sourceid, ec->head, ec->tail, ec->headsid, ec->tailsid, ec->snum);
return 0;
}
// if the buffer is full, has to enlarge it before saving the packet
if ((ec->tail+1) % bufsize == ec->head) {
ec->srcpkt = realloc(ec->srcpkt, bufsize*2*sizeof(GF_ELEMENT*));
// initialize the added memory
for (i=0; i<bufsize; i++) {
ec->srcpkt[bufsize+i] = NULL;
}
// re-locate the bufferred source packet pointers so that the packets are consistently stored
if (ec->tail < ec->head) {
for (i=0; i<ec->tail+1; i++) {
ec->srcpkt[bufsize+i] = ec->srcpkt[i];
ec->srcpkt[i] = NULL;
}
ec->tail = ec->head -1 + bufsize;
}
ec->bufsize = bufsize * 2;
DEBUG_PRINT(("[Encoder] Realloc encoder buffer to %d packets\n", ec->bufsize));
}
bufsize = ec->bufsize; // bufsize may have been changed
int pos = (ec->tail + 1) % bufsize; // location to enqueue
ec->srcpkt[pos] = calloc(pktsize, sizeof(GF_ELEMENT));
memcpy(ec->srcpkt[pos], syms, pktsize);
ec->tail = pos;
ec->tailsid = sourceid;
ec->snum += 1;
// printf("[Encoder] Enqueued source packet %d [ head_pos: %d tail_pos: %d headsid: %d tailsid: %d snum: %d]\n", sourceid, ec->head, ec->tail, ec->headsid, ec->tailsid, ec->snum);
return 0;
}
struct packet *output_source_packet(struct encoder *ec)
{
int pos;
int pktsize = ec->cp->pktsize;
struct packet *pkt = calloc(1, sizeof(struct packet));
pkt->syms = calloc(pktsize, sizeof(GF_ELEMENT));
pkt->sourceid = ec->nextsid;
pkt->repairid = -1;
pos = (ec->head + (ec->nextsid - ec->headsid)) % (ec->bufsize);
memcpy(pkt->syms, ec->srcpkt[pos], pktsize*sizeof(GF_ELEMENT));
ec->count += 1;
ec->nextsid += 1;
return pkt;
}
struct packet *_output_repair_packet(struct encoder *ec, int win_s, int win_e)
{
assert(win_e >= win_s);
assert(win_s >= ec->headsid);
assert(win_e <= ec->nextsid - 1);
int i, pos;
int pktsize = ec->cp->pktsize;
struct packet *pkt = calloc(1, sizeof(struct packet));
pkt->syms = calloc(pktsize, sizeof(GF_ELEMENT));
pkt->sourceid = -1;
pkt->repairid = ec->rcount;
pkt->win_s = win_s;
pkt->win_e = win_e;
ec->count += 1;
ec->rcount += 1;
int width = pkt->win_e - pkt->win_s + 1;
pkt->coes = calloc(width, sizeof(GF_ELEMENT));
// init prng using repairid as the seed
ec->prng.mti = N;
mt19937_seed(pkt->repairid*EWIN, ec->prng.mt);
for (i=0; i<width; i++) {
GF_ELEMENT co = mt19937_randint(ec->prng.mt, &ec->prng.mti) % (1 << ec->cp->gfpower);
pkt->coes[i] = co;
// pos = (ec->head + i) % (ec->bufsize);
pos = (ec->head + (i + win_s - ec->headsid)) % (ec->bufsize);
galois_multiply_add_region(pkt->syms, ec->srcpkt[pos], co, pktsize);
}
DEBUG_PRINT(("[Encoder] Transmit repair packet %d across window [%d, %d] n_enc_row_ops: %d\n", pkt->repairid, pkt->win_s, pkt->win_e, width));
return pkt;
}
struct packet *output_repair_packet(struct encoder *ec)
{
return _output_repair_packet(ec, ec->headsid, ec->nextsid-1);
/*
int i, pos;
int pktsize = ec->cp->pktsize;
struct packet *pkt = calloc(1, sizeof(struct packet));
pkt->syms = calloc(pktsize, sizeof(GF_ELEMENT));
pkt->sourceid = -1;
pkt->repairid = ec->rcount;
pkt->win_s = ec->headsid;
pkt->win_e = ec->nextsid - 1;
ec->count += 1;
ec->rcount += 1;
DEBUG_PRINT(("[Encoder] Transmit repair packet %d across window [%d, %d]\n", pkt->repairid, pkt->win_s, pkt->win_e));
int width = pkt->win_e - pkt->win_s + 1;
pkt->coes = calloc(width, sizeof(GF_ELEMENT));
// init prng using repairid as the seed
ec->prng.mti = N;
mt19937_seed(pkt->repairid*EWIN, ec->prng.mt);
for (i=0; i<width; i++) {
GF_ELEMENT co = mt19937_randint(ec->prng.mt, &ec->prng.mti) % (1 << ec->cp->gfpower);
pkt->coes[i] = co;
pos = (ec->head + i) % (ec->bufsize);
galois_multiply_add_region(pkt->syms, ec->srcpkt[pos], co, pktsize);
}
return pkt;
*/
}
struct packet *output_repair_packet_short(struct encoder *ec, int ew_width)
{
int win_s = ec->nextsid-ew_width >= ec->headsid ? ec->nextsid-ew_width : ec->headsid;
return _output_repair_packet(ec, win_s, ec->nextsid-1);
}
void flush_acked_packets(struct encoder *ec, int ack_sid)
{
if (ack_sid < ec->headsid) {
return;
}
int count = 0;
for (int i=ec->headsid; i<=ack_sid; i++) {
int pos = (ec->head + (i - ec->headsid)) % (ec->bufsize);
if (ec->srcpkt[pos] != NULL) {
free(ec->srcpkt[pos]);
ec->srcpkt[pos] = NULL;
count += 1;
}
}
DEBUG_PRINT(("[Encoder] Receive ACK i_ord=%d. Current encoding window: [%d %d]. Window width: %d\n", ack_sid, ec->headsid, ec->nextsid-1, (ec->nextsid-ec->headsid)));
DEBUG_PRINT(("[Encoder] %d source packets up to no. %d are flushed from sending queue\n", count, ack_sid));
if (ack_sid == ec->tailsid) {
// all the buffered packets are flushed
ec->head = -1;
ec->tail = -1;
ec->headsid = -1;
ec->tailsid = -1;
} else {
ec->head = (ec->head + (ack_sid - ec->headsid + 1)) % (ec->bufsize);
ec->headsid = ack_sid + 1;
}
return;
}
unsigned char *serialize_packet(struct encoder *ec, struct packet *pkt)
{
int sym_len = ec->cp->pktsize;
int strlen = sizeof(int) * 4 + sym_len;
unsigned char *pktstr = calloc(strlen, sizeof(unsigned char));
memcpy(pktstr, &pkt->sourceid, sizeof(int));
memcpy(pktstr+sizeof(int), &pkt->repairid, sizeof(int));
memcpy(pktstr+sizeof(int)*2, &pkt->win_s, sizeof(int));
memcpy(pktstr+sizeof(int)*3, &pkt->win_e, sizeof(int));
memcpy(pktstr+sizeof(int)*4, pkt->syms, sym_len);
return pktstr;
}
void visualize_buffer(struct encoder *ec)
{
printf("enqueued: %d\nbufsize: %d\nnextsid: %d\nbuffered: %d\n",\
ec->snum, ec->bufsize, ec->nextsid, ec->tailsid-ec->headsid+1);
int i;
printf("Buffered SourceID:\t");
for (i=ec->headsid; i<=ec->tailsid; i++) {
printf(" %d\t ", i);
}
printf("\nBuffer Position:\t");
for (i=ec->headsid; i<=ec->tailsid; i++) {
printf(" %d\t ", (ec->head + i - ec->headsid) % (ec->bufsize) );
}
printf("\n");
return;
}
void free_packet(struct packet *pkt)
{
if (pkt->coes != NULL)
free(pkt->coes);
if (pkt->syms != NULL)
free(pkt->syms);
free(pkt);
pkt = NULL;
return;
}
void free_serialized_packet(unsigned char *pktstr)
{
if (pktstr != NULL) {
free(pktstr);
pktstr = NULL;
}
return;
}
void free_encoder(struct encoder *ec)
{
assert(ec!=NULL);
//free(ec->cp); // not malloced, no need to free
for (int i=0; i<ec->bufsize; i++) {
if (ec->srcpkt[i]!= NULL) {
free(ec->srcpkt[i]);
ec->srcpkt[i] = NULL;
}
}
free(ec->srcpkt);
free(ec);
ec = NULL;
return;
}