Skip to content

Commit

Permalink
[C] implement aeron_uri_get_publication_window_length_param
Browse files Browse the repository at this point in the history
  • Loading branch information
nbradac committed Jan 15, 2025
1 parent 32d42e8 commit cc01af8
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 5 deletions.
34 changes: 34 additions & 0 deletions aeron-client/src/main/c/concurrent/aeron_logbuffer_descriptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -266,5 +266,39 @@ extern bool aeron_logbuffer_rotate_log(
aeron_logbuffer_metadata_t *log_meta_data, int32_t current_term_count, int32_t current_term_id);
extern void aeron_logbuffer_fill_default_header(
uint8_t *log_meta_data_buffer, int32_t session_id, int32_t stream_id, int32_t initial_term_id);
extern void aeron_logbuffer_metadata_init(
uint8_t *log_meta_data_buffer,
int64_t end_of_stream_position,
int32_t is_connected,
int32_t active_transport_count,
int64_t correlation_id,
int32_t initial_term_id,
int32_t mtu_length,
int32_t term_length,
int32_t page_size,
int32_t publication_window_length,
int32_t receiver_window_length,
int32_t socket_sndbuf_length,
int32_t os_default_socket_sndbuf_length,
int32_t os_max_socket_sndbuf_length,
int32_t socket_rcvbuf_length,
int32_t os_default_socket_rcvbuf_length,
int32_t os_max_socket_rcvbuf_length,
int32_t max_resend,
int32_t session_id,
int32_t stream_id,
//int64_t entity_tag,
//int64_t response_correlation_id,
int64_t linger_timeout_ns,
int64_t untethered_window_limit_timeout_ns,
int64_t untethered_resting_timeout_ns,
//uint8_t group,
//uint8_t is_response,
uint8_t rejoin,
uint8_t reliable,
uint8_t sparse,
uint8_t signal_eos,
uint8_t spies_simulate_connection,
uint8_t tether);
extern void aeron_logbuffer_apply_default_header(uint8_t *log_meta_data_buffer, uint8_t *buffer);
extern size_t aeron_logbuffer_compute_fragmented_length(size_t length, size_t max_payload_length);
3 changes: 1 addition & 2 deletions aeron-driver/src/main/c/aeron_ipc_publication.c
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,7 @@ int aeron_ipc_publication_create(
_pub->starting_term_id = params->has_position ? params->term_id : initial_term_id;
_pub->starting_term_offset = params->has_position ? params->term_offset : 0;
_pub->position_bits_to_shift = (size_t)aeron_number_of_trailing_zeroes((int32_t)params->term_length);
_pub->term_window_length = (int64_t)aeron_producer_window_length(
context->ipc_publication_window_length, params->term_length);
_pub->term_window_length = params->publication_window_length;
_pub->trip_gain = _pub->term_window_length / 8;
_pub->unblock_timeout_ns = (int64_t)context->publication_unblock_timeout_ns;
_pub->untethered_window_limit_timeout_ns = (int64_t)params->untethered_window_limit_timeout_ns;
Expand Down
3 changes: 1 addition & 2 deletions aeron-driver/src/main/c/aeron_network_publication.c
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,7 @@ int aeron_network_publication_create(
_pub->mtu_length = params->mtu_length;
_pub->max_messages_per_send = context->network_publication_max_messages_per_send;
_pub->current_messages_per_send = _pub->max_messages_per_send;
_pub->term_window_length = (int64_t)aeron_producer_window_length(
context->publication_window_length, params->term_length);
_pub->term_window_length = params->publication_window_length;
_pub->linger_timeout_ns = (int64_t)params->linger_timeout_ns;
_pub->untethered_window_limit_timeout_ns = (int64_t)params->untethered_window_limit_timeout_ns;
_pub->untethered_resting_timeout_ns = (int64_t)params->untethered_resting_timeout_ns;
Expand Down
45 changes: 44 additions & 1 deletion aeron-driver/src/main/c/uri/aeron_driver_uri.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,45 @@ int aeron_uri_get_mtu_length_param(aeron_uri_params_t *uri_params, aeron_driver_

int aeron_uri_get_publication_window_length_param(aeron_uri_params_t *uri_params, aeron_driver_uri_publication_params_t *params)
{
params->publication_window_length = 0; // TODO
const char *value_str;

if ((value_str = aeron_uri_find_param_value(uri_params, AERON_URI_PUBLICATION_WINDOW_KEY)) != NULL)
{
uint64_t value;

if (-1 == aeron_parse_size64(value_str, &value))
{
AERON_SET_ERR(EINVAL, "could not parse %s=%s in URI", AERON_URI_PUBLICATION_WINDOW_KEY, value_str);
return -1;
}

if (value < params->mtu_length)
{
AERON_SET_ERR(
EINVAL,
"%s=" PRIu64 " cannot be less than the %s=" PRIu64,
AERON_URI_PUBLICATION_WINDOW_KEY,
value,
AERON_URI_MTU_LENGTH_KEY,
params->mtu_length);

Check notice

Code scanning / CodeQL

Too many arguments to formatting function Note

Format for aeron_err_set (in a macro expansion) expects 2 arguments but given 4
return -1;
}

if (value > (params->term_length >> 1))
{
AERON_SET_ERR(
EINVAL,
"%s=" PRIu64 " must not exceed half the %s=" PRIu64,
AERON_URI_PUBLICATION_WINDOW_KEY,
value,
AERON_URI_TERM_LENGTH_KEY,
params->term_length);

Check notice

Code scanning / CodeQL

Too many arguments to formatting function Note

Format for aeron_err_set (in a macro expansion) expects 2 arguments but given 4
return -1;
}

params->publication_window_length = (int32_t)value;
params->has_publication_window_length = true;
}

return 0;
}
Expand Down Expand Up @@ -247,6 +285,11 @@ int aeron_diver_uri_publication_params(
return -1;
}

params->publication_window_length = (int32_t)aeron_producer_window_length(
AERON_URI_IPC == uri->type ? context->ipc_publication_window_length : context->publication_window_length,
params->term_length);
params->has_publication_window_length = false;

if (aeron_uri_get_publication_window_length_param(uri_params, params) < 0)
{
return -1;
Expand Down
1 change: 1 addition & 0 deletions aeron-driver/src/main/c/uri/aeron_driver_uri.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ typedef struct aeron_driver_uri_publication_params_stct
int64_t response_correlation_id;
bool has_max_resend;
uint32_t max_resend;
bool has_publication_window_length;
int32_t publication_window_length;
}
aeron_driver_uri_publication_params_t;
Expand Down

0 comments on commit cc01af8

Please sign in to comment.