Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error Frames and User Image Invalidation #1604

Merged
merged 67 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
e38ed04
[Java] Use private method to update state in PublicationImage.
mikeb01 May 14, 2024
3eae27d
[Java] Add basic test and command to invalidate and image from user c…
mikeb01 May 20, 2024
2b2af13
[Java] Set image invalidation test as a slow test. Early exit of imag…
mikeb01 May 24, 2024
23da70a
[Java] Start adding ErrorFlyweight.
mikeb01 May 27, 2024
0fa5893
[Java] Pass invalidation reason through to the source publication and…
mikeb01 Jun 12, 2024
6e98c4d
[C] Start adding invalidate image information.
mikeb01 May 20, 2024
0c21ecf
[C] Invalidate image on the receiver.
mikeb01 May 24, 2024
108b9fc
[Java/C] Fix CodeQL errors.
mikeb01 May 27, 2024
68a059f
[C] Add support for send/receiving error frames and update flow contr…
mikeb01 Jun 12, 2024
0c58d8e
[C] Tighten stack allocation of buffer to work better with MSVC. Don…
mikeb01 Jun 13, 2024
1eb13e6
[Java] New system counter to track number of error messages received.…
mikeb01 Jun 13, 2024
74c5e7e
[C] New system counter to track number of error messages received. C…
mikeb01 Jun 13, 2024
805924a
[Java] Add additional tests for EOS and errors for MDC publications.
mikeb01 Jun 14, 2024
87ed206
[Java] Limit the length of the invalidation reason.
mikeb01 Jun 14, 2024
cfb36cf
[C] Limit reason length for image invalidation.
mikeb01 Jun 14, 2024
0d3a237
[Java] Allow test to run on C media driver.
mikeb01 Jun 16, 2024
8a40f02
[C/Java] Rename Error Messages to Error Frames.
mikeb01 Jun 16, 2024
9f63664
[Java] Log errors when error frames are received.
mikeb01 Jun 17, 2024
03a34c5
[C] Log errors when error frames are received.
mikeb01 Jun 17, 2024
04b88ce
[Java] Enable test for C driver.
mikeb01 Jun 17, 2024
426cb5c
[Java] Checkstyle.
mikeb01 Jun 17, 2024
d6eea13
[C] Use C++ compliant flexible array member.
mikeb01 Jun 17, 2024
9ecd94c
[C] Handle flexible array members in MSVC.
mikeb01 Jun 17, 2024
f03acaa
[C] Use flexible array member correctly in more places for reason tex…
mikeb01 Jun 18, 2024
3d09285
[Java] First pass at surfacing error frames to the client.
mikeb01 Jun 19, 2024
14b7fd0
[C] Send publication error frames to the client.
mikeb01 Jun 19, 2024
8aece1d
[Java] Enable test for C driver.
mikeb01 Jun 20, 2024
b6c3863
[Java] Only report publication error frame to client that has the pub…
mikeb01 Jun 20, 2024
9aeb7c7
[Java] Start encapsulating all fields relating to an error frame in a…
mikeb01 Jun 20, 2024
999f7a9
[Java] Add additional parameters including source address of the erro…
mikeb01 Jun 24, 2024
b5269ec
[Java] Disable test for IPv6.
mikeb01 Jun 24, 2024
0d38a86
[C] Add additional parameters to user visible error frame.
mikeb01 Jun 24, 2024
0288b6a
[C] Use ntohs instead of betoh16 (fix Mac).
mikeb01 Jun 24, 2024
de21229
[Java] Allow test to run always if the parameter is an IPv4 address.
mikeb01 Jun 25, 2024
6766a50
[Java] Naming and consistency updates.
mikeb01 Jun 27, 2024
234071e
[C] Fix compile error.
mikeb01 Jun 27, 2024
f305c82
[Java] Ensure that other clients with the same publication see the er…
mikeb01 Jun 27, 2024
7e6a7d2
[Java] Use "reject" instead of "invalidate" for marking images for er…
mikeb01 Jul 8, 2024
bb665c6
[Java] Javadoc.
mikeb01 Jul 8, 2024
2ba58e9
[Java] Remove last utf-8 encoding for error frames.
mikeb01 Jul 23, 2024
ae23de5
[Java] Add error frames sent counter.
mikeb01 Jul 23, 2024
1079d47
[C] Add errors frames sent counter.
mikeb01 Jul 23, 2024
5fe6304
[C] Start adding publication error frame information.
mikeb01 Jul 24, 2024
58d5e84
[C] Basic working flow for C & C++ wrapper for error frames.
mikeb01 Aug 1, 2024
b556e44
[Java] Rename to explicitly reference that the handler is dealing spe…
mikeb01 Aug 1, 2024
861d5bb
[C/C++] Start adding test for validating exclusive publications recei…
mikeb01 Aug 1, 2024
5cdaefb
[C++ Wrapper] Comment out test.
mikeb01 Aug 1, 2024
7bca284
[C] Ensure that error frames are pass to the client for exclusive pub…
mikeb01 Aug 2, 2024
7ab586b
[Java] Checkstyle.
mikeb01 Aug 2, 2024
2714904
[C++] Compilation warning.
mikeb01 Aug 2, 2024
76f464f
[Java] Tidy test and docs.
mikeb01 Aug 4, 2024
10d449c
[C/C++] Add support for reading the fields from a PublisherErrorFrame…
mikeb01 Aug 5, 2024
7dbef69
[C/C++] Add comments and fix header.
mikeb01 Aug 5, 2024
6ce16db
[C/C++] Fix compile error on Window.
mikeb01 Aug 5, 2024
86525fc
[Java] Add test to verify rejection doesn't work for IPC.
mikeb01 Aug 6, 2024
b348404
[C] Use correct client data for the publication error frame callback …
mikeb01 Aug 7, 2024
efe1e5a
[C/C++] Add test to validate that publication error frames are passed…
mikeb01 Aug 8, 2024
ee21353
[Java] Update Javadoc.
mikeb01 Aug 28, 2024
ccdc650
[Java] Include destination registration id in publication error frame.
mikeb01 Aug 27, 2024
e608117
[Java] Additional validation in RejectImageTest.
mikeb01 Aug 29, 2024
81be217
[C] Add destination registration id support for publication error fra…
mikeb01 Aug 29, 2024
72e7a69
[Java] Comments and cleanup.
mikeb01 Aug 29, 2024
fb7641c
[Java] Update event code for REJECT_IMAGE.
mikeb01 Sep 2, 2024
d443b27
[Java] Fix Javadoc.
mikeb01 Sep 2, 2024
5c925f1
[C] Small renames, fixes and verification for publication error frames.
mikeb01 Sep 2, 2024
42bc837
[C] Add logging for REJECT_IMAGE.
mikeb01 Sep 2, 2024
a8d3c6f
[Java] Fix CodeQL and add test to RejectImageTest.
mikeb01 Sep 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion aeron-agent/src/main/java/io/aeron/agent/CmdInterceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class CmdInterceptor
CMD_IN_REMOVE_RCV_DESTINATION,
CMD_OUT_ON_CLIENT_TIMEOUT,
CMD_IN_TERMINATE_DRIVER,
CMD_IN_REMOVE_DESTINATION_BY_ID);
CMD_IN_REMOVE_DESTINATION_BY_ID,
CMD_IN_REJECT_IMAGE);
mikeb01 marked this conversation as resolved.
Show resolved Hide resolved

@SuppressWarnings("methodlength")
@Advice.OnMethodEnter
Expand Down Expand Up @@ -158,6 +159,10 @@ static void logCmd(final int msgTypeId, final DirectBuffer buffer, final int ind
case REMOVE_DESTINATION_BY_ID:
LOGGER.log(CMD_IN_REMOVE_DESTINATION_BY_ID, buffer, index, length);
break;

case REJECT_IMAGE:
LOGGER.log(CMD_IN_REJECT_IMAGE, buffer, index, length);
break;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,12 @@ public enum DriverEventCode implements EventCode
/**
* Remove destination by id
*/
CMD_IN_REMOVE_DESTINATION_BY_ID(56, DriverEventDissector::dissectCommand);
CMD_IN_REMOVE_DESTINATION_BY_ID(56, DriverEventDissector::dissectCommand),

/**
* Reject image command received by the driver.
*/
CMD_IN_REJECT_IMAGE(57, DriverEventDissector::dissectCommand);

static final int EVENT_CODE_TYPE = EventCodeType.DRIVER.getTypeCode();

Expand Down
16 changes: 16 additions & 0 deletions aeron-agent/src/main/java/io/aeron/agent/DriverEventDissector.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ final class DriverEventDissector
private static final ClientTimeoutFlyweight CLIENT_TIMEOUT = new ClientTimeoutFlyweight();
private static final TerminateDriverFlyweight TERMINATE_DRIVER = new TerminateDriverFlyweight();
private static final DestinationByIdMessageFlyweight DESTINATION_BY_ID = new DestinationByIdMessageFlyweight();
private static final RejectImageFlyweight REJECT_IMAGE = new RejectImageFlyweight();

static final String CONTEXT = "DRIVER";

Expand Down Expand Up @@ -220,6 +221,11 @@ static void dissectCommand(
dissectDestinationById(builder);
break;

case CMD_IN_REJECT_IMAGE:
REJECT_IMAGE.wrap(buffer, offset + encodedLength);
dissectRejectImage(builder);
break;

default:
builder.append("COMMAND_UNKNOWN: ").append(code);
break;
Expand Down Expand Up @@ -779,4 +785,14 @@ private static void dissectDestinationById(final StringBuilder builder)
.append("resourceRegistrationId=").append(DESTINATION_BY_ID.resourceRegistrationId())
.append(" destinationRegistrationId=").append(DESTINATION_BY_ID.destinationRegistrationId());
}

private static void dissectRejectImage(final StringBuilder builder)
{
builder
.append("clientId=").append(REJECT_IMAGE.clientId())
.append(" correlationId=").append(REJECT_IMAGE.correlationId())
.append(" imageCorrelationId=").append(REJECT_IMAGE.imageCorrelationId())
.append(" position=").append(REJECT_IMAGE.position())
.append(" reason=").append(REJECT_IMAGE.reason());
}
}
168 changes: 167 additions & 1 deletion aeron-client/src/main/c/aeron_client_conductor.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ int aeron_client_conductor_init(aeron_client_conductor_t *conductor, aeron_conte
conductor->error_handler = context->error_handler;
conductor->error_handler_clientd = context->error_handler_clientd;

conductor->error_frame_handler = context->error_frame_handler;
conductor->error_frame_handler_clientd = context->error_frame_handler_clientd;

conductor->on_new_publication = context->on_new_publication;
conductor->on_new_publication_clientd = context->on_new_publication_clientd;

Expand Down Expand Up @@ -352,6 +355,19 @@ void aeron_client_conductor_on_driver_response(int32_t type_id, uint8_t *buffer,
break;
}

case AERON_RESPONSE_ON_PUBLICATION_ERROR:
{
aeron_publication_error_t *response = (aeron_publication_error_t *)buffer;

if (length < sizeof(aeron_publication_error_t))
{
goto malformed_command;
}

result = aeron_client_conductor_on_error_frame(conductor, response);
break;
}

case AERON_RESPONSE_ON_STATIC_COUNTER:
{
aeron_static_counter_response_t *response = (aeron_static_counter_response_t *)buffer;
Expand All @@ -367,7 +383,6 @@ void aeron_client_conductor_on_driver_response(int32_t type_id, uint8_t *buffer,

default:
{

AERON_CLIENT_FORMAT_BUFFER(error_message, "response=%x unknown", type_id);
conductor->error_handler(
conductor->error_handler_clientd, AERON_ERROR_CODE_UNKNOWN_COMMAND_TYPE_ID, error_message);
Expand Down Expand Up @@ -2526,6 +2541,119 @@ int aeron_client_conductor_on_operation_success(
return 0;
}

struct aeron_client_conductor_clientd_stct
{
aeron_client_conductor_t *conductor;
void *clientd;
};
typedef struct aeron_client_conductor_clientd_stct aeron_client_conductor_clientd_t;

void aeron_client_conductor_forward_error(void *clientd, int64_t key, void *value)
{
aeron_client_conductor_clientd_t *conductor_clientd = (aeron_client_conductor_clientd_t *)clientd;
aeron_client_conductor_t *conductor = conductor_clientd->conductor;
aeron_publication_error_t *response = (aeron_publication_error_t *)conductor_clientd->clientd;
aeron_client_command_base_t *resource = (aeron_client_command_base_t *)value;

const bool is_publication = AERON_CLIENT_TYPE_PUBLICATION == resource->type &&
((aeron_publication_t *)resource)->original_registration_id == response->registration_id;
const bool is_exclusive_publication = AERON_CLIENT_TYPE_EXCLUSIVE_PUBLICATION == resource->type &&
((aeron_exclusive_publication_t *)resource)->original_registration_id == response->registration_id;

if (is_publication || is_exclusive_publication)
{
conductor->error_frame_handler(
conductor->error_frame_handler_clientd, (aeron_publication_error_values_t *)response);
}
}

#ifdef _MSC_VER
#define _Static_assert static_assert
#endif

_Static_assert(
sizeof(aeron_publication_error_t) == sizeof(aeron_publication_error_values_t),
"sizeof(aeron_publication_error_t) must be equal to sizeof(aeron_publication_error_values_t)");
_Static_assert(
offsetof(aeron_publication_error_t, registration_id) == offsetof(aeron_publication_error_values_t, registration_id),
"offsetof(aeron_publication_error_t, registration_id) must match offsetof(aeron_publication_error_values_t, registration_id)");
_Static_assert(
offsetof(aeron_publication_error_t, destination_registration_id) == offsetof(aeron_publication_error_values_t, destination_registration_id),
"offsetof(aeron_publication_error_t, destination_registration_id) must match offsetof(aeron_publication_error_values_t, destination_registration_id)");
_Static_assert(
offsetof(aeron_publication_error_t, session_id) == offsetof(aeron_publication_error_values_t, session_id),
"offsetof(aeron_publication_error_t, session_id) must match offsetof(aeron_publication_error_values_t, session_id)");
_Static_assert(
offsetof(aeron_publication_error_t, stream_id) == offsetof(aeron_publication_error_values_t, stream_id),
"offsetof(aeron_publication_error_t, stream_id) must match offsetof(aeron_publication_error_values_t, stream_id)");
_Static_assert(
offsetof(aeron_publication_error_t, receiver_id) == offsetof(aeron_publication_error_values_t, receiver_id),
"offsetof(aeron_publication_error_t, receiver_id) must match offsetof(aeron_publication_error_values_t, receiver_id)");
_Static_assert(
offsetof(aeron_publication_error_t, group_tag) == offsetof(aeron_publication_error_values_t, group_tag),
"offsetof(aeron_publication_error_t, group_tag) must match offsetof(aeron_publication_error_values_t, group_tag)");
_Static_assert(
offsetof(aeron_publication_error_t, address_type) == offsetof(aeron_publication_error_values_t, address_type),
"offsetof(aeron_publication_error_t, address_type) must match offsetof(aeron_publication_error_values_t, address_type)");
_Static_assert(
offsetof(aeron_publication_error_t, source_port) == offsetof(aeron_publication_error_values_t, source_port),
"offsetof(aeron_publication_error_t, address_port) must match offsetof(aeron_publication_error_values_t, address_port)");
_Static_assert(
offsetof(aeron_publication_error_t, source_address) == offsetof(aeron_publication_error_values_t, source_address),
"offsetof(aeron_publication_error_t, source_address) must match offsetof(aeron_publication_error_values_t, source_address)");
_Static_assert(
offsetof(aeron_publication_error_t, error_code) == offsetof(aeron_publication_error_values_t, error_code),
"offsetof(aeron_publication_error_t, error_code) must match offsetof(aeron_publication_error_values_t, error_code)");
_Static_assert(
offsetof(aeron_publication_error_t, error_message_length) == offsetof(aeron_publication_error_values_t, error_message_length),
"offsetof(aeron_publication_error_t, error_message_length) must match offsetof(aeron_publication_error_values_t, error_message_length)");
_Static_assert(
offsetof(aeron_publication_error_t, error_message) == offsetof(aeron_publication_error_values_t, error_message),
"offsetof(aeron_publication_error_t, error_message) must match offsetof(aeron_publication_error_values_t, error_message)");

int aeron_client_conductor_on_error_frame(aeron_client_conductor_t *conductor, aeron_publication_error_t *response)
{
aeron_client_conductor_clientd_t clientd = {
.conductor = conductor,
.clientd = response
};

aeron_int64_to_ptr_hash_map_for_each(
&conductor->resource_by_id_map, aeron_client_conductor_forward_error, (void *)&clientd);

return 0;
}

int aeron_publication_error_values_copy(aeron_publication_error_values_t **dst, aeron_publication_error_values_t *src)
{
if (NULL == src)
{
AERON_SET_ERR(-1, "%s", "src must not be NULL");
return -1;
}

if (NULL == dst)
{
AERON_SET_ERR(-1, "%s", "dst must not be NULL");
return -1;
}

size_t error_values_size = sizeof(*src) + (size_t)src->error_message_length;
if (aeron_alloc((void **)dst, error_values_size) < 0)
{
AERON_APPEND_ERR("%s", "");
return -1;
}

memcpy((void *)*dst, (void *)src, error_values_size);
return 0;
}

void aeron_publication_error_values_delete(aeron_publication_error_values_t *to_delete)
{
aeron_free(to_delete);
}

aeron_subscription_t *aeron_client_conductor_find_subscription_by_id(
aeron_client_conductor_t *conductor, int64_t registration_id)
{
Expand Down Expand Up @@ -2859,6 +2987,44 @@ int aeron_client_conductor_offer_destination_command(
return 0;
}

int aeron_client_conductor_reject_image(
aeron_client_conductor_t *conductor,
int64_t image_correlation_id,
int64_t position,
const char *reason,
int32_t command_type)
{
size_t reason_length = strlen(reason);
const size_t command_length = sizeof(aeron_reject_image_command_t) + reason_length;

int rb_offer_fail_count = 0;
int32_t offset;
while ((offset = aeron_mpsc_rb_try_claim(&conductor->to_driver_buffer, command_type, command_length)) < 0)
{
if (++rb_offer_fail_count > AERON_CLIENT_COMMAND_RB_FAIL_THRESHOLD)
{
const char *err_buffer = "reject_image command could not be sent";
conductor->error_handler(conductor->error_handler_clientd, AERON_CLIENT_ERROR_BUFFER_FULL, err_buffer);
AERON_SET_ERR(AERON_CLIENT_ERROR_BUFFER_FULL, "%s", err_buffer);
return -1;
}

sched_yield();
}

uint8_t *ptr = (conductor->to_driver_buffer.buffer + offset);
aeron_reject_image_command_t *command = (aeron_reject_image_command_t *)ptr;
command->image_correlation_id = image_correlation_id;
command->position = position;
command->reason_length = (int32_t)reason_length;
memcpy(ptr + offsetof(aeron_reject_image_command_t, reason_text), reason, reason_length);
command->reason_text[reason_length] = '\0';

aeron_mpsc_rb_commit(&conductor->to_driver_buffer, offset);

return 0;
}

extern int aeron_counter_heartbeat_timestamp_find_counter_id_by_registration_id(
aeron_counters_reader_t *counters_reader, int32_t type_id, int64_t registration_id);
extern bool aeron_counter_heartbeat_timestamp_is_active(
Expand Down
11 changes: 11 additions & 0 deletions aeron-client/src/main/c/aeron_client_conductor.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ typedef struct aeron_client_conductor_stct
aeron_error_handler_t error_handler;
void *error_handler_clientd;

aeron_publication_error_frame_handler_t error_frame_handler;
void *error_frame_handler_clientd;

aeron_on_new_publication_t on_new_publication;
void *on_new_publication_clientd;

Expand Down Expand Up @@ -384,6 +387,7 @@ int aeron_client_conductor_on_unavailable_counter(
int aeron_client_conductor_on_static_counter(aeron_client_conductor_t *conductor, aeron_static_counter_response_t *response);

int aeron_client_conductor_on_client_timeout(aeron_client_conductor_t *conductor, aeron_client_timeout_t *response);
int aeron_client_conductor_on_error_frame(aeron_client_conductor_t *conductor, aeron_publication_error_t *response);

int aeron_client_conductor_get_or_create_log_buffer(
aeron_client_conductor_t *conductor,
Expand All @@ -405,6 +409,13 @@ int aeron_client_conductor_offer_destination_command(
const char *uri,
int64_t *correlation_id);

int aeron_client_conductor_reject_image(
aeron_client_conductor_t *conductor,
int64_t image_correlation_id,
int64_t position,
const char *reason,
int32_t command_type);

inline int aeron_counter_heartbeat_timestamp_find_counter_id_by_registration_id(
aeron_counters_reader_t *counters_reader, int32_t type_id, int64_t registration_id)
{
Expand Down
25 changes: 25 additions & 0 deletions aeron-client/src/main/c/aeron_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ void aeron_default_error_handler(void *clientd, int errcode, const char *message
exit(EXIT_FAILURE);
}

void aeron_default_error_frame_handler(void *clientd, aeron_publication_error_values_t *message)
{
}

int aeron_context_init(aeron_context_t **context)
{
aeron_context_t *_context = NULL;
Expand Down Expand Up @@ -78,6 +82,7 @@ int aeron_context_init(aeron_context_t **context)
_context->client_name = NULL;
_context->error_handler = aeron_default_error_handler;
_context->error_handler_clientd = NULL;
_context->error_frame_handler = aeron_default_error_frame_handler;
_context->on_new_publication = NULL;
_context->on_new_publication_clientd = NULL;
_context->on_new_exclusive_publication = NULL;
Expand Down Expand Up @@ -337,6 +342,26 @@ void *aeron_context_get_error_handler_clientd(aeron_context_t *context)
return NULL != context ? context->error_handler_clientd : NULL;
}

int aeron_context_set_publication_error_frame_handler(aeron_context_t *context, aeron_publication_error_frame_handler_t handler, void *clientd)
{
AERON_CONTEXT_SET_CHECK_ARG_AND_RETURN(-1, context);

context->error_frame_handler = handler;
context->error_frame_handler_clientd = clientd;
return 0;
}

aeron_publication_error_frame_handler_t aeron_context_get_publication_error_frame_handler(aeron_context_t *context)
{
return NULL != context ? context->error_frame_handler : NULL;
}

void *aeron_context_get_publication_error_frame_handler_clientd(aeron_context_t *context)
{
return NULL != context ? context->error_frame_handler_clientd : NULL;
}


int aeron_context_set_on_new_publication(aeron_context_t *context, aeron_on_new_publication_t handler, void *clientd)
{
AERON_CONTEXT_SET_CHECK_ARG_AND_RETURN(-1, context);
Expand Down
3 changes: 3 additions & 0 deletions aeron-client/src/main/c/aeron_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ typedef struct aeron_context_stct
aeron_on_unavailable_counter_t on_unavailable_counter;
void *on_unavailable_counter_clientd;

aeron_publication_error_frame_handler_t error_frame_handler;
void *error_frame_handler_clientd;

aeron_agent_on_start_func_t agent_on_start_func;
void *agent_on_start_state;

Expand Down
20 changes: 20 additions & 0 deletions aeron-client/src/main/c/aeron_image.c
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,26 @@ bool aeron_image_is_closed(aeron_image_t *image)
return is_closed;
}

int aeron_image_reject(aeron_image_t *image, const char *reason)
{
if (NULL == image)
{
AERON_SET_ERR(EINVAL, "%s", "image is null");
return -1;
}

int64_t position = 0;
AERON_GET_VOLATILE(position, *image->subscriber_position);

if (aeron_subscription_reject_image(image->subscription, image->key.correlation_id, position, reason) < 0)
{
AERON_APPEND_ERR("%s", "");
return -1;
}

return 0;
}

extern int64_t aeron_image_removal_change_number(aeron_image_t *image);

extern bool aeron_image_is_in_use_by_subscription(aeron_image_t *image, int64_t last_change_number);
Expand Down
Loading