Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/handle client in new process #140

Merged
merged 5 commits into from
Apr 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/rpcclient/tests/test_processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ def test_list_sanity(client):

def test_get_process_by_listening_port(client):
# there should only be one process listening on this port and that's us
assert client.processes.get_process_by_listening_port(DEFAULT_PORT).pid == client.pid
worker_process = client.processes.get_by_pid(client.pid)
assert client.processes.get_process_by_listening_port(DEFAULT_PORT).pid == worker_process.ppid
11 changes: 11 additions & 0 deletions src/rpcclient/tests/test_worker_processes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from contextlib import closing

from rpcclient.client_factory import create_client


def test_new_process_per_client(client):
with closing(create_client('127.0.0.1')) as client2:
assert client.pid != client2.pid
client_process = client.processes.get_by_pid(client.pid)
client2_process = client2.processes.get_by_pid(client2.pid)
assert client_process.ppid == client2_process.ppid
71 changes: 67 additions & 4 deletions src/rpcserver/rpcserver.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Example usage: \n\
#define MAX_OPTION_LEN (256)
#define BUFFERSIZE (64 * 1024)
#define INVALID_PID (0xffffffff)
#define WORKER_CLIENT_SOCKET_FD (3)

extern char **environ;

Expand Down Expand Up @@ -233,6 +234,41 @@ bool internal_spawn(bool background, char *const *argv, char *const *envp, pid_t
return success;
}

bool spawn_worker_server(int client_socket, const char *argv[], int argc)
{
bool success = false;

// append -w to original argv
int new_argc = argc + 1;
const char **new_argv = malloc((new_argc + 1) * sizeof(char *));
for (int i = 0; i < argc; ++i)
{
new_argv[i] = argv[i];
}
new_argv[new_argc - 1] = "-w";
new_argv[new_argc] = NULL;

pid_t pid;
posix_spawn_file_actions_t actions;
CHECK(0 == posix_spawn_file_actions_init(&actions));
CHECK(0 == posix_spawn_file_actions_adddup2(&actions, STDIN_FILENO, STDIN_FILENO));
CHECK(0 == posix_spawn_file_actions_adddup2(&actions, STDOUT_FILENO, STDOUT_FILENO));
CHECK(0 == posix_spawn_file_actions_adddup2(&actions, STDERR_FILENO, STDERR_FILENO));
CHECK(0 == posix_spawn_file_actions_adddup2(&actions, client_socket, WORKER_CLIENT_SOCKET_FD));

CHECK(0 == posix_spawnp(&pid, new_argv[0], &actions, NULL, (char *const *)new_argv, environ));
CHECK(pid != INVALID_PID);

TRACE("Spawned Worker Process: %d", pid);
success = true;

error:
posix_spawn_file_actions_destroy(&actions);
free(new_argv);
close(client_socket);
return success;
}

bool send_reply(int sockfd, cmd_type_t type)
{
protocol_message_t protocol_message = {.magic = MAGIC, .cmd_type = type};
Expand Down Expand Up @@ -963,6 +999,8 @@ void handle_client(int sockfd)
handshake.arch = ARCH_UNKNOWN;
strncpy(handshake.sysname, uname_buf.sysname, HANDSHAKE_SYSNAME_LEN - 1);

CHECK(-1 != fcntl(sockfd, F_SETFD, FD_CLOEXEC));

#ifdef __ARM_ARCH_ISA_A64
handshake.arch = ARCH_ARM64;
#endif
Expand Down Expand Up @@ -1036,15 +1074,26 @@ void handle_client(int sockfd)

void signal_handler(int sig)
{
int status;
pid_t pid;

if (SIGCHLD == sig)
{
pid = waitpid(-1, &status, 0);
TRACE("PID: %d exited with status: %d", pid, status);
return;
}

TRACE("entered with signal code: %d", sig);
}

int main(int argc, const char *argv[])
{
int opt;
bool worker_spawn = false;
char port[MAX_OPTION_LEN] = DEFAULT_PORT;

while ((opt = getopt(argc, (char *const *)argv, "hp:o:")) != -1)
while ((opt = getopt(argc, (char *const *)argv, "hp:o:w")) != -1)
{
switch (opt)
{
Expand Down Expand Up @@ -1074,6 +1123,11 @@ int main(int argc, const char *argv[])
}
break;
}
case 'w':
{
worker_spawn = true;
break;
}
case 'h':
case '?':
default: /* '?' */
Expand All @@ -1086,6 +1140,13 @@ int main(int argc, const char *argv[])

signal(SIGPIPE, signal_handler);

if (worker_spawn)
{
TRACE("New worker spawned");
handle_client(WORKER_CLIENT_SOCKET_FD);
exit(EXIT_SUCCESS);
}

int err = 0;
int server_fd = -1;
struct addrinfo hints;
Expand Down Expand Up @@ -1115,11 +1176,13 @@ int main(int argc, const char *argv[])

CHECK(0 == listen(server_fd, MAX_CONNECTIONS));

pthread_t thread;
#ifdef __APPLE__
CHECK(0 == pthread_create(&thread, NULL, (void *(*)(void *))CFRunLoopRun, NULL));
pthread_t runloop_thread;
CHECK(0 == pthread_create(&runloop_thread, NULL, (void *(*)(void *))CFRunLoopRun, NULL));
#endif // __APPLE__

signal(SIGCHLD, signal_handler);
doronz88 marked this conversation as resolved.
Show resolved Hide resolved

while (1)
{
struct sockaddr_storage their_addr; // connector's address information
Expand All @@ -1132,7 +1195,7 @@ int main(int argc, const char *argv[])
CHECK(inet_ntop(their_addr.ss_family, get_in_addr((struct sockaddr *)&their_addr), ipstr, sizeof(ipstr)));
TRACE("Got a connection from %s [%d]", ipstr, client_fd);

CHECK(0 == pthread_create(&thread, NULL, (void *(*)(void *))handle_client, (void *)(long)client_fd));
CHECK(spawn_worker_server(client_fd, argv, argc));
}

error:
Expand Down