Skip to content
This repository has been archived by the owner on Apr 13, 2024. It is now read-only.

Commit

Permalink
Implement error handling and exceptions (#18)
Browse files Browse the repository at this point in the history
* implement error handling and exceptions

* Add test for MultiplePublisherError

* add some more tests
  • Loading branch information
pd0wm authored Dec 4, 2019
1 parent a7d5bb7 commit 3b753be
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 46 deletions.
6 changes: 5 additions & 1 deletion messaging/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# must be build with scons
from .messaging_pyx import Context, Poller, SubSocket, PubSocket # pylint: disable=no-name-in-module, import-error
from .messaging_pyx import Context, Poller, SubSocket, PubSocket # pylint: disable=no-name-in-module, import-error
from .messaging_pyx import MultiplePublishersError, MessagingError # pylint: disable=no-name-in-module, import-error

assert MultiplePublishersError
assert MessagingError

from cereal import log
from cereal.services import service_list
Expand Down
14 changes: 9 additions & 5 deletions messaging/impl_msgq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,16 @@ MSGQMessage::~MSGQMessage() {
}


void MSGQSubSocket::connect(Context *context, std::string endpoint, std::string address, bool conflate){
int MSGQSubSocket::connect(Context *context, std::string endpoint, std::string address, bool conflate){
assert(context);
assert(address == "127.0.0.1");

q = new msgq_queue_t;
msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE);
int r = msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE);
if (r != 0){
return r;
}

msgq_init_subscriber(q);

if (conflate){
Expand All @@ -64,7 +68,7 @@ void MSGQSubSocket::connect(Context *context, std::string endpoint, std::string

timeout = -1;

//std::cout << "MSGQ SUB: " << endpoint << std::endl;
return 0;
}


Expand Down Expand Up @@ -130,14 +134,14 @@ MSGQSubSocket::~MSGQSubSocket(){
}
}

void MSGQPubSocket::connect(Context *context, std::string endpoint){
int MSGQPubSocket::connect(Context *context, std::string endpoint){
assert(context);

q = new msgq_queue_t;
msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE);
msgq_init_publisher(q);

//std::cout << "MSGQ PUB: " << endpoint << std::endl;
return 0;
}

int MSGQPubSocket::sendMessage(Message *message){
Expand Down
4 changes: 2 additions & 2 deletions messaging/impl_msgq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class MSGQSubSocket : public SubSocket {
msgq_queue_t * q = NULL;
int timeout;
public:
void connect(Context *context, std::string endpoint, std::string address, bool conflate=false);
int connect(Context *context, std::string endpoint, std::string address, bool conflate=false);
void setTimeout(int timeout);
void * getRawSocket() {return (void*)q;}
Message *receive(bool non_blocking=false);
Expand All @@ -45,7 +45,7 @@ class MSGQPubSocket : public PubSocket {
private:
msgq_queue_t * q = NULL;
public:
void connect(Context *context, std::string endpoint);
int connect(Context *context, std::string endpoint);
int sendMessage(Message *message);
int send(char *data, size_t size);
~MSGQPubSocket();
Expand Down
19 changes: 10 additions & 9 deletions messaging/impl_zmq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ ZMQMessage::~ZMQMessage() {
}


void ZMQSubSocket::connect(Context *context, std::string endpoint, std::string address, bool conflate){
int ZMQSubSocket::connect(Context *context, std::string endpoint, std::string address, bool conflate){
sock = zmq_socket(context->getRawContext(), ZMQ_SUB);
assert(sock);
if (sock == NULL){
return -1;
}

zmq_setsockopt(sock, ZMQ_SUBSCRIBE, "", 0);

Expand All @@ -71,9 +73,7 @@ void ZMQSubSocket::connect(Context *context, std::string endpoint, std::string a
full_endpoint = "tcp://" + address + ":";
full_endpoint += std::to_string(get_port(endpoint));

//std::cout << "ZMQ SUB: " << full_endpoint << std::endl;

assert(zmq_connect(sock, full_endpoint.c_str()) == 0);
return zmq_connect(sock, full_endpoint.c_str());
}


Expand Down Expand Up @@ -103,15 +103,16 @@ ZMQSubSocket::~ZMQSubSocket(){
zmq_close(sock);
}

void ZMQPubSocket::connect(Context *context, std::string endpoint){
int ZMQPubSocket::connect(Context *context, std::string endpoint){
sock = zmq_socket(context->getRawContext(), ZMQ_PUB);
if (sock == NULL){
return -1;
}

full_endpoint = "tcp://*:";
full_endpoint += std::to_string(get_port(endpoint));

//std::cout << "ZMQ PUB: " << full_endpoint << std::endl;

assert(zmq_bind(sock, full_endpoint.c_str()) == 0);
return zmq_bind(sock, full_endpoint.c_str());
}

int ZMQPubSocket::sendMessage(Message *message){
Expand Down
4 changes: 2 additions & 2 deletions messaging/impl_zmq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class ZMQSubSocket : public SubSocket {
void * sock;
std::string full_endpoint;
public:
void connect(Context *context, std::string endpoint, std::string address, bool conflate=false);
int connect(Context *context, std::string endpoint, std::string address, bool conflate=false);
void setTimeout(int timeout);
void * getRawSocket() {return sock;}
Message *receive(bool non_blocking=false);
Expand All @@ -44,7 +44,7 @@ class ZMQPubSocket : public PubSocket {
void * sock;
std::string full_endpoint;
public:
void connect(Context *context, std::string endpoint);
int connect(Context *context, std::string endpoint);
int sendMessage(Message *message);
int send(char *data, size_t size);
~ZMQPubSocket();
Expand Down
37 changes: 29 additions & 8 deletions messaging/messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,38 @@ SubSocket * SubSocket::create(){

SubSocket * SubSocket::create(Context * context, std::string endpoint){
SubSocket *s = SubSocket::create();
s->connect(context, endpoint, "127.0.0.1");
int r = s->connect(context, endpoint, "127.0.0.1");

return s;
if (r == 0) {
return s;
} else {
delete s;
return NULL;
}
}

SubSocket * SubSocket::create(Context * context, std::string endpoint, std::string address){
SubSocket *s = SubSocket::create();
s->connect(context, endpoint, address);
int r = s->connect(context, endpoint, address);

return s;
if (r == 0) {
return s;
} else {
delete s;
return NULL;
}
}

SubSocket * SubSocket::create(Context * context, std::string endpoint, std::string address, bool conflate){
SubSocket *s = SubSocket::create();
s->connect(context, endpoint, address, conflate);
int r = s->connect(context, endpoint, address, conflate);

return s;
if (r == 0) {
return s;
} else {
delete s;
return NULL;
}
}

PubSocket * PubSocket::create(){
Expand All @@ -55,8 +70,14 @@ PubSocket * PubSocket::create(){

PubSocket * PubSocket::create(Context * context, std::string endpoint){
PubSocket *s = PubSocket::create();
s->connect(context, endpoint);
return s;
int r = s->connect(context, endpoint);

if (r == 0) {
return s;
} else {
delete s;
return NULL;
}
}

Poller * Poller::create(){
Expand Down
6 changes: 4 additions & 2 deletions messaging/messaging.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include <vector>
#include <string>

#define MSG_MULTIPLE_PUBLISHERS 100

class Context {
public:
virtual void * getRawContext() = 0;
Expand All @@ -23,7 +25,7 @@ class Message {

class SubSocket {
public:
virtual void connect(Context *context, std::string endpoint, std::string address, bool conflate=false) = 0;
virtual int connect(Context *context, std::string endpoint, std::string address, bool conflate=false) = 0;
virtual void setTimeout(int timeout) = 0;
virtual Message *receive(bool non_blocking=false) = 0;
virtual void * getRawSocket() = 0;
Expand All @@ -36,7 +38,7 @@ class SubSocket {

class PubSocket {
public:
virtual void connect(Context *context, std::string endpoint) = 0;
virtual int connect(Context *context, std::string endpoint) = 0;
virtual int sendMessage(Message *message) = 0;
virtual int send(char *data, size_t size) = 0;
static PubSocket * create();
Expand Down
7 changes: 2 additions & 5 deletions messaging/messaging.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,17 @@ cdef extern from "messaging.hpp":
size_t getSize()
char *getData()



cdef cppclass SubSocket:
@staticmethod
SubSocket * create()
void connect(Context *, string, string, bool)
int connect(Context *, string, string, bool)
Message * receive(bool)
void setTimeout(int)


cdef cppclass PubSocket:
@staticmethod
PubSocket * create()
void connect(Context *, string)
int connect(Context *, string)
int sendMessage(Message *)
int send(char *, size_t)

Expand Down
49 changes: 41 additions & 8 deletions messaging/messaging_pyx.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,18 @@ from messaging cimport Poller as cppPoller
from messaging cimport Message as cppMessage


class MessagingError(Exception):
pass


class MultiplePublishersError(MessagingError):
pass


cdef class Context:
cdef cppContext * context
def __cinit__(self):

def __cinit__(self):
self.context = cppContext.create()

def term(self):
Expand All @@ -34,7 +43,7 @@ cdef class Poller:
cdef cppPoller * poller
cdef list sub_sockets

def __cinit__(self):
def __cinit__(self):
self.sub_sockets = []
self.poller = cppPoller.create()

Expand Down Expand Up @@ -63,10 +72,13 @@ cdef class SubSocket:
cdef cppSubSocket * socket
cdef bool is_owner

def __cinit__(self):
def __cinit__(self):
self.socket = cppSubSocket.create()
self.is_owner = True

if self.socket == NULL:
raise MessagingError

def __dealloc__(self):
if self.is_owner:
del self.socket
Expand All @@ -79,12 +91,17 @@ cdef class SubSocket:
self.socket = ptr

def connect(self, Context context, string endpoint, string address=b"127.0.0.1", bool conflate=False):
self.socket.connect(context.context, endpoint, address, conflate)
r = self.socket.connect(context.context, endpoint, address, conflate)

if r != 0:
if errno.errno == errno.EADDRINUSE:
raise MultiplePublishersError
else:
raise MessagingError

def setTimeout(self, int timeout):
self.socket.setTimeout(timeout)


def receive(self, bool non_blocking=False):
msg = self.socket.receive(non_blocking)

Expand All @@ -105,14 +122,30 @@ cdef class SubSocket:

cdef class PubSocket:
cdef cppPubSocket * socket
def __cinit__(self):

def __cinit__(self):
self.socket = cppPubSocket.create()
if self.socket == NULL:
raise MessagingError

def __dealloc__(self):
del self.socket

def connect(self, Context context, string endpoint):
self.socket.connect(context.context, endpoint)
r = self.socket.connect(context.context, endpoint)

if r != 0:
if errno.errno == errno.EADDRINUSE:
raise MultiplePublishersError
else:
raise MessagingError

def send(self, string data):
return self.socket.send(<char*>data.c_str(), len(data))
length = len(data)
r = self.socket.send(<char*>data.c_str(), length)

if r != length:
if errno.errno == errno.EADDRINUSE:
raise MultiplePublishersError
else:
raise MessagingError
6 changes: 2 additions & 4 deletions messaging/msgq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,16 @@ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){
auto fd = open(full_path, O_RDWR | O_CREAT, 0777);
delete[] full_path;

assert(fd >= 0); // TODO: properly handle exit codes
if (fd < 0)
return -1;

int rc = ftruncate(fd, size + sizeof(msgq_header_t));
assert(rc == 0); // TODO: properly handle exit codes
if (rc < 0)
return -1;

char * mem = (char*)mmap(NULL, size + sizeof(msgq_header_t), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
close(fd);

assert(mem != NULL); // TODO: properly handle exit codes
if (mem == NULL)
return -1;

Expand Down Expand Up @@ -207,7 +204,8 @@ int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){
// Die if we are no longer the active publisher
if (q->write_uid_local != *q->write_uid){
std::cout << "Killing old publisher: " << q->endpoint << std::endl;
assert(q->write_uid_local == *q->write_uid);
errno = EADDRINUSE;
return -1;
}

uint64_t total_msg_size = ALIGN(msg->size + sizeof(int64_t));
Expand Down
Loading

0 comments on commit 3b753be

Please sign in to comment.