Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C] add a call to init the new fields in the logbuffer metadata #1717

Merged
merged 3 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions aeron-client/src/main/c/concurrent/aeron_logbuffer_descriptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -266,5 +266,39 @@ extern bool aeron_logbuffer_rotate_log(
aeron_logbuffer_metadata_t *log_meta_data, int32_t current_term_count, int32_t current_term_id);
extern void aeron_logbuffer_fill_default_header(
uint8_t *log_meta_data_buffer, int32_t session_id, int32_t stream_id, int32_t initial_term_id);
extern void aeron_logbuffer_metadata_init(
uint8_t *log_meta_data_buffer,
int64_t end_of_stream_position,
int32_t is_connected,
int32_t active_transport_count,
int64_t correlation_id,
int32_t initial_term_id,
int32_t mtu_length,
int32_t term_length,
int32_t page_size,
int32_t publication_window_length,
int32_t receiver_window_length,
int32_t socket_sndbuf_length,
int32_t os_default_socket_sndbuf_length,
int32_t os_max_socket_sndbuf_length,
int32_t socket_rcvbuf_length,
int32_t os_default_socket_rcvbuf_length,
int32_t os_max_socket_rcvbuf_length,
int32_t max_resend,
int32_t session_id,
int32_t stream_id,
int64_t entity_tag,
int64_t response_correlation_id,
int64_t linger_timeout_ns,
int64_t untethered_window_limit_timeout_ns,
int64_t untethered_resting_timeout_ns,
uint8_t group,
uint8_t is_response,
uint8_t rejoin,
uint8_t reliable,
uint8_t sparse,
uint8_t signal_eos,
uint8_t spies_simulate_connection,
uint8_t tether);
extern void aeron_logbuffer_apply_default_header(uint8_t *log_meta_data_buffer, uint8_t *buffer);
extern size_t aeron_logbuffer_compute_fragmented_length(size_t length, size_t max_payload_length);
79 changes: 79 additions & 0 deletions aeron-client/src/main/c/concurrent/aeron_logbuffer_descriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,85 @@ inline void aeron_logbuffer_fill_default_header(
data_header->reserved_value = AERON_DATA_HEADER_DEFAULT_RESERVED_VALUE;
}

/*
* Does NOT initialize the following fields:
* - term_tail_counters
* - active_term_count
*/
inline void aeron_logbuffer_metadata_init(
uint8_t *log_meta_data_buffer,
int64_t end_of_stream_position,
int32_t is_connected,
int32_t active_transport_count,
int64_t correlation_id,
int32_t initial_term_id,
int32_t mtu_length,
int32_t term_length,
int32_t page_size,
int32_t publication_window_length,
int32_t receiver_window_length,
int32_t socket_sndbuf_length,
int32_t os_default_socket_sndbuf_length,
int32_t os_max_socket_sndbuf_length,
int32_t socket_rcvbuf_length,
int32_t os_default_socket_rcvbuf_length,
int32_t os_max_socket_rcvbuf_length,
int32_t max_resend,
int32_t session_id,
int32_t stream_id,
int64_t entity_tag,
int64_t response_correlation_id,
int64_t linger_timeout_ns,
int64_t untethered_window_limit_timeout_ns,
int64_t untethered_resting_timeout_ns,
uint8_t group,
uint8_t is_response,
uint8_t rejoin,
uint8_t reliable,
uint8_t sparse,
uint8_t signal_eos,
uint8_t spies_simulate_connection,
uint8_t tether)
{
aeron_logbuffer_metadata_t *log_meta_data = (aeron_logbuffer_metadata_t *)log_meta_data_buffer;

log_meta_data->end_of_stream_position = end_of_stream_position;
log_meta_data->is_connected = is_connected;
log_meta_data->active_transport_count = active_transport_count;

log_meta_data->correlation_id = correlation_id;
log_meta_data->initial_term_id = initial_term_id;
log_meta_data->mtu_length = mtu_length;
log_meta_data->term_length = term_length;
log_meta_data->page_size = page_size;

log_meta_data->publication_window_length = publication_window_length;
log_meta_data->receiver_window_length = receiver_window_length;
log_meta_data->socket_sndbuf_length = socket_sndbuf_length;
log_meta_data->os_default_socket_sndbuf_length = os_default_socket_sndbuf_length;
log_meta_data->os_max_socket_sndbuf_length = os_max_socket_sndbuf_length;
log_meta_data->socket_rcvbuf_length = socket_rcvbuf_length;
log_meta_data->os_default_socket_rcvbuf_length = os_default_socket_rcvbuf_length;
log_meta_data->os_max_socket_rcvbuf_length = os_max_socket_rcvbuf_length;
log_meta_data->max_resend = max_resend;

aeron_logbuffer_fill_default_header(log_meta_data_buffer, session_id, stream_id, initial_term_id);

log_meta_data->entity_tag = entity_tag;
log_meta_data->response_correlation_id = response_correlation_id;
log_meta_data->linger_timeout_ns = linger_timeout_ns;
log_meta_data->untethered_window_limit_timeout_ns = untethered_window_limit_timeout_ns;
log_meta_data->untethered_resting_timeout_ns = untethered_resting_timeout_ns;
log_meta_data->group = group;
log_meta_data->is_response = is_response;
log_meta_data->rejoin = rejoin;
log_meta_data->reliable = reliable;
log_meta_data->sparse = sparse;
log_meta_data->signal_eos = signal_eos;
log_meta_data->spies_simulate_connection = spies_simulate_connection;
log_meta_data->tether = tether;
}

inline void aeron_logbuffer_apply_default_header(uint8_t *log_meta_data_buffer, uint8_t *buffer)
{
aeron_logbuffer_metadata_t *log_meta_data = (aeron_logbuffer_metadata_t *)log_meta_data_buffer;
Expand Down
2 changes: 1 addition & 1 deletion aeron-driver/src/main/c/aeron_driver_conductor.c
Original file line number Diff line number Diff line change
Expand Up @@ -6009,7 +6009,7 @@ void aeron_driver_conductor_on_create_publication_image(void *clientd, void *ite
&image,
endpoint,
destination,
conductor->context,
conductor,
registration_id,
command->session_id,
command->stream_id,
Expand Down
47 changes: 35 additions & 12 deletions aeron-driver/src/main/c/aeron_ipc_publication.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,40 @@ int aeron_ipc_publication_create(

int64_t now_ns = aeron_clock_cached_nano_time(context->cached_clock);

_pub->log_meta_data->initial_term_id = initial_term_id;
_pub->log_meta_data->mtu_length = (int32_t)params->mtu_length;
_pub->log_meta_data->term_length = (int32_t)params->term_length;
_pub->log_meta_data->page_size = (int32_t)context->file_page_size;
_pub->log_meta_data->correlation_id = registration_id;
_pub->log_meta_data->is_connected = 0;
_pub->log_meta_data->active_transport_count = 0;
_pub->log_meta_data->end_of_stream_position = INT64_MAX;
aeron_logbuffer_fill_default_header(
_pub->mapped_raw_log.log_meta_data.addr, session_id, stream_id, initial_term_id);
aeron_logbuffer_metadata_init(
_pub->mapped_raw_log.log_meta_data.addr,
INT64_MAX,
0,
0,
registration_id,
initial_term_id,
(int32_t)params->mtu_length,
(int32_t)params->term_length,
(int32_t)context->file_page_size,
(int32_t)params->publication_window_length,
0,
0,
(int32_t)context->os_buffer_lengths.default_so_sndbuf,
(int32_t)context->os_buffer_lengths.max_so_sndbuf,
0,
(int32_t)context->os_buffer_lengths.default_so_rcvbuf,
(int32_t)context->os_buffer_lengths.max_so_rcvbuf,
(int32_t)params->max_resend,
session_id,
stream_id,
(int64_t)params->entity_tag,
(int64_t)params->response_correlation_id,
(int64_t)params->linger_timeout_ns,
(int64_t)params->untethered_window_limit_timeout_ns,
(int64_t)params->untethered_resting_timeout_ns,
(uint8_t)false,
(uint8_t)params->is_response,
(uint8_t)false,
(uint8_t)false,
(uint8_t)params->is_sparse,
(uint8_t)params->signal_eos,
(uint8_t)params->spies_simulate_connection,
(uint8_t)false);

_pub->conductor_fields.subscribable.correlation_id = registration_id;
_pub->conductor_fields.subscribable.array = NULL;
Expand Down Expand Up @@ -174,8 +198,7 @@ int aeron_ipc_publication_create(
_pub->starting_term_id = params->has_position ? params->term_id : initial_term_id;
_pub->starting_term_offset = params->has_position ? params->term_offset : 0;
_pub->position_bits_to_shift = (size_t)aeron_number_of_trailing_zeroes((int32_t)params->term_length);
_pub->term_window_length = (int64_t)aeron_producer_window_length(
context->ipc_publication_window_length, params->term_length);
_pub->term_window_length = params->publication_window_length;
_pub->trip_gain = _pub->term_window_length / 8;
_pub->unblock_timeout_ns = (int64_t)context->publication_unblock_timeout_ns;
_pub->untethered_window_limit_timeout_ns = (int64_t)params->untethered_window_limit_timeout_ns;
Expand Down
51 changes: 38 additions & 13 deletions aeron-driver/src/main/c/aeron_network_publication.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,14 @@ int aeron_network_publication_create(
int64_t *retransmit_overflow_counter = aeron_system_counter_addr(
system_counters, AERON_SYSTEM_COUNTER_RETRANSMIT_OVERFLOW);

bool has_group_semantics = aeron_udp_channel_has_group_semantics(endpoint->conductor_fields.udp_channel);

if (aeron_retransmit_handler_init(
&_pub->retransmit_handler,
aeron_system_counter_addr(system_counters, AERON_SYSTEM_COUNTER_INVALID_PACKETS),
context->retransmit_unicast_delay_ns,
context->retransmit_unicast_linger_ns,
aeron_udp_channel_has_group_semantics(endpoint->conductor_fields.udp_channel),
has_group_semantics,
params->has_max_resend ? params->max_resend : context->max_resend,
retransmit_overflow_counter) < 0)
{
Expand Down Expand Up @@ -226,16 +228,40 @@ int aeron_network_publication_create(
// Called from conductor thread...
int64_t now_ns = aeron_clock_cached_nano_time(context->cached_clock);

_pub->log_meta_data->initial_term_id = initial_term_id;
_pub->log_meta_data->mtu_length = (int32_t)params->mtu_length;
_pub->log_meta_data->term_length = (int32_t)params->term_length;
_pub->log_meta_data->page_size = (int32_t)context->file_page_size;
_pub->log_meta_data->correlation_id = registration_id;
_pub->log_meta_data->is_connected = 0;
_pub->log_meta_data->active_transport_count = 0;
_pub->log_meta_data->end_of_stream_position = INT64_MAX;
aeron_logbuffer_fill_default_header(
_pub->mapped_raw_log.log_meta_data.addr, session_id, stream_id, initial_term_id);
aeron_logbuffer_metadata_init(
_pub->mapped_raw_log.log_meta_data.addr,
INT64_MAX,
0,
0,
registration_id,
initial_term_id,
(int32_t)params->mtu_length,
(int32_t)params->term_length,
(int32_t)context->file_page_size,
(int32_t)params->publication_window_length,
0,
(int32_t)endpoint->conductor_fields.udp_channel->socket_sndbuf_length,
(int32_t)context->os_buffer_lengths.default_so_sndbuf,
(int32_t)context->os_buffer_lengths.max_so_sndbuf,
(int32_t)endpoint->conductor_fields.udp_channel->socket_rcvbuf_length,
(int32_t)context->os_buffer_lengths.default_so_rcvbuf,
(int32_t)context->os_buffer_lengths.max_so_rcvbuf,
(int32_t)params->max_resend,
session_id,
stream_id,
(int64_t)params->entity_tag,
(int64_t)params->response_correlation_id,
(int64_t)params->linger_timeout_ns,
(int64_t)params->untethered_window_limit_timeout_ns,
(int64_t)params->untethered_resting_timeout_ns,
(uint8_t)has_group_semantics,
(uint8_t)params->is_response,
(uint8_t)false,
(uint8_t)false,
(uint8_t)params->is_sparse,
(uint8_t)params->signal_eos,
(uint8_t)params->spies_simulate_connection,
(uint8_t)false);

_pub->endpoint = endpoint;
_pub->flow_control = flow_control_strategy;
Expand Down Expand Up @@ -279,8 +305,7 @@ int aeron_network_publication_create(
_pub->mtu_length = params->mtu_length;
_pub->max_messages_per_send = context->network_publication_max_messages_per_send;
_pub->current_messages_per_send = _pub->max_messages_per_send;
_pub->term_window_length = (int64_t)aeron_producer_window_length(
context->publication_window_length, params->term_length);
_pub->term_window_length = params->publication_window_length;
_pub->linger_timeout_ns = (int64_t)params->linger_timeout_ns;
_pub->untethered_window_limit_timeout_ns = (int64_t)params->untethered_window_limit_timeout_ns;
_pub->untethered_resting_timeout_ns = (int64_t)params->untethered_resting_timeout_ns;
Expand Down
55 changes: 44 additions & 11 deletions aeron-driver/src/main/c/aeron_publication_image.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ int aeron_publication_image_create(
aeron_publication_image_t **image,
aeron_receive_channel_endpoint_t *endpoint,
aeron_receive_destination_t *destination,
aeron_driver_context_t *context,
aeron_driver_conductor_t *conductor,
int64_t correlation_id,
int32_t session_id,
int32_t stream_id,
Expand All @@ -131,6 +131,8 @@ int aeron_publication_image_create(
bool treat_as_multicast,
aeron_system_counters_t *system_counters)
{
aeron_driver_context_t *context = conductor->context;

aeron_publication_image_t *_image = NULL;
const uint64_t log_length = aeron_logbuffer_compute_log_length(
(uint64_t)term_buffer_length, context->file_page_size);
Expand Down Expand Up @@ -216,16 +218,47 @@ int aeron_publication_image_create(
_image->log_file_name_length = (size_t)path_length;
_image->log_meta_data = (aeron_logbuffer_metadata_t *)(_image->mapped_raw_log.log_meta_data.addr);

_image->log_meta_data->initial_term_id = initial_term_id;
_image->log_meta_data->mtu_length = sender_mtu_length;
_image->log_meta_data->term_length = term_buffer_length;
_image->log_meta_data->page_size = (int32_t)context->file_page_size;
_image->log_meta_data->correlation_id = correlation_id;
_image->log_meta_data->is_connected = 0;
_image->log_meta_data->active_transport_count = 0;
_image->log_meta_data->end_of_stream_position = INT64_MAX;
aeron_logbuffer_fill_default_header(
_image->mapped_raw_log.log_meta_data.addr, session_id, stream_id, initial_term_id);
aeron_driver_uri_subscription_params_t params;
if (aeron_driver_uri_subscription_params(&endpoint->conductor_fields.udp_channel->uri, &params, conductor) < 0)
{
AERON_APPEND_ERR("%s", "");
goto error;
}

aeron_logbuffer_metadata_init(
_image->mapped_raw_log.log_meta_data.addr,
INT64_MAX,
0,
0,
correlation_id,
initial_term_id,
sender_mtu_length,
term_buffer_length,
(int32_t)context->file_page_size,
0,
(int32_t)params.initial_window_length,
(int32_t)endpoint->conductor_fields.udp_channel->socket_sndbuf_length,
(int32_t)context->os_buffer_lengths.default_so_sndbuf,
(int32_t)context->os_buffer_lengths.max_so_sndbuf,
(int32_t)endpoint->conductor_fields.udp_channel->socket_rcvbuf_length,
(int32_t)context->os_buffer_lengths.default_so_rcvbuf,
(int32_t)context->os_buffer_lengths.max_so_rcvbuf,
0,
session_id,
stream_id,
0,
0,
0,
0,
0,
(uint8_t)treat_as_multicast,
(uint8_t)params.is_response,
(uint8_t)params.is_rejoin,
(uint8_t)params.is_reliable,
(uint8_t)params.is_sparse,
(uint8_t)false,
(uint8_t)false,
(uint8_t)params.is_tether);

_image->endpoint = endpoint;
_image->conductor_fields.endpoint = endpoint;
Expand Down
2 changes: 1 addition & 1 deletion aeron-driver/src/main/c/aeron_publication_image.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ int aeron_publication_image_create(
aeron_publication_image_t **image,
aeron_receive_channel_endpoint_t *endpoint,
aeron_receive_destination_t *destination,
aeron_driver_context_t *context,
aeron_driver_conductor_t *conductor,
int64_t correlation_id,
int32_t session_id,
int32_t stream_id,
Expand Down
Loading
Loading