Skip to content

Commit

Permalink
[C] add max_retransmission_length to C driver
Browse files Browse the repository at this point in the history
  • Loading branch information
tmontgomery committed Nov 20, 2023
1 parent af560d6 commit 0208214
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 4 deletions.
52 changes: 49 additions & 3 deletions aeron-driver/src/main/c/aeron_flow_control.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,36 @@ int64_t aeron_max_flow_control_strategy_on_sm(
return snd_lmt > window_edge ? snd_lmt : window_edge;
}

size_t aeron_max_flow_control_strategy_max_retransmission_length(
void *state,
int64_t resend_position,
size_t resend_length,
int64_t term_buffer_length,
size_t mtu_length)
{
size_t initial_window_length = aeron_driver_context_get_rcv_initial_window_length(NULL);
size_t estimated_window_length = aeron_receiver_window_length(initial_window_length, term_buffer_length);
size_t max_retransmit_length =
AERON_MAX_FLOW_CONTROL_RETRANSMIT_RECEIVER_WINDOW_MULTIPLE * estimated_window_length;

return AERON_MIN(max_retransmit_length, resend_length);
}

size_t aeron_unicast_flow_control_strategy_max_retransmission_length(
void *state,
int64_t resend_position,
size_t resend_length,
int64_t term_buffer_length,
size_t mtu_length)
{
size_t initial_window_length = aeron_driver_context_get_rcv_initial_window_length(NULL);
size_t estimated_window_length = aeron_receiver_window_length(initial_window_length, term_buffer_length);
size_t max_retransmit_length =
AERON_UNICAST_FLOW_CONTROL_RETRANSMIT_RECEIVER_WINDOW_MULTIPLE * estimated_window_length;

return AERON_MIN(max_retransmit_length, resend_length);
}

int aeron_max_flow_control_strategy_fini(aeron_flow_control_strategy_t *strategy)
{
aeron_free(strategy->state);
Expand Down Expand Up @@ -141,6 +171,7 @@ int aeron_max_multicast_flow_control_strategy_supplier(
_strategy->on_setup = aeron_max_flow_control_strategy_on_setup;
_strategy->fini = aeron_max_flow_control_strategy_fini;
_strategy->has_required_receivers = aeron_flow_control_strategy_has_required_receivers_default;
_strategy->max_retransmission_length = aeron_max_flow_control_strategy_max_retransmission_length;

*strategy = _strategy;

Expand All @@ -158,9 +189,24 @@ int aeron_unicast_flow_control_strategy_supplier(
int32_t initial_term_id,
size_t term_length)
{
return aeron_max_multicast_flow_control_strategy_supplier(
strategy, context, counters_manager, channel,
stream_id, session_id, registration_id, initial_term_id, term_length);
aeron_flow_control_strategy_t *_strategy;

if (aeron_alloc((void **)&_strategy, sizeof(aeron_flow_control_strategy_t)) < 0)
{
return -1;
}

_strategy->state = NULL; // Unicast does not require any state.
_strategy->on_idle = aeron_max_flow_control_strategy_on_idle;
_strategy->on_status_message = aeron_max_flow_control_strategy_on_sm;
_strategy->on_setup = aeron_max_flow_control_strategy_on_setup;
_strategy->fini = aeron_max_flow_control_strategy_fini;
_strategy->has_required_receivers = aeron_flow_control_strategy_has_required_receivers_default;
_strategy->max_retransmission_length = aeron_unicast_flow_control_strategy_max_retransmission_length;

*strategy = _strategy;

return 0;
}

aeron_flow_control_strategy_supplier_func_table_entry_t aeron_flow_control_strategy_supplier_table[] =
Expand Down
12 changes: 12 additions & 0 deletions aeron-driver/src/main/c/aeron_flow_control.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@

#define AERON_MIN_FLOW_CONTROL_RECEIVERS_COUNTER_NAME ("fc-receivers")

#define AERON_MAX_FLOW_CONTROL_RETRANSMIT_RECEIVER_WINDOW_MULTIPLE (4)
#define AERON_UNICAST_FLOW_CONTROL_RETRANSMIT_RECEIVER_WINDOW_MULTIPLE (16)
#define AERON_MIN_FLOW_CONTROL_RETRANSMIT_RECEIVER_WINDOW_MULTIPLE (16)

typedef int64_t (*aeron_flow_control_strategy_on_idle_func_t)(
void *state,
int64_t now_ns,
Expand All @@ -53,6 +57,13 @@ typedef int64_t (*aeron_flow_control_strategy_on_setup_func_t)(
size_t position_bits_to_shift,
int64_t snd_pos);

typedef size_t (*aeron_flow_control_strategy_max_retransmission_length_func_t)(
void *state,
int64_t resend_position,
size_t resend_length,
int64_t term_buffer_length,
size_t mtu_length);

typedef int (*aeron_flow_control_strategy_fini_func_t)(aeron_flow_control_strategy_t *strategy);

typedef bool (*aeron_flow_control_strategy_has_required_receivers_func_t)(aeron_flow_control_strategy_t *strategy);
Expand All @@ -64,6 +75,7 @@ typedef struct aeron_flow_control_strategy_stct
aeron_flow_control_strategy_on_setup_func_t on_setup;
aeron_flow_control_strategy_fini_func_t fini;
aeron_flow_control_strategy_has_required_receivers_func_t has_required_receivers;
aeron_flow_control_strategy_max_retransmission_length_func_t max_retransmission_length;
void *state;
}
aeron_flow_control_strategy_t;
Expand Down
16 changes: 16 additions & 0 deletions aeron-driver/src/main/c/aeron_min_flow_control.c
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,21 @@ int64_t aeron_tagged_flow_control_strategy_on_setup(
return snd_lmt;
}

size_t aeron_min_flow_control_strategy_max_retransmission_length(
void *state,
int64_t resend_position,
size_t resend_length,
int64_t term_buffer_length,
size_t mtu_length)
{
size_t initial_window_length = aeron_driver_context_get_rcv_initial_window_length(NULL);
size_t estimated_window_length = aeron_receiver_window_length(initial_window_length, term_buffer_length);
size_t max_retransmit_length =
AERON_MIN_FLOW_CONTROL_RETRANSMIT_RECEIVER_WINDOW_MULTIPLE * estimated_window_length;

return AERON_MIN(max_retransmit_length, resend_length);
}

int aeron_min_flow_control_strategy_fini(aeron_flow_control_strategy_t *strategy)
{
aeron_min_flow_control_strategy_state_t *strategy_state =
Expand Down Expand Up @@ -435,6 +450,7 @@ int aeron_tagged_flow_control_strategy_supplier_init(
aeron_tagged_flow_control_strategy_on_setup : aeron_min_flow_control_strategy_on_setup;
_strategy->fini = aeron_min_flow_control_strategy_fini;
_strategy->has_required_receivers = aeron_min_flow_control_strategy_has_required_receivers;
_strategy->max_retransmission_length = aeron_min_flow_control_strategy_max_retransmission_length;

aeron_min_flow_control_strategy_state_t *state = (aeron_min_flow_control_strategy_state_t *)_strategy->state;

Expand Down
7 changes: 6 additions & 1 deletion aeron-driver/src/main/c/aeron_network_publication.c
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,12 @@ int aeron_network_publication_resend(void *clientd, int32_t term_id, int32_t ter
{
size_t index = aeron_logbuffer_index_by_position(resend_position, publication->position_bits_to_shift);

size_t remaining_bytes = length;
size_t remaining_bytes = publication->flow_control->max_retransmission_length(
publication->flow_control->state,
resend_position,
length,
publication->term_buffer_length,
publication->mtu_length);
int32_t bytes_sent = 0;
int32_t offset = term_offset;

Expand Down

0 comments on commit 0208214

Please sign in to comment.