-
Notifications
You must be signed in to change notification settings - Fork 1
/
dataServer.cpp
328 lines (270 loc) · 12.6 KB
/
dataServer.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
#include <dirent.h>
#include <netdb.h>
#include <netinet/in.h>
#include <pthread.h>
#include <signal.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <time.h>
#include <unistd.h>

#include <cerrno>
#include <cstdio>
#include <cstring>
#include <deque>
#include <iostream>
#include <map>
#include <queue>
#include <string>
#include <vector>
// Error reporter for calls that RETURN their error code instead of setting
// errno (the pthread_* functions in this file are reported this way).
// Prints "<s>: <strerror(e)>" followed by a newline to stderr.
//   s : C string used as the message prefix
//   e : error number to translate with strerror()
// Being a macro, it expands to a bare fprintf() call expression; it does not
// terminate the program.
#define perror2(s, e) fprintf(stderr, "%s: %s\n", s, strerror(e))
// Everything below refers to standard-library names (string, queue, map,
// vector, pair, ...) without the std:: prefix.
using namespace std;
// Fatal-error helper: prints `message` together with the description of the
// current errno value (through perror) and then ends the whole process with
// the EXIT_FAILURE status. This function never returns to its caller.
void perror_exit(std::string message){
    const char* prefix = message.c_str();
    perror(prefix);
    exit(EXIT_FAILURE);
}
// One unit of pending work: which file has to be sent and over which client
// connection it has to travel.
struct queue_item{
    char file_name[4096];   // path of the file, stored as a NUL-terminated C string
    FILE* sock_fp;          // stdio stream wrapping the client's socket
};
// Alias used throughout the rest of the file.
typedef struct queue_item queueItem;
///////////////////////////////////////////////////////////////////////////////////////////////////////
////////// GLOBAL VARIABLES TO BE SHARED AMONG ALL THREADS AND FUNCTIONS //////////////////////////////
///////////// AN APPROACH USING ARGUMENTS WAS MUCH MUCH MORE COMPLEX //////////////////////////////////
///////////////////////////////////////////////////////////////////////////////////////////////////////
///
int block_size = 0; // Block size in bytes for the files that the workers send ///
///
pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER; // mutex for accessing the queue ///
int queue_size = 0; ///
queue<queueItem> files_queue; // The queue containing the files to be sent ///
///
// A map to match sockets with the number of files that must be sent through each socket ///
// We need it in order to know when to close the socket (i.e. when files for this socket reach 0) ///
map<FILE*, int> files_per_socket; ///
///
///////////////////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////////////////////////////
// Send the remaining contents of `fp` to the client behind `sock_fp`.
//
// The data is read in chunks of at most `size` bytes and written straight to
// the socket's file descriptor with write(), bypassing sock_fp's stdio buffer
// (the same way the rest of the server talks to the client).
//
//   sock_fp : stdio stream wrapping the client socket (only its descriptor is used)
//   fp      : file to send, read from its current position up to EOF
//   size    : chunk size in bytes; 0 falls back to BUFSIZ so the call cannot
//             spin forever on an empty buffer
//
// Returns the number of file bytes written to the socket, or -1 when a stream
// is NULL, reading `fp` fails or a write() fails. The return type is int, so
// the count is only meaningful for files smaller than INT_MAX bytes.
//
// Exactly the bytes read are sent: the content is treated as binary (newlines
// and NUL bytes are not special), short chunks are not padded, and partial
// writes are completed before the next chunk is read.
int write_file_to_client(FILE* sock_fp, FILE* fp, size_t size){
    if (sock_fp == NULL || fp == NULL)
        return -1;

    const int fd = fileno(sock_fp);             // raw descriptor of the socket
    const size_t chunk = (size > 0) ? size : static_cast<size_t>(BUFSIZ);
    std::vector<char> buffer(chunk);
    int sent = 0;

    size_t got;
    while ((got = fread(&buffer[0], 1, chunk, fp)) > 0){
        size_t done = 0;
        while (done < got){                     // write() may accept fewer bytes than asked
            const ssize_t n = write(fd, &buffer[done], got - done);
            if (n <= 0){
                if (n < 0 && errno == EINTR)
                    continue;                   // interrupted before writing anything: retry
                return -1;
            }
            done += static_cast<size_t>(n);
        }
        sent += static_cast<int>(got);
    }
    // fread() returning 0 means EOF or a read error; tell the two apart.
    return ferror(fp) ? -1 : sent;
}
// explore files in a directory recursively and add them to the queue
void exploreFilesRecursively(char *base_path, queue<queueItem> &files_queue, FILE* sock_fp){
int err;
char path[4096];
memset(&path[0], 0, sizeof(path));
struct dirent *dp;
DIR* dir_stream = opendir(base_path); // open the desired directory stream
if (!dir_stream) // return if NULL (directory stream could not be opened)
return;
while ((dp = readdir(dir_stream)) != NULL){ // get next directory entry
if (strcmp(dp->d_name, ".") != 0 && strcmp(dp->d_name, "..") != 0){ // if it is not the current or the previous directory
// create new path based on the base_path given by the user and the directory entry currently read
strcpy(path, base_path);
strcat(path, "/");
strcat(path, dp->d_name);
if (dp->d_type != DT_DIR){ // if it is a file and not a directory
while (1){
// try to lock the queue
if (err = pthread_mutex_lock(&queue_lock)){
perror2("pthread_mutex_lock", err);
exit(1);
}
if(files_queue.size() < queue_size){ // and if there is empty space
queueItem queue_item1; // create a new entry - struct
memset(&(queue_item1.file_name)[0], 0, 4096); // initialize file name as empty
strcpy(queue_item1.file_name, path); // copy the path to the new entry
queue_item1.sock_fp = sock_fp; // copy the socket file pointer to the new entry
files_queue.push(queue_item1); // and add it to the queue
// also update the map containing the number of files for this socket file pointer
files_per_socket.insert(pair<FILE*, int>(sock_fp, files_per_socket[sock_fp]+=1));
// try to unlock the queue
if (err = pthread_mutex_unlock(&queue_lock)){
perror2("pthread_mutex_unlock", err);
exit(1);
}
break; // also assures that unlock is not performed twice
}
// try to unlock the queue
if (err = pthread_mutex_unlock(&queue_lock)){
perror2("pthread_mutex_unlock", err);
exit(1);
}
}
}
exploreFilesRecursively(path, files_queue, sock_fp); // continue resursive exploring according to new path
}
}
closedir(dir_stream);
}
// code for communication threads
void* communication_thread(void* arg){
// get client socket file pointer and try to open it for both reading and writing
FILE* sock_fp;
int csock = *(int*)arg;
if ((sock_fp = fdopen(csock, "r+")) == NULL)
perror_exit("fdopen");
// read from the socket the name of the directory that we want to explore and send
char dirname[4096];
if (fgets(dirname, BUFSIZ, sock_fp) == NULL)
perror_exit("reading dirname");
dirname[strcspn(dirname, "\n")] = '\0'; // find first '\n' occurence and terminate the string there
exploreFilesRecursively(dirname, files_queue, sock_fp);
// now that all the directory has been explored recursively, wait for the workers to finish (i.e. 0 files for the socket)
// and close the connection. Also erase the socket file pointer from the map.
while(1){
if (files_per_socket[sock_fp] == 0){
if (fclose(sock_fp) != 0){
perror2("close file", errno);
}
map<FILE*, int>::iterator it;
it=files_per_socket.find(sock_fp);
files_per_socket.erase(it);
break;
}
}
// let the thread release its resources when it terminates
int err;
if (err = pthread_detach(pthread_self())){
perror2("pthread_detach", err);
exit(1);
}
pthread_exit(NULL);
}
// code for worker threads
void* worker_thread(void* arg){
int c, err;
while(1){
// try to lock the queue
if (err = pthread_mutex_lock(&queue_lock)){
perror2("pthread_mutex_lock", err);
exit(1);
}
if (files_queue.size() > 0){ // avoid empty queue
// get first item of the queue
queueItem queue_item1 = files_queue.front();
// open the file described the file name (path)
FILE* fp;
fp = fopen(queue_item1.file_name, "r");
// count file characters (to include it in the protocol)
int count = 0;
for (c = getc(fp); c != EOF; c = getc(fp))
count = count + 1;
rewind(fp); // get back to the beginning of the file
int fd = fileno(queue_item1.sock_fp); // get file descriptor of the socket file pointer
// We define a protocol in which we deliver each file beginning with a '~', accompanied by the file name
// the file name ends with '~' which is then followed by the number of characters of the file. Finally
// one more '~' is sent and then the actual file contents are transferred.
write(fd, "~", 1);
write(fd, queue_item1.file_name, 4096);
write(fd, "~", 1);
char str[256]; // assume that the number of characters in a file can be converted to a 256 characters string
memset(&str[0], 0, sizeof(str));
sprintf(str, "%d", count);
write(fd, str, sizeof(str));
write(fd, "~", 1);
write_file_to_client(queue_item1.sock_fp, fp, block_size);
// close the file described the file name (path)
if (fclose(fp) != 0){
perror2("close file", errno);
}
files_queue.pop(); // pop the queue item we just processed
// Update the corresponding map by decrementing the files for this socket by one
files_per_socket.insert(pair<FILE*, int>(queue_item1.sock_fp, files_per_socket[queue_item1.sock_fp]-=1));
}
// try to unlock the queue
if (err = pthread_mutex_unlock(&queue_lock)){
perror2("pthread_mutex_unlock", err);
exit(1);
}
}
// let the thread release its resources when it terminates
if (err = pthread_detach(pthread_self())){
perror2("pthread_detach", err);
exit(1);
}
pthread_exit(NULL);
}
int main(int argc, char* argv[]){
int port = 0;
int thread_pool_size = 0;
// get command line arguments
if (argc == 9){
// iterate through arguments - when position is an odd number we have a option and the next
// argument is the variable for the option
for (int i = 0; i < argc; i++){
if (i % 2){
if (!strcmp(argv[i], "-p")){
port = atoi(argv[i+1]);
}else if (!strcmp(argv[i], "-s")){
thread_pool_size = atoi(argv[i+1]);
}else if (!strcmp(argv[i], "-q")){
queue_size = atoi(argv[i+1]);
}else if (!strcmp(argv[i], "-b")){
block_size = atoi(argv[i+1]);
}
}
}
}else{
perror("wrong arguments");
}
// keep worker thread ids in a vector for further use
vector<pthread_t> wtids;
// create thread_pool_size number of worker threads and add them to the vector
int err;
for (int i = 0; i < thread_pool_size; i++){
pthread_t returned;
if (err = pthread_create(&returned, NULL, worker_thread, NULL)){
perror2("pthread_create", err);
exit(1);
}
wtids.push_back(returned);
}
int lsock, csock;
// create listening socket through internet
if ((lsock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
perror("socket");
exit(-1);
}
// set socket options (we want the socket to be reusable itself)
int enable = 1;
if (setsockopt(lsock, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0)
perror("setsockopt(SO_REUSEADDR) failed");
struct sockaddr_in myaddr ; // structure for handling internet addresses
myaddr.sin_addr.s_addr = htonl(INADDR_ANY); // get any address and convert to network byte order
myaddr.sin_port = htons(port); // covnert port to network byte order
myaddr.sin_family = AF_INET; // we will use Internet Protocol v4 addresses
if (bind(lsock, (struct sockaddr*)&myaddr, sizeof(myaddr))) // bind address in myaddr to listening socket
perror_exit("bind");
// mark as passive socket (will be used to accept incoming connection requests using accept())
// arbitary queue size = 5
if (listen(lsock, 5) != 0)
perror_exit("listen");
// create a vector to hold communication thread ids
vector<pthread_t> ctids;
// wait for new connections, and every time a client is connected to the server
while (1){
// extract the first connection request on the queue of pending connections for the listening socket,
// create a new connected socket, and return a new file descriptor referring to that socket
if ((csock = accept(lsock, NULL, NULL)) < 0)
perror_exit("accept");
// create new communication thread
pthread_t returned;
if (err = pthread_create(&returned, NULL, communication_thread, (void*)&csock)){
perror2("pthread_create", err);
exit(1);
}
ctids.push_back(returned); // add it to the corresponding vector
}
return 0;
}