Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_kubernetes_events: fix k8s events end of stream deadlock #9154

Merged
merged 2 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion plugins/in_kubernetes_events/kubernetes_events.c
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,7 @@ static int k8s_events_collect(struct flb_input_instance *ins,
}

if (check_and_init_stream(ctx) == FLB_FALSE) {
pthread_mutex_unlock(&ctx->lock);
FLB_INPUT_RETURN(0);
}

Expand All @@ -921,15 +922,19 @@ static int k8s_events_collect(struct flb_input_instance *ins,
}
/* NOTE: skipping any processing after streaming socket closes */

if (ctx->streaming_client->resp.status != 200 || ret == FLB_HTTP_ERROR) {
if (ctx->streaming_client->resp.status != 200 || ret == FLB_HTTP_ERROR || ret == FLB_HTTP_OK) {
if (ret == FLB_HTTP_ERROR) {
flb_plg_warn(ins, "kubernetes chunked stream error.");
}
else if (ret == FLB_HTTP_OK) {
flb_plg_info(ins, "kubernetes stream closed by api server. Reconnect will happen on next interval.");
}
else {
flb_plg_warn(ins, "events watch failure, http_status=%d payload=%s",
ctx->streaming_client->resp.status, ctx->streaming_client->resp.payload);
}

flb_plg_info(ins, "kubernetes stream disconnected, ret=%d", ret);
flb_http_client_destroy(ctx->streaming_client);
flb_upstream_conn_release(ctx->current_connection);
ctx->streaming_client = NULL;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"type":"MODIFIED","object":{"kind":"Event","apiVersion":"v1","metadata":{"name":"fluent-bit-78945dccd8-hvr55.17e75f85e7d9e678","namespace":"default","uid":"b7cb03e8-0e0b-4e02-971d-24807f563d43","resourceVersion":"177158","creationTimestamp":"2024-07-31T18:26:51Z","managedFields":[{"manager":"kubelet","operation":"Update","apiVersion":"v1","time":"2024-07-31T18:47:15Z","fieldsType":"FieldsV1","fieldsV1":{"f:count":{},"f:firstTimestamp":{},"f:involvedObject":{},"f:lastTimestamp":{},"f:message":{},"f:reason":{},"f:reportingComponent":{},"f:reportingInstance":{},"f:source":{"f:component":{},"f:host":{}},"f:type":{}}}]},"involvedObject":{"kind":"Pod","namespace":"default","name":"fluent-bit-78945dccd8-hvr55","uid":"d5cd8257-e28a-4e64-8b29-6358309d7196","apiVersion":"v1","resourceVersion":"177159"},"reason":"FailedMount","message":"MountVolume.SetUp failed for volume \"config-volume\" : configmap \"fluent-bit-config\" not found","source":{"component":"kubelet","host":"minikube"},"firstTimestamp":"2024-07-31T18:26:51Z","lastTimestamp":"2024-07-31T18:47:15Z","count":16,"type":"Warning","eventTime":null,"reportingComponent":"kubelet","reportingInstance":"minikube"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[1722451635.000000,{"kind":"Event","apiVersion":"v1","metadata":{"name":"fluent-bit-78945dccd8-hvr55.17e75f85e7d9e678","namespace":"default","uid":"b7cb03e8-0e0b-4e02-971d-24807f563d43","resourceVersion":"177158","creationTimestamp":"2024-07-31T18:26:51Z","managedFields":[{"manager":"kubelet","operation":"Update","apiVersion":"v1","time":"2024-07-31T18:47:15Z","fieldsType":"FieldsV1","fieldsV1":{"f:count":{},"f:firstTimestamp":{},"f:involvedObject":{},"f:lastTimestamp":{},"f:message":{},"f:reason":{},"f:reportingComponent":{},"f:reportingInstance":{},"f:source":{"f:component":{},"f:host":{}},"f:type":{}}}]},"involvedObject":{"kind":"Pod","namespace":"default","name":"fluent-bit-78945dccd8-hvr55","uid":"d5cd8257-e28a-4e64-8b29-6358309d7196","apiVersion":"v1","resourceVersion":"177159"},"reason":"FailedMount","message":"MountVolume.SetUp failed for volume \"config-volume\" : configmap \"fluent-bit-config\" not found","source":{"component":"kubelet","host":"minikube"},"firstTimestamp":"2024-07-31T18:26:51Z","lastTimestamp":"2024-07-31T18:47:15Z","count":16,"type":"Warning","eventTime":null,"reportingComponent":"kubelet","reportingInstance":"minikube"}]
140 changes: 114 additions & 26 deletions tests/runtime/in_kubernetes_events.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ struct test_k8s_server_ctx {
int mq_id; /* Message Queue ID */
struct mk_event_loop *evl;
char json_input_file[1024];
char json_input_file_to_stream[1024];
int chunk_size; /* send messages in http chunks of this size, if 0 send all data */
};


Expand Down Expand Up @@ -112,61 +114,97 @@ static flb_sds_t read_file(const char *filename)
/* Callback to check expected results */
static int cb_check_result_json(void *record, size_t size, void *data)
{
char *p;
char *p = NULL;
flb_sds_t expected;
char *result;
int num = get_output_num();
const char *filename;
char full_filename[1024];
flb_sds_t filename = NULL;

set_output_num(num+1);

filename = (const char *) data;
result = (char *) record;
/* Note this is probably confusing, but we expected 1 record from the event list, and 1 record from the stream
* these should be output num 0 and num 1 respectively, each json file should have a corresponding .out
*/
struct test_k8s_server_ctx *k8s_server = data;
if (num == 0) {
filename = flb_sds_create_len(k8s_server->json_input_file, strlen(k8s_server->json_input_file)-5); //remove .json
} else {
filename = flb_sds_create_len(k8s_server->json_input_file_to_stream, strlen(k8s_server->json_input_file_to_stream)-5);
}
filename = flb_sds_cat(filename, ".out", 4);

sprintf(full_filename, "%s/%s.out", IN_KUBERNETES_EVENTS_DATA_PATH, filename);
expected = read_file(full_filename);
result = (char *) record;
expected = read_file(filename);

p = strstr(result, expected);
TEST_CHECK(p != NULL);

if (p == NULL) {
flb_error("Expected to find: '%s' in result '%s'",
flb_error("Expected to find: '%s' \nin result '%s'",
expected, result);
}

flb_free(record);
if (expected) {
flb_sds_destroy(expected);
}
if (filename) {
flb_sds_destroy(filename);
}
return 0;
}

static void cb_root(mk_request_t *request, void *data)
{
flb_sds_t payload;
struct test_k8s_server_ctx *server = data;
payload = read_file(server->json_input_file);

if (request->query_string.data && strstr(request->query_string.data, "watch=1") != NULL) {
// NOTE/TODO: stream via watch not currently supported, this should become 200 status
// and chunked response when we do support it
mk_http_status(request, 500);
mk_http_done(request);
mk_http_status(request, 200);
mk_http_header(request, "Content-Type", 12, JSON_CONTENT_TYPE, 16);

if(strlen(server->json_input_file_to_stream) > 0) {
payload = read_file(server->json_input_file_to_stream);
char* start = payload;
int maxSize = server->chunk_size;
int totalSent = 0;
while(totalSent < strlen(payload)) {
if(strlen(start) < server->chunk_size) {
maxSize = strlen(start);
}
mk_http_send(request, start, maxSize, NULL);

start += maxSize;
totalSent += maxSize;

flb_time_msleep(300);
}

// ensure we send an end of json delimeter so kubernetes_events plugin knows there's a json message
if(payload[strlen(payload)-1] != '\n') {
mk_http_send(request, "\n", 1, NULL);
}
flb_sds_destroy(payload);
}

mk_http_done(request); /* this will end the connection and send the end-chunk */
}
else {
payload = read_file(server->json_input_file);

/* we don't use chunk_size in the non-streamed requests, but we could */
mk_http_status(request, 200);
mk_http_header(request, "Content-Type", 12, JSON_CONTENT_TYPE, 16);
mk_http_send(request, payload, strlen(payload), NULL);
mk_http_done(request);
}

if (payload) {
flb_sds_destroy(payload);
}

}

struct test_k8s_server_ctx *initialize_mock_k8s_api(const char* filename)
struct test_k8s_server_ctx *initialize_mock_k8s_api(const char* filename,
const char* stream_filename, int chunk_size)
{
int vid;
char tmp[32];
Expand All @@ -181,6 +219,15 @@ struct test_k8s_server_ctx *initialize_mock_k8s_api(const char* filename)
sprintf(server->json_input_file, "%s/%s.json",
IN_KUBERNETES_EVENTS_DATA_PATH, filename);

/* setup info for streamed events, if any */
server->chunk_size = chunk_size;
if (strlen(stream_filename) > 0) {
sprintf(server->json_input_file_to_stream, "%s/%s.json",
IN_KUBERNETES_EVENTS_DATA_PATH, stream_filename);
} else {
memset(server->json_input_file_to_stream, 0, sizeof(server->json_input_file_to_stream));
}

/* Create HTTP server context */
server->ctx = mk_create();
if (!server->ctx) {
Expand Down Expand Up @@ -284,19 +331,19 @@ void flb_test_events_v1_with_lastTimestamp()

clear_output_num();

struct test_k8s_server_ctx* k8s_server = initialize_mock_k8s_api(
filename, "", 0
);

cb_data.cb = cb_check_result_json;
cb_data.data = (void *)filename;
cb_data.data = (void *)k8s_server;

ctx = test_ctx_create(&cb_data);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}

struct test_k8s_server_ctx* k8s_server = initialize_mock_k8s_api(
filename
);

ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

Expand All @@ -321,19 +368,19 @@ void flb_test_events_v1_with_creationTimestamp()

clear_output_num();

struct test_k8s_server_ctx* k8s_server = initialize_mock_k8s_api(
filename, "", 0
);

cb_data.cb = cb_check_result_json;
cb_data.data = (void *)filename;
cb_data.data = (void *)k8s_server;

ctx = test_ctx_create(&cb_data);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}

struct test_k8s_server_ctx* k8s_server = initialize_mock_k8s_api(
filename
);

ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

Expand All @@ -348,9 +395,50 @@ void flb_test_events_v1_with_creationTimestamp()
test_ctx_destroy(ctx);
}

void flb_test_events_with_chunkedrecv()
{
struct flb_lib_out_cb cb_data;
struct test_ctx *ctx;

int ret;
int num;
const char *filename = "eventlist_v1_with_lastTimestamp";
const char *stream_filename = "watch_v1_with_lastTimestamp";

clear_output_num();

struct test_k8s_server_ctx* k8s_server = initialize_mock_k8s_api(
filename, stream_filename, 1000
);

cb_data.cb = cb_check_result_json;
cb_data.data = (void *)k8s_server;

ctx = test_ctx_create(&cb_data);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}

ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

// waiting to flush
flb_time_msleep(5000);

num = get_output_num();
if (!TEST_CHECK(num >= 2)) {
TEST_MSG("2 output records are expected found %d", num);
}

mock_k8s_api_destroy(k8s_server);
test_ctx_destroy(ctx);
}

TEST_LIST = {
{"events_v1_with_lastTimestamp", flb_test_events_v1_with_lastTimestamp},
{"events_v1_with_creationTimestamp", flb_test_events_v1_with_creationTimestamp},
//{"events_v1_with_chunkedrecv", flb_test_events_with_chunkedrecv},
{NULL, NULL}
};

Loading