Skip to content

Commit

Permalink
[C] Basic working flow for C & C++ wrapper for error frames.
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeb01 committed Aug 1, 2024
1 parent 0b81c4c commit a964ffd
Show file tree
Hide file tree
Showing 16 changed files with 321 additions and 32 deletions.
94 changes: 93 additions & 1 deletion aeron-client/src/main/c/aeron_client_conductor.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ int aeron_client_conductor_init(aeron_client_conductor_t *conductor, aeron_conte
conductor->error_handler = context->error_handler;
conductor->error_handler_clientd = context->error_handler_clientd;

conductor->error_frame_handler = context->error_frame_handler;
conductor->error_handler_clientd = context->error_frame_handler_clientd;

conductor->on_new_publication = context->on_new_publication;
conductor->on_new_publication_clientd = context->on_new_publication_clientd;

Expand Down Expand Up @@ -352,9 +355,21 @@ void aeron_client_conductor_on_driver_response(int32_t type_id, uint8_t *buffer,
break;
}

default:
case AERON_RESPONSE_ON_PUBLICATION_ERROR:
{
aeron_publication_error_t *response = (aeron_publication_error_t *)buffer;

if (length < sizeof(aeron_publication_error_t))
{
goto malformed_command;
}

result = aeron_client_conductor_on_error_frame(conductor, response);
break;
}

default:
{
AERON_CLIENT_FORMAT_BUFFER(error_message, "response=%x unknown", type_id);
conductor->error_handler(
conductor->error_handler_clientd, AERON_ERROR_CODE_UNKNOWN_COMMAND_TYPE_ID, error_message);
Expand Down Expand Up @@ -2188,6 +2203,45 @@ int aeron_client_conductor_on_operation_success(
return 0;
}

struct aeron_client_conductor_clientd_stct
{
aeron_client_conductor_t *conductor;
void *clientd;
};
typedef struct aeron_client_conductor_clientd_stct aeron_client_conductor_clientd_t;

void aeron_client_conductor_forward_error(void *clientd, int64_t key, void *value)
{
aeron_client_conductor_clientd_t *conductor_clientd = (aeron_client_conductor_clientd_t *)clientd;
aeron_client_conductor_t *conductor = conductor_clientd->conductor;
aeron_publication_error_t *response = (aeron_publication_error_t *)conductor_clientd->clientd;
aeron_client_command_base_t *resource = (aeron_client_command_base_t *)value;

if (AERON_CLIENT_TYPE_PUBLICATION == resource->type)
{
aeron_publication_t *publication = (aeron_publication_t *)resource;
if (response->registration_id == publication->original_registration_id)
{
// TODO: Use a union.
conductor->error_frame_handler(
conductor->error_handler_clientd, (aeron_publication_error_values_t *)response);
}
}
}

int aeron_client_conductor_on_error_frame(aeron_client_conductor_t *conductor, aeron_publication_error_t *response)
{
aeron_client_conductor_clientd_t clientd = {
.conductor = conductor,
.clientd = response
};

aeron_int64_to_ptr_hash_map_for_each(
&conductor->resource_by_id_map, aeron_client_conductor_forward_error, (void *)&clientd);

return 0;
}

aeron_subscription_t *aeron_client_conductor_find_subscription_by_id(
aeron_client_conductor_t *conductor, int64_t registration_id)
{
Expand Down Expand Up @@ -2482,6 +2536,44 @@ int aeron_client_conductor_offer_destination_command(
return 0;
}

int aeron_client_conductor_reject_image(
aeron_client_conductor_t *conductor,
int64_t image_correlation_id,
int64_t position,
const char *reason,
int32_t command_type)
{
size_t reason_length = strlen(reason);
const size_t command_length = sizeof(aeron_reject_image_command_t) + reason_length;

int rb_offer_fail_count = 0;
int32_t offset;
while ((offset = aeron_mpsc_rb_try_claim(&conductor->to_driver_buffer, command_type, command_length)) < 0)
{
if (++rb_offer_fail_count > AERON_CLIENT_COMMAND_RB_FAIL_THRESHOLD)
{
const char *err_buffer = "reject_image command could not be sent";
conductor->error_handler(conductor->error_handler_clientd, AERON_CLIENT_ERROR_BUFFER_FULL, err_buffer);
AERON_SET_ERR(AERON_CLIENT_ERROR_BUFFER_FULL, "%s", err_buffer);
return -1;
}

sched_yield();
}

uint8_t *ptr = (conductor->to_driver_buffer.buffer + offset);
aeron_reject_image_command_t *command = (aeron_reject_image_command_t *)ptr;
command->image_correlation_id = image_correlation_id;
command->position = position;
command->reason_length = reason_length;
memcpy(ptr + offsetof(aeron_reject_image_command_t, reason_text), reason, reason_length);
command->reason_text[reason_length] = '\0';

aeron_mpsc_rb_commit(&conductor->to_driver_buffer, offset);

return 0;
}

extern int aeron_counter_heartbeat_timestamp_find_counter_id_by_registration_id(
aeron_counters_reader_t *counters_reader, int32_t type_id, int64_t registration_id);
extern bool aeron_counter_heartbeat_timestamp_is_active(
Expand Down
11 changes: 11 additions & 0 deletions aeron-client/src/main/c/aeron_client_conductor.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ typedef struct aeron_client_conductor_stct
aeron_error_handler_t error_handler;
void *error_handler_clientd;

aeron_error_frame_handler_t error_frame_handler;
void *error_frame_handler_clientd;

aeron_on_new_publication_t on_new_publication;
void *on_new_publication_clientd;

Expand Down Expand Up @@ -357,6 +360,7 @@ int aeron_client_conductor_on_counter_ready(aeron_client_conductor_t *conductor,
int aeron_client_conductor_on_unavailable_counter(
aeron_client_conductor_t *conductor, aeron_counter_update_t *response);
int aeron_client_conductor_on_client_timeout(aeron_client_conductor_t *conductor, aeron_client_timeout_t *response);
int aeron_client_conductor_on_error_frame(aeron_client_conductor_t *conductor, aeron_publication_error_t *response);

int aeron_client_conductor_get_or_create_log_buffer(
aeron_client_conductor_t *conductor,
Expand All @@ -378,6 +382,13 @@ int aeron_client_conductor_offer_destination_command(
const char *uri,
int64_t *correlation_id);

int aeron_client_conductor_reject_image(
aeron_client_conductor_t *conductor,
int64_t image_correlation_id,
int64_t position,
const char *reason,
int32_t command_type);

inline int aeron_counter_heartbeat_timestamp_find_counter_id_by_registration_id(
aeron_counters_reader_t *counters_reader, int32_t type_id, int64_t registration_id)
{
Expand Down
25 changes: 25 additions & 0 deletions aeron-client/src/main/c/aeron_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ void aeron_default_error_handler(void *clientd, int errcode, const char *message
exit(EXIT_FAILURE);
}

void aeron_default_error_frame_handler(void *clientd, aeron_publication_error_values_t *message)
{
}

int aeron_context_init(aeron_context_t **context)
{
aeron_context_t *_context = NULL;
Expand Down Expand Up @@ -76,6 +80,7 @@ int aeron_context_init(aeron_context_t **context)
_context->client_name = NULL;
_context->error_handler = aeron_default_error_handler;
_context->error_handler_clientd = NULL;
_context->error_frame_handler = aeron_default_error_frame_handler;
_context->on_new_publication = NULL;
_context->on_new_publication_clientd = NULL;
_context->on_new_exclusive_publication = NULL;
Expand Down Expand Up @@ -309,6 +314,26 @@ void *aeron_context_get_error_handler_clientd(aeron_context_t *context)
return NULL != context ? context->error_handler_clientd : NULL;
}

int aeron_context_set_error_frame_handler(aeron_context_t *context, aeron_error_frame_handler_t handler, void *clientd)
{
AERON_CONTEXT_SET_CHECK_ARG_AND_RETURN(-1, context);

context->error_frame_handler = handler;
context->error_frame_handler_clientd = clientd;
return 0;
}

aeron_error_frame_handler_t aeron_context_get_error_frame_handler(aeron_context_t *context)
{
return NULL != context ? context->error_frame_handler : NULL;
}

void *aeron_context_get_error_frame_handler_clientd(aeron_context_t *context)
{
return NULL != context ? context->error_frame_handler_clientd : NULL;
}


int aeron_context_set_on_new_publication(aeron_context_t *context, aeron_on_new_publication_t handler, void *clientd)
{
AERON_CONTEXT_SET_CHECK_ARG_AND_RETURN(-1, context);
Expand Down
3 changes: 3 additions & 0 deletions aeron-client/src/main/c/aeron_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ typedef struct aeron_context_stct
aeron_on_unavailable_counter_t on_unavailable_counter;
void *on_unavailable_counter_clientd;

aeron_error_frame_handler_t error_frame_handler;
void *error_frame_handler_clientd;

aeron_agent_on_start_func_t agent_on_start_func;
void *agent_on_start_state;

Expand Down
20 changes: 20 additions & 0 deletions aeron-client/src/main/c/aeron_image.c
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,26 @@ bool aeron_image_is_closed(aeron_image_t *image)
return is_closed;
}

int aeron_image_reject(aeron_image_t *image, const char *reason)
{
if (NULL == image)
{
AERON_SET_ERR(EINVAL, "%s", "image is null");
return -1;
}

int64_t position = 0;
AERON_GET_VOLATILE(position, *image->subscriber_position);

if (aeron_subscription_reject_image(image->subscription, image->key.correlation_id, position, reason) < 0)
{
AERON_APPEND_ERR("%s", "");
return -1;
}

return 0;
}

extern int64_t aeron_image_removal_change_number(aeron_image_t *image);

extern bool aeron_image_is_in_use_by_subscription(aeron_image_t *image, int64_t last_change_number);
Expand Down
13 changes: 13 additions & 0 deletions aeron-client/src/main/c/aeron_subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,19 @@ int aeron_subscription_try_resolve_channel_endpoint_port(
return result;
}

int aeron_subscription_reject_image(
aeron_subscription_t *subscription, int64_t image_correlation_id, int64_t position, const char *reason)
{
if (aeron_client_conductor_reject_image(
subscription->conductor, image_correlation_id, position, reason, AERON_COMMAND_REJECT_IMAGE) < 0)
{
AERON_APPEND_ERR("%s", "");
return -1;
}

return 0;
}

extern int aeron_subscription_find_image_index(aeron_image_list_t *volatile image_list, aeron_image_t *image);
extern int64_t aeron_subscription_last_image_list_change_number(aeron_subscription_t *subscription);
extern void aeron_subscription_propose_last_image_change_number(
Expand Down
3 changes: 3 additions & 0 deletions aeron-client/src/main/c/aeron_subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,7 @@ inline void aeron_subscription_propose_last_image_change_number(
}
}

int aeron_subscription_reject_image(
aeron_subscription_t *subscription, int64_t image_correlation_id, int64_t position, const char *reason);

#endif //AERON_C_SUBSCRIPTION_H
43 changes: 25 additions & 18 deletions aeron-client/src/main/c/aeronc.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,22 @@ typedef struct aeron_header_values_stct
size_t position_bits_to_shift;
}
aeron_header_values_t;

struct aeron_publication_error_values_stct
{
int64_t registration_id;
int32_t session_id;
int32_t stream_id;
int64_t receiver_id;
int64_t group_tag;
int16_t address_type;
uint16_t address_port;
uint8_t address[16];
int32_t error_code;
int32_t error_message_length;
uint8_t error_message[1];
};
typedef struct aeron_publication_error_values_stct aeron_publication_error_values_t;
#pragma pack(pop)

typedef struct aeron_subscription_stct aeron_subscription_t;
Expand All @@ -84,23 +100,6 @@ typedef struct aeron_image_controlled_fragment_assembler_stct aeron_image_contro
typedef struct aeron_fragment_assembler_stct aeron_fragment_assembler_t;
typedef struct aeron_controlled_fragment_assembler_stct aeron_controlled_fragment_assembler_t;

struct aeron_publication_error_frame_stct
{
int64_t registration_id;
int64_t receiver_id;
int32_t session_id;
int32_t stream_id;
struct
{
bool is_present;
int64_t value;
}
group_tag;
// TODO: How best to handle the source address? String? Do we want to have a dependency on struct sockaddr_storage
size_t message_len;
const char *message;
};
typedef struct aeron_publication_error_frame_stct aeron_publication_error_frame_t;

/**
* Environment variables and functions used for setting values of an aeron_context_t.
Expand Down Expand Up @@ -150,7 +149,7 @@ typedef void (*aeron_error_handler_t)(void *clientd, int errcode, const char *me
/**
* The error frame handler to be called when the driver notifies the client about an error frame being received
*/
typedef void (*aeron_error_frame_handler_t)(void *clientd);
typedef void (*aeron_error_frame_handler_t)(void *clientd, aeron_publication_error_values_t *error_frame);

/**
* Generalised notification callback.
Expand Down Expand Up @@ -2023,6 +2022,14 @@ int aeron_image_block_poll(

bool aeron_image_is_closed(aeron_image_t *image);

/**
* Force the driver to disconnect this image from the remote publication.
*
* @param image to be rejected.
* @param reason an error message to be forwarded back to the publication.
*/
int aeron_image_reject(aeron_image_t *image, const char *reason);

/**
* A fragment handler that sits in a chain-of-responsibility pattern that reassembles fragmented messages
* so that the next handler in the chain only sees whole messages.
Expand Down
6 changes: 3 additions & 3 deletions aeron-client/src/main/c/command/aeron_control_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#define AERON_COMMAND_ADD_RCV_DESTINATION (0x0C)
#define AERON_COMMAND_REMOVE_RCV_DESTINATION (0x0D)
#define AERON_COMMAND_TERMINATE_DRIVER (0x0E)
#define AERON_COMMAND_INVALIDATE_IMAGE (0x0F)
#define AERON_COMMAND_REJECT_IMAGE (0x0F)

#define AERON_RESPONSE_ON_ERROR (0x0F01)
#define AERON_RESPONSE_ON_AVAILABLE_IMAGE (0x0F02)
Expand Down Expand Up @@ -186,15 +186,15 @@ typedef struct aeron_terminate_driver_command_stct
}
aeron_terminate_driver_command_t;

typedef struct aeron_invalidate_image_command_stct
typedef struct aeron_reject_image_command_stct
{
aeron_correlated_command_t correlated;
int64_t image_correlation_id;
int64_t position;
int32_t reason_length;
uint8_t reason_text[1];
}
aeron_invalidate_image_command_t;
aeron_reject_image_command_t;

struct aeron_publication_error_stct
{
Expand Down
Loading

0 comments on commit a964ffd

Please sign in to comment.