Skip to content

Commit

Permalink
Error Frames and User Image Invalidation (#1604)
Browse files Browse the repository at this point in the history
* [Java] Use private method to update state in PublicationImage.

* [Java] Add basic test and command to invalidate and image from user code. Prevent new data being inserted into an invalidated image. Add logging.

* [Java] Set image invalidation test as a slow test. Early exit of image is found during invalidation on the receiver.

* [Java] Start adding ErrorFlyweight.

* [Java] Pass invalidation reason through to the source publication and have it disconnect.  Allow receiver side image to linger to prevent immediately recreation of the image.  Treat errors received by the NetworkPublication to behaviour like end of stream messages.

* [C] Start adding invalidate image information.

* [C] Invalidate image on the receiver.

* [Java/C] Fix CodeQL errors.

* [C] Add support for send/receiving error frames and update flow control and liveness.

* [C] Tighten stack allocation of buffer to work better with MSVC.  Don't send max frame size each time.

* [Java] New system counter to track number of error messages received.  Check that we don't spam out error messages too quickly.  Reset timestamp of last SM sent by the publication image when sending errors.

* [C] New system counter to track number of error messages received.  Check that we don't spam out error messages too quickly.  Reset timestamp of last SM sent by the publication image when sending errors.

* [Java] Add additional tests for EOS and errors for MDC publications.

* [Java] Limit the length of the invalidation reason.

* [C] Limit reason length for image invalidation.

* [Java] Allow test to run on C media driver.

* [C/Java] Rename Error Messages to Error Frames.

* [Java] Log errors when error frames are received.

* [C] Log errors when error frames are received.

* [Java] Enable test for C driver.

* [Java] Checkstyle.

* [C] Use C++ compliant flexible array member.

* [C] Handle flexible array members in MSVC.

* [C] Use flexible array member correctly in more places for reason text on invalidate image.

* [Java] First pass at surfacing error frames to the client.

* [C] Send publication error frames to the client.

* [Java] Enable test for C driver.

* [Java] Only report publication error frame to client that has the publication registered.

* [Java] Start encapsulating all fields relating to an error frame in an object to be reported to the caller.

* [Java] Add additional parameters including source address of the error frame.

* [Java] Disable test for IPv6.

* [C] Add additional parameters to user visible error frame.

* [C] Use ntohs instead of betoh16 (fix Mac).

* [Java] Allow test to run always if the parameter is an IPv4 address.

* [Java] Naming and consistency updates.

* [C] Fix compile error.

* [Java] Ensure that other clients with the same publication see the error frames.

* [Java] Use "reject" instead of "invalidate" for marking images for errors.

* [Java] Javadoc.

* [Java] Remove last utf-8 encoding for error frames.

* [Java] Add error frames sent counter.

* [C] Add errors frames sent counter.

* [C] Start adding publication error frame information.

* [C] Basic working flow for C & C++ wrapper for error frames.

* [Java] Rename to explicitly reference that the handler is dealing specifically with error frames for publications. Add test to verify that it works for exclusive publications.

* [C/C++] Start adding test for validating exclusive publications receive error frames.

* [C++ Wrapper] Comment out test.

* [C] Ensure that error frames are pass to the client for exclusive publications.

* [Java] Checkstyle.

* [C++] Compilation warning.

* [Java] Tidy test and docs.

* [C/C++] Add support for reading the fields from a PublisherErrorFrame.  Also add copy and move constructors for dealing with the limit lifecycle of callback objects.

* [C/C++] Add comments and fix header.

* [C/C++] Fix compile error on Window.

* [Java] Add test to verify rejection doesn't work for IPC.

* [C] Use correct client data for the publication error frame callback within the client conductor.

* [C/C++] Add test to validate that publication error frames are passed to all clients with the same publication and not to ones that without it.

* [Java] Update Javadoc.

* [Java] Include destination registration id in publication error frame.

* [Java] Additional validation in RejectImageTest.

* [C] Add destination registration id support for publication error frames.

* [Java] Comments and cleanup.

* [Java] Update event code for REJECT_IMAGE.

* [Java] Fix Javadoc.

* [C] Small renames, fixes and verification for publication error frames.

* [C] Add logging for REJECT_IMAGE.

* [Java] Fix CodeQL and add test to RejectImageTest.
  • Loading branch information
mikeb01 authored Sep 4, 2024
1 parent 3b14277 commit b00799a
Show file tree
Hide file tree
Showing 87 changed files with 3,979 additions and 79 deletions.
7 changes: 6 additions & 1 deletion aeron-agent/src/main/java/io/aeron/agent/CmdInterceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class CmdInterceptor
CMD_IN_REMOVE_RCV_DESTINATION,
CMD_OUT_ON_CLIENT_TIMEOUT,
CMD_IN_TERMINATE_DRIVER,
CMD_IN_REMOVE_DESTINATION_BY_ID);
CMD_IN_REMOVE_DESTINATION_BY_ID,
CMD_IN_REJECT_IMAGE);

@SuppressWarnings("methodlength")
@Advice.OnMethodEnter
Expand Down Expand Up @@ -158,6 +159,10 @@ static void logCmd(final int msgTypeId, final DirectBuffer buffer, final int ind
case REMOVE_DESTINATION_BY_ID:
LOGGER.log(CMD_IN_REMOVE_DESTINATION_BY_ID, buffer, index, length);
break;

case REJECT_IMAGE:
LOGGER.log(CMD_IN_REJECT_IMAGE, buffer, index, length);
break;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,12 @@ public enum DriverEventCode implements EventCode
/**
* Remove destination by id
*/
CMD_IN_REMOVE_DESTINATION_BY_ID(56, DriverEventDissector::dissectCommand);
CMD_IN_REMOVE_DESTINATION_BY_ID(56, DriverEventDissector::dissectCommand),

/**
* Reject image command received by the driver.
*/
CMD_IN_REJECT_IMAGE(57, DriverEventDissector::dissectCommand);

static final int EVENT_CODE_TYPE = EventCodeType.DRIVER.getTypeCode();

Expand Down
16 changes: 16 additions & 0 deletions aeron-agent/src/main/java/io/aeron/agent/DriverEventDissector.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ final class DriverEventDissector
private static final ClientTimeoutFlyweight CLIENT_TIMEOUT = new ClientTimeoutFlyweight();
private static final TerminateDriverFlyweight TERMINATE_DRIVER = new TerminateDriverFlyweight();
private static final DestinationByIdMessageFlyweight DESTINATION_BY_ID = new DestinationByIdMessageFlyweight();
private static final RejectImageFlyweight REJECT_IMAGE = new RejectImageFlyweight();

static final String CONTEXT = "DRIVER";

Expand Down Expand Up @@ -220,6 +221,11 @@ static void dissectCommand(
dissectDestinationById(builder);
break;

case CMD_IN_REJECT_IMAGE:
REJECT_IMAGE.wrap(buffer, offset + encodedLength);
dissectRejectImage(builder);
break;

default:
builder.append("COMMAND_UNKNOWN: ").append(code);
break;
Expand Down Expand Up @@ -779,4 +785,14 @@ private static void dissectDestinationById(final StringBuilder builder)
.append("resourceRegistrationId=").append(DESTINATION_BY_ID.resourceRegistrationId())
.append(" destinationRegistrationId=").append(DESTINATION_BY_ID.destinationRegistrationId());
}

private static void dissectRejectImage(final StringBuilder builder)
{
builder
.append("clientId=").append(REJECT_IMAGE.clientId())
.append(" correlationId=").append(REJECT_IMAGE.correlationId())
.append(" imageCorrelationId=").append(REJECT_IMAGE.imageCorrelationId())
.append(" position=").append(REJECT_IMAGE.position())
.append(" reason=").append(REJECT_IMAGE.reason());
}
}
168 changes: 167 additions & 1 deletion aeron-client/src/main/c/aeron_client_conductor.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ int aeron_client_conductor_init(aeron_client_conductor_t *conductor, aeron_conte
conductor->error_handler = context->error_handler;
conductor->error_handler_clientd = context->error_handler_clientd;

conductor->error_frame_handler = context->error_frame_handler;
conductor->error_frame_handler_clientd = context->error_frame_handler_clientd;

conductor->on_new_publication = context->on_new_publication;
conductor->on_new_publication_clientd = context->on_new_publication_clientd;

Expand Down Expand Up @@ -352,6 +355,19 @@ void aeron_client_conductor_on_driver_response(int32_t type_id, uint8_t *buffer,
break;
}

case AERON_RESPONSE_ON_PUBLICATION_ERROR:
{
aeron_publication_error_t *response = (aeron_publication_error_t *)buffer;

if (length < sizeof(aeron_publication_error_t))
{
goto malformed_command;
}

result = aeron_client_conductor_on_error_frame(conductor, response);
break;
}

case AERON_RESPONSE_ON_STATIC_COUNTER:
{
aeron_static_counter_response_t *response = (aeron_static_counter_response_t *)buffer;
Expand All @@ -367,7 +383,6 @@ void aeron_client_conductor_on_driver_response(int32_t type_id, uint8_t *buffer,

default:
{

AERON_CLIENT_FORMAT_BUFFER(error_message, "response=%x unknown", type_id);
conductor->error_handler(
conductor->error_handler_clientd, AERON_ERROR_CODE_UNKNOWN_COMMAND_TYPE_ID, error_message);
Expand Down Expand Up @@ -2526,6 +2541,119 @@ int aeron_client_conductor_on_operation_success(
return 0;
}

struct aeron_client_conductor_clientd_stct
{
aeron_client_conductor_t *conductor;
void *clientd;
};
typedef struct aeron_client_conductor_clientd_stct aeron_client_conductor_clientd_t;

void aeron_client_conductor_forward_error(void *clientd, int64_t key, void *value)
{
aeron_client_conductor_clientd_t *conductor_clientd = (aeron_client_conductor_clientd_t *)clientd;
aeron_client_conductor_t *conductor = conductor_clientd->conductor;
aeron_publication_error_t *response = (aeron_publication_error_t *)conductor_clientd->clientd;
aeron_client_command_base_t *resource = (aeron_client_command_base_t *)value;

const bool is_publication = AERON_CLIENT_TYPE_PUBLICATION == resource->type &&
((aeron_publication_t *)resource)->original_registration_id == response->registration_id;
const bool is_exclusive_publication = AERON_CLIENT_TYPE_EXCLUSIVE_PUBLICATION == resource->type &&
((aeron_exclusive_publication_t *)resource)->original_registration_id == response->registration_id;

if (is_publication || is_exclusive_publication)
{
conductor->error_frame_handler(
conductor->error_frame_handler_clientd, (aeron_publication_error_values_t *)response);
}
}

#ifdef _MSC_VER
#define _Static_assert static_assert
#endif

_Static_assert(
sizeof(aeron_publication_error_t) == sizeof(aeron_publication_error_values_t),
"sizeof(aeron_publication_error_t) must be equal to sizeof(aeron_publication_error_values_t)");
_Static_assert(
offsetof(aeron_publication_error_t, registration_id) == offsetof(aeron_publication_error_values_t, registration_id),
"offsetof(aeron_publication_error_t, registration_id) must match offsetof(aeron_publication_error_values_t, registration_id)");
_Static_assert(
offsetof(aeron_publication_error_t, destination_registration_id) == offsetof(aeron_publication_error_values_t, destination_registration_id),
"offsetof(aeron_publication_error_t, destination_registration_id) must match offsetof(aeron_publication_error_values_t, destination_registration_id)");
_Static_assert(
offsetof(aeron_publication_error_t, session_id) == offsetof(aeron_publication_error_values_t, session_id),
"offsetof(aeron_publication_error_t, session_id) must match offsetof(aeron_publication_error_values_t, session_id)");
_Static_assert(
offsetof(aeron_publication_error_t, stream_id) == offsetof(aeron_publication_error_values_t, stream_id),
"offsetof(aeron_publication_error_t, stream_id) must match offsetof(aeron_publication_error_values_t, stream_id)");
_Static_assert(
offsetof(aeron_publication_error_t, receiver_id) == offsetof(aeron_publication_error_values_t, receiver_id),
"offsetof(aeron_publication_error_t, receiver_id) must match offsetof(aeron_publication_error_values_t, receiver_id)");
_Static_assert(
offsetof(aeron_publication_error_t, group_tag) == offsetof(aeron_publication_error_values_t, group_tag),
"offsetof(aeron_publication_error_t, group_tag) must match offsetof(aeron_publication_error_values_t, group_tag)");
_Static_assert(
offsetof(aeron_publication_error_t, address_type) == offsetof(aeron_publication_error_values_t, address_type),
"offsetof(aeron_publication_error_t, address_type) must match offsetof(aeron_publication_error_values_t, address_type)");
_Static_assert(
offsetof(aeron_publication_error_t, source_port) == offsetof(aeron_publication_error_values_t, source_port),
"offsetof(aeron_publication_error_t, address_port) must match offsetof(aeron_publication_error_values_t, address_port)");
_Static_assert(
offsetof(aeron_publication_error_t, source_address) == offsetof(aeron_publication_error_values_t, source_address),
"offsetof(aeron_publication_error_t, source_address) must match offsetof(aeron_publication_error_values_t, source_address)");
_Static_assert(
offsetof(aeron_publication_error_t, error_code) == offsetof(aeron_publication_error_values_t, error_code),
"offsetof(aeron_publication_error_t, error_code) must match offsetof(aeron_publication_error_values_t, error_code)");
_Static_assert(
offsetof(aeron_publication_error_t, error_message_length) == offsetof(aeron_publication_error_values_t, error_message_length),
"offsetof(aeron_publication_error_t, error_message_length) must match offsetof(aeron_publication_error_values_t, error_message_length)");
_Static_assert(
offsetof(aeron_publication_error_t, error_message) == offsetof(aeron_publication_error_values_t, error_message),
"offsetof(aeron_publication_error_t, error_message) must match offsetof(aeron_publication_error_values_t, error_message)");

int aeron_client_conductor_on_error_frame(aeron_client_conductor_t *conductor, aeron_publication_error_t *response)
{
aeron_client_conductor_clientd_t clientd = {
.conductor = conductor,
.clientd = response
};

aeron_int64_to_ptr_hash_map_for_each(
&conductor->resource_by_id_map, aeron_client_conductor_forward_error, (void *)&clientd);

return 0;
}

int aeron_publication_error_values_copy(aeron_publication_error_values_t **dst, aeron_publication_error_values_t *src)
{
if (NULL == src)
{
AERON_SET_ERR(-1, "%s", "src must not be NULL");
return -1;
}

if (NULL == dst)
{
AERON_SET_ERR(-1, "%s", "dst must not be NULL");
return -1;
}

size_t error_values_size = sizeof(*src) + (size_t)src->error_message_length;
if (aeron_alloc((void **)dst, error_values_size) < 0)
{
AERON_APPEND_ERR("%s", "");
return -1;
}

memcpy((void *)*dst, (void *)src, error_values_size);
return 0;
}

void aeron_publication_error_values_delete(aeron_publication_error_values_t *to_delete)
{
aeron_free(to_delete);
}

aeron_subscription_t *aeron_client_conductor_find_subscription_by_id(
aeron_client_conductor_t *conductor, int64_t registration_id)
{
Expand Down Expand Up @@ -2859,6 +2987,44 @@ int aeron_client_conductor_offer_destination_command(
return 0;
}

int aeron_client_conductor_reject_image(
aeron_client_conductor_t *conductor,
int64_t image_correlation_id,
int64_t position,
const char *reason,
int32_t command_type)
{
size_t reason_length = strlen(reason);
const size_t command_length = sizeof(aeron_reject_image_command_t) + reason_length;

int rb_offer_fail_count = 0;
int32_t offset;
while ((offset = aeron_mpsc_rb_try_claim(&conductor->to_driver_buffer, command_type, command_length)) < 0)
{
if (++rb_offer_fail_count > AERON_CLIENT_COMMAND_RB_FAIL_THRESHOLD)
{
const char *err_buffer = "reject_image command could not be sent";
conductor->error_handler(conductor->error_handler_clientd, AERON_CLIENT_ERROR_BUFFER_FULL, err_buffer);
AERON_SET_ERR(AERON_CLIENT_ERROR_BUFFER_FULL, "%s", err_buffer);
return -1;
}

sched_yield();
}

uint8_t *ptr = (conductor->to_driver_buffer.buffer + offset);
aeron_reject_image_command_t *command = (aeron_reject_image_command_t *)ptr;
command->image_correlation_id = image_correlation_id;
command->position = position;
command->reason_length = (int32_t)reason_length;
memcpy(ptr + offsetof(aeron_reject_image_command_t, reason_text), reason, reason_length);
command->reason_text[reason_length] = '\0';

aeron_mpsc_rb_commit(&conductor->to_driver_buffer, offset);

return 0;
}

extern int aeron_counter_heartbeat_timestamp_find_counter_id_by_registration_id(
aeron_counters_reader_t *counters_reader, int32_t type_id, int64_t registration_id);
extern bool aeron_counter_heartbeat_timestamp_is_active(
Expand Down
11 changes: 11 additions & 0 deletions aeron-client/src/main/c/aeron_client_conductor.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ typedef struct aeron_client_conductor_stct
aeron_error_handler_t error_handler;
void *error_handler_clientd;

aeron_publication_error_frame_handler_t error_frame_handler;
void *error_frame_handler_clientd;

aeron_on_new_publication_t on_new_publication;
void *on_new_publication_clientd;

Expand Down Expand Up @@ -384,6 +387,7 @@ int aeron_client_conductor_on_unavailable_counter(
int aeron_client_conductor_on_static_counter(aeron_client_conductor_t *conductor, aeron_static_counter_response_t *response);

int aeron_client_conductor_on_client_timeout(aeron_client_conductor_t *conductor, aeron_client_timeout_t *response);
int aeron_client_conductor_on_error_frame(aeron_client_conductor_t *conductor, aeron_publication_error_t *response);

int aeron_client_conductor_get_or_create_log_buffer(
aeron_client_conductor_t *conductor,
Expand All @@ -405,6 +409,13 @@ int aeron_client_conductor_offer_destination_command(
const char *uri,
int64_t *correlation_id);

int aeron_client_conductor_reject_image(
aeron_client_conductor_t *conductor,
int64_t image_correlation_id,
int64_t position,
const char *reason,
int32_t command_type);

inline int aeron_counter_heartbeat_timestamp_find_counter_id_by_registration_id(
aeron_counters_reader_t *counters_reader, int32_t type_id, int64_t registration_id)
{
Expand Down
25 changes: 25 additions & 0 deletions aeron-client/src/main/c/aeron_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ void aeron_default_error_handler(void *clientd, int errcode, const char *message
exit(EXIT_FAILURE);
}

void aeron_default_error_frame_handler(void *clientd, aeron_publication_error_values_t *message)
{
}

int aeron_context_init(aeron_context_t **context)
{
aeron_context_t *_context = NULL;
Expand Down Expand Up @@ -78,6 +82,7 @@ int aeron_context_init(aeron_context_t **context)
_context->client_name = NULL;
_context->error_handler = aeron_default_error_handler;
_context->error_handler_clientd = NULL;
_context->error_frame_handler = aeron_default_error_frame_handler;
_context->on_new_publication = NULL;
_context->on_new_publication_clientd = NULL;
_context->on_new_exclusive_publication = NULL;
Expand Down Expand Up @@ -337,6 +342,26 @@ void *aeron_context_get_error_handler_clientd(aeron_context_t *context)
return NULL != context ? context->error_handler_clientd : NULL;
}

int aeron_context_set_publication_error_frame_handler(aeron_context_t *context, aeron_publication_error_frame_handler_t handler, void *clientd)
{
AERON_CONTEXT_SET_CHECK_ARG_AND_RETURN(-1, context);

context->error_frame_handler = handler;
context->error_frame_handler_clientd = clientd;
return 0;
}

aeron_publication_error_frame_handler_t aeron_context_get_publication_error_frame_handler(aeron_context_t *context)
{
return NULL != context ? context->error_frame_handler : NULL;
}

void *aeron_context_get_publication_error_frame_handler_clientd(aeron_context_t *context)
{
return NULL != context ? context->error_frame_handler_clientd : NULL;
}


int aeron_context_set_on_new_publication(aeron_context_t *context, aeron_on_new_publication_t handler, void *clientd)
{
AERON_CONTEXT_SET_CHECK_ARG_AND_RETURN(-1, context);
Expand Down
3 changes: 3 additions & 0 deletions aeron-client/src/main/c/aeron_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ typedef struct aeron_context_stct
aeron_on_unavailable_counter_t on_unavailable_counter;
void *on_unavailable_counter_clientd;

aeron_publication_error_frame_handler_t error_frame_handler;
void *error_frame_handler_clientd;

aeron_agent_on_start_func_t agent_on_start_func;
void *agent_on_start_state;

Expand Down
20 changes: 20 additions & 0 deletions aeron-client/src/main/c/aeron_image.c
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,26 @@ bool aeron_image_is_closed(aeron_image_t *image)
return is_closed;
}

int aeron_image_reject(aeron_image_t *image, const char *reason)
{
if (NULL == image)
{
AERON_SET_ERR(EINVAL, "%s", "image is null");
return -1;
}

int64_t position = 0;
AERON_GET_VOLATILE(position, *image->subscriber_position);

if (aeron_subscription_reject_image(image->subscription, image->key.correlation_id, position, reason) < 0)
{
AERON_APPEND_ERR("%s", "");
return -1;
}

return 0;
}

extern int64_t aeron_image_removal_change_number(aeron_image_t *image);

extern bool aeron_image_is_in_use_by_subscription(aeron_image_t *image, int64_t last_change_number);
Expand Down
Loading

0 comments on commit b00799a

Please sign in to comment.