diff --git a/aeron-driver/src/main/c/aeron_flow_control.c b/aeron-driver/src/main/c/aeron_flow_control.c index f6dc5826e8..7cabe0fa66 100644 --- a/aeron-driver/src/main/c/aeron_flow_control.c +++ b/aeron-driver/src/main/c/aeron_flow_control.c @@ -110,6 +110,36 @@ int64_t aeron_max_flow_control_strategy_on_sm( return snd_lmt > window_edge ? snd_lmt : window_edge; } +size_t aeron_max_flow_control_strategy_max_retransmission_length( + void *state, + int64_t resend_position, + size_t resend_length, + int64_t term_buffer_length, + size_t mtu_length) +{ + size_t initial_window_length = aeron_driver_context_get_rcv_initial_window_length(NULL); + size_t estimated_window_length = aeron_receiver_window_length(initial_window_length, term_buffer_length); + size_t max_retransmit_length = + AERON_MAX_FLOW_CONTROL_RETRANSMIT_RECEIVER_WINDOW_MULTIPLE * estimated_window_length; + + return AERON_MIN(max_retransmit_length, resend_length); +} + +size_t aeron_unicast_flow_control_strategy_max_retransmission_length( + void *state, + int64_t resend_position, + size_t resend_length, + int64_t term_buffer_length, + size_t mtu_length) +{ + size_t initial_window_length = aeron_driver_context_get_rcv_initial_window_length(NULL); + size_t estimated_window_length = aeron_receiver_window_length(initial_window_length, term_buffer_length); + size_t max_retransmit_length = + AERON_UNICAST_FLOW_CONTROL_RETRANSMIT_RECEIVER_WINDOW_MULTIPLE * estimated_window_length; + + return AERON_MIN(max_retransmit_length, resend_length); +} + int aeron_max_flow_control_strategy_fini(aeron_flow_control_strategy_t *strategy) { aeron_free(strategy->state); @@ -141,6 +171,7 @@ int aeron_max_multicast_flow_control_strategy_supplier( _strategy->on_setup = aeron_max_flow_control_strategy_on_setup; _strategy->fini = aeron_max_flow_control_strategy_fini; _strategy->has_required_receivers = aeron_flow_control_strategy_has_required_receivers_default; + _strategy->max_retransmission_length = aeron_max_flow_control_strategy_max_retransmission_length; *strategy = _strategy; @@ -158,9 +189,24 @@ int aeron_unicast_flow_control_strategy_supplier( int32_t initial_term_id, size_t term_length) { - return aeron_max_multicast_flow_control_strategy_supplier( - strategy, context, counters_manager, channel, - stream_id, session_id, registration_id, initial_term_id, term_length); + aeron_flow_control_strategy_t *_strategy; + + if (aeron_alloc((void **)&_strategy, sizeof(aeron_flow_control_strategy_t)) < 0) + { + return -1; + } + + _strategy->state = NULL; // Unicast does not require any state. + _strategy->on_idle = aeron_max_flow_control_strategy_on_idle; + _strategy->on_status_message = aeron_max_flow_control_strategy_on_sm; + _strategy->on_setup = aeron_max_flow_control_strategy_on_setup; + _strategy->fini = aeron_max_flow_control_strategy_fini; + _strategy->has_required_receivers = aeron_flow_control_strategy_has_required_receivers_default; + _strategy->max_retransmission_length = aeron_unicast_flow_control_strategy_max_retransmission_length; + + *strategy = _strategy; + + return 0; } aeron_flow_control_strategy_supplier_func_table_entry_t aeron_flow_control_strategy_supplier_table[] = diff --git a/aeron-driver/src/main/c/aeron_flow_control.h b/aeron-driver/src/main/c/aeron_flow_control.h index 11da053d6d..dfdd6db597 100644 --- a/aeron-driver/src/main/c/aeron_flow_control.h +++ b/aeron-driver/src/main/c/aeron_flow_control.h @@ -27,6 +27,10 @@ #define AERON_MIN_FLOW_CONTROL_RECEIVERS_COUNTER_NAME ("fc-receivers") +#define AERON_MAX_FLOW_CONTROL_RETRANSMIT_RECEIVER_WINDOW_MULTIPLE (4) +#define AERON_UNICAST_FLOW_CONTROL_RETRANSMIT_RECEIVER_WINDOW_MULTIPLE (16) +#define AERON_MIN_FLOW_CONTROL_RETRANSMIT_RECEIVER_WINDOW_MULTIPLE (16) + typedef int64_t (*aeron_flow_control_strategy_on_idle_func_t)( void *state, int64_t now_ns, @@ -53,6 +57,13 @@ typedef int64_t (*aeron_flow_control_strategy_on_setup_func_t)( size_t position_bits_to_shift, int64_t snd_pos); +typedef size_t (*aeron_flow_control_strategy_max_retransmission_length_func_t)( + void *state, + int64_t resend_position, + size_t resend_length, + int64_t term_buffer_length, + size_t mtu_length); + typedef int (*aeron_flow_control_strategy_fini_func_t)(aeron_flow_control_strategy_t *strategy); typedef bool (*aeron_flow_control_strategy_has_required_receivers_func_t)(aeron_flow_control_strategy_t *strategy); @@ -64,6 +75,7 @@ typedef struct aeron_flow_control_strategy_stct aeron_flow_control_strategy_on_setup_func_t on_setup; aeron_flow_control_strategy_fini_func_t fini; aeron_flow_control_strategy_has_required_receivers_func_t has_required_receivers; + aeron_flow_control_strategy_max_retransmission_length_func_t max_retransmission_length; void *state; } aeron_flow_control_strategy_t; diff --git a/aeron-driver/src/main/c/aeron_min_flow_control.c b/aeron-driver/src/main/c/aeron_min_flow_control.c index 61ff65e8e7..b224d6dffc 100644 --- a/aeron-driver/src/main/c/aeron_min_flow_control.c +++ b/aeron-driver/src/main/c/aeron_min_flow_control.c @@ -341,6 +341,21 @@ int64_t aeron_tagged_flow_control_strategy_on_setup( return snd_lmt; } +size_t aeron_min_flow_control_strategy_max_retransmission_length( + void *state, + int64_t resend_position, + size_t resend_length, + int64_t term_buffer_length, + size_t mtu_length) +{ + size_t initial_window_length = aeron_driver_context_get_rcv_initial_window_length(NULL); + size_t estimated_window_length = aeron_receiver_window_length(initial_window_length, term_buffer_length); + size_t max_retransmit_length = + AERON_MIN_FLOW_CONTROL_RETRANSMIT_RECEIVER_WINDOW_MULTIPLE * estimated_window_length; + + return AERON_MIN(max_retransmit_length, resend_length); +} + int aeron_min_flow_control_strategy_fini(aeron_flow_control_strategy_t *strategy) { aeron_min_flow_control_strategy_state_t *strategy_state = @@ -435,6 +450,7 @@ int aeron_tagged_flow_control_strategy_supplier_init( aeron_tagged_flow_control_strategy_on_setup : aeron_min_flow_control_strategy_on_setup; _strategy->fini = aeron_min_flow_control_strategy_fini; _strategy->has_required_receivers = aeron_min_flow_control_strategy_has_required_receivers; + _strategy->max_retransmission_length = aeron_min_flow_control_strategy_max_retransmission_length; aeron_min_flow_control_strategy_state_t *state = (aeron_min_flow_control_strategy_state_t *)_strategy->state; diff --git a/aeron-driver/src/main/c/aeron_network_publication.c b/aeron-driver/src/main/c/aeron_network_publication.c index dcb6c6d337..4827a5e87e 100644 --- a/aeron-driver/src/main/c/aeron_network_publication.c +++ b/aeron-driver/src/main/c/aeron_network_publication.c @@ -547,7 +547,12 @@ int aeron_network_publication_resend(void *clientd, int32_t term_id, int32_t ter { size_t index = aeron_logbuffer_index_by_position(resend_position, publication->position_bits_to_shift); - size_t remaining_bytes = length; + size_t remaining_bytes = publication->flow_control->max_retransmission_length( + publication->flow_control->state, + resend_position, + length, + publication->term_buffer_length, + publication->mtu_length); int32_t bytes_sent = 0; int32_t offset = term_offset;