Skip to content

Commit

Permalink
[C] Add more fields to the log buffer metadata section.
Browse files Browse the repository at this point in the history
  • Loading branch information
vyazelenko committed Dec 20, 2024
1 parent 281d027 commit 306dfe8
Showing 1 changed file with 163 additions and 6 deletions.
169 changes: 163 additions & 6 deletions aeron-client/src/main/c/concurrent/aeron_logbuffer_descriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,30 +33,189 @@

#define AERON_MAX_UDP_PAYLOAD_LENGTH (65504)

#ifdef _MSC_VER
#define _Static_assert static_assert
#endif

#pragma pack(push)
#pragma pack(4)
typedef struct aeron_logbuffer_metadata_stct
{
volatile int64_t term_tail_counters[AERON_LOGBUFFER_PARTITION_COUNT];
volatile int32_t active_term_count;

uint8_t pad1[(2 * AERON_CACHE_LINE_LENGTH) - ((AERON_LOGBUFFER_PARTITION_COUNT * sizeof(int64_t)) + sizeof(int32_t))];
volatile int64_t end_of_stream_position;
volatile int32_t is_connected;
volatile int32_t active_transport_count;

uint8_t pad2[(2 * AERON_CACHE_LINE_LENGTH) - (sizeof(int64_t) + (2 * sizeof(int32_t)))];
int64_t correlation_id;
int32_t initial_term_id;
int32_t default_frame_header_length;
int32_t mtu_length;
int32_t term_length;
int32_t page_size;
uint8_t pad3[(AERON_CACHE_LINE_LENGTH) - (7 * sizeof(int32_t))];
int32_t publication_window_length;
int32_t receiver_window_length;
int32_t socket_sndbuf_length;
int32_t socket_rcvbuf_length;
int32_t max_resend;
int64_t entity_tag;
int64_t response_correlation_id;
uint8_t default_header[AERON_LOGBUFFER_DEFAULT_FRAME_HEADER_MAX_LENGTH];
int64_t linger_timeout_ns;
int64_t untethered_window_limit_timeout_ns;
int64_t untethered_resting_timeout_ns;
uint8_t group;
uint8_t is_response;
uint8_t rejoin;
uint8_t reliable;
uint8_t sparse;
uint8_t signal_eos;
uint8_t spies_simulate_connection;
uint8_t tether;
}
aeron_logbuffer_metadata_t;
#pragma pack(pop)

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, term_tail_counters) == 0,
"offsetof(aeron_logbuffer_metadata_t, term_tail_counters) is wrong");
_Static_assert(
offsetof(aeron_logbuffer_metadata_t, active_term_count) == 24,
"offsetof(aeron_logbuffer_metadata_t, active_term_count) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, end_of_stream_position) == 128,
"offsetof(aeron_logbuffer_metadata_t, end_of_stream_position) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, is_connected) == 136,
"offsetof(aeron_logbuffer_metadata_t, is_connected) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, active_transport_count) == 140,
"offsetof(aeron_logbuffer_metadata_t, active_transport_count) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, correlation_id) == 256,
"offsetof(aeron_logbuffer_metadata_t, correlation_id) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, initial_term_id) == 264,
"offsetof(aeron_logbuffer_metadata_t, initial_term_id) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, default_frame_header_length) == 268,
"offsetof(aeron_logbuffer_metadata_t, default_frame_header_length) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, mtu_length) == 272,
"offsetof(aeron_logbuffer_metadata_t, mtu_length) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, term_length) == 276,
"offsetof(aeron_logbuffer_metadata_t, term_length) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, page_size) == 280,
"offsetof(aeron_logbuffer_metadata_t, page_size) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, publication_window_length) == 284,
"offsetof(aeron_logbuffer_metadata_t, publication_window_length) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, receiver_window_length) == 288,
"offsetof(aeron_logbuffer_metadata_t, receiver_window_length) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, socket_sndbuf_length) == 292,
"offsetof(aeron_logbuffer_metadata_t, socket_sndbuf_length) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, socket_rcvbuf_length) == 296,
"offsetof(aeron_logbuffer_metadata_t, socket_rcvbuf_length) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, max_resend) == 300,
"offsetof(aeron_logbuffer_metadata_t, max_resend) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, entity_tag) == 304,
"offsetof(aeron_logbuffer_metadata_t, entity_tag) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, entity_tag) % sizeof(int64_t) == 0,
"offsetof(aeron_logbuffer_metadata_t, entity_tag) not aligned");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, response_correlation_id) == 312,
"offsetof(aeron_logbuffer_metadata_t, response_correlation_id) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, default_header) == 320,
"offsetof(aeron_logbuffer_metadata_t, default_header) is wrong");

_Static_assert(
AERON_LOGBUFFER_DEFAULT_FRAME_HEADER_MAX_LENGTH >= AERON_DATA_HEADER_LENGTH,
"AERON_LOGBUFFER_DEFAULT_FRAME_HEADER_MAX_LENGTH < AERON_DATA_HEADER_LENGTH");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, linger_timeout_ns) == 448,
"offsetof(aeron_logbuffer_metadata_t, linger_timeout_ns) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, untethered_window_limit_timeout_ns) == 456,
"offsetof(aeron_logbuffer_metadata_t, untethered_window_limit_timeout_ns) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, untethered_resting_timeout_ns) == 464,
"offsetof(aeron_logbuffer_metadata_t, untethered_resting_timeout_ns) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, group) == 472,
"offsetof(aeron_logbuffer_metadata_t, group) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, is_response) == 473,
"offsetof(aeron_logbuffer_metadata_t, is_response) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, rejoin) == 474,
"offsetof(aeron_logbuffer_metadata_t, rejoin) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, reliable) == 475,
"offsetof(aeron_logbuffer_metadata_t, reliable) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, sparse) == 476,
"offsetof(aeron_logbuffer_metadata_t, sparse) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, signal_eos) == 477,
"offsetof(aeron_logbuffer_metadata_t, signal_eos) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, spies_simulate_connection) == 478,
"offsetof(aeron_logbuffer_metadata_t, spies_simulate_connection) is wrong");

_Static_assert(
offsetof(aeron_logbuffer_metadata_t, tether) == 479,
"offsetof(aeron_logbuffer_metadata_t, tether) is wrong");

_Static_assert(
sizeof(aeron_logbuffer_metadata_t) == 480,
"sizeof(aeron_logbuffer_metadata_t) is wrong");

#define AERON_LOGBUFFER_META_DATA_LENGTH \
(AERON_ALIGN((sizeof(aeron_logbuffer_metadata_t) + AERON_LOGBUFFER_DEFAULT_FRAME_HEADER_MAX_LENGTH), AERON_PAGE_MIN_SIZE))
(AERON_ALIGN(sizeof(aeron_logbuffer_metadata_t), AERON_PAGE_MIN_SIZE))

_Static_assert(
AERON_LOGBUFFER_META_DATA_LENGTH == AERON_PAGE_MIN_SIZE,
"AERON_LOGBUFFER_META_DATA_LENGTH != AERON_PAGE_MIN_SIZE");

#define AERON_LOGBUFFER_FRAME_ALIGNMENT (32)

Expand Down Expand Up @@ -187,8 +346,7 @@ inline void aeron_logbuffer_fill_default_header(
uint8_t *log_meta_data_buffer, int32_t session_id, int32_t stream_id, int32_t initial_term_id)
{
aeron_logbuffer_metadata_t *log_meta_data = (aeron_logbuffer_metadata_t *)log_meta_data_buffer;
aeron_data_header_t *data_header =
(aeron_data_header_t *)(log_meta_data_buffer + sizeof(aeron_logbuffer_metadata_t));
aeron_data_header_t *data_header = (aeron_data_header_t *)(log_meta_data->default_header);

log_meta_data->default_frame_header_length = AERON_DATA_HEADER_LENGTH;
data_header->frame_header.frame_length = 0;
Expand All @@ -205,9 +363,8 @@ inline void aeron_logbuffer_fill_default_header(
inline void aeron_logbuffer_apply_default_header(uint8_t *log_meta_data_buffer, uint8_t *buffer)
{
aeron_logbuffer_metadata_t *log_meta_data = (aeron_logbuffer_metadata_t *)log_meta_data_buffer;
uint8_t *default_header = log_meta_data_buffer + sizeof(aeron_logbuffer_metadata_t);

memcpy(buffer, default_header, (size_t)log_meta_data->default_frame_header_length);
memcpy(buffer, log_meta_data->default_header, (size_t)log_meta_data->default_frame_header_length);
}

inline size_t aeron_logbuffer_compute_fragmented_length(size_t length, size_t max_payload_length)
Expand Down

0 comments on commit 306dfe8

Please sign in to comment.