diff --git a/plugins/in_tail/tail.c b/plugins/in_tail/tail.c index 34a0fec3dbd..37b1f4f6c68 100644 --- a/plugins/in_tail/tail.c +++ b/plugins/in_tail/tail.c @@ -372,6 +372,15 @@ static int in_tail_init(struct flb_input_instance *in, /* Scan path */ flb_tail_scan(ctx->path_list, ctx); +#ifdef FLB_HAVE_SQLDB + /* Delete stale files that are not monitored from the database */ + ret = flb_tail_db_stale_file_delete(in, config, ctx); + if (ret == -1) { + flb_tail_config_destroy(ctx); + return -1; + } +#endif + /* * After the first scan (on start time), all new files discovered needs to be * read from head, so we switch the 'read_from_head' flag to true so any diff --git a/plugins/in_tail/tail_db.c b/plugins/in_tail/tail_db.c index 664963b6dba..fa17e416241 100644 --- a/plugins/in_tail/tail_db.c +++ b/plugins/in_tail/tail_db.c @@ -168,6 +168,42 @@ static int db_file_insert(struct flb_tail_file *file, struct flb_tail_config *ct return flb_sqldb_last_id(ctx->db); } +static int stmt_add_param_concat(struct flb_tail_config *ctx, + flb_sds_t *stmt_sql, uint64_t count) +{ + uint64_t i; + flb_sds_t sds_tmp; + + sds_tmp = flb_sds_cat(*stmt_sql, SQL_STMT_START_PARAM, + SQL_STMT_START_PARAM_LEN); + if (sds_tmp == NULL) { + flb_plg_debug(ctx->ins, "error concatenating stmt_sql: param start"); + return -1; + } + *stmt_sql = sds_tmp; + + for (i = 1; i < count; i++) { + sds_tmp = flb_sds_cat(*stmt_sql, SQL_STMT_ADD_PARAM, + SQL_STMT_ADD_PARAM_LEN); + if (sds_tmp == NULL) { + flb_plg_debug(ctx->ins, "error concatenating stmt_sql: add param"); + return -1; + } + + *stmt_sql = sds_tmp; + } + + sds_tmp = flb_sds_cat(*stmt_sql, SQL_STMT_PARAM_END, + SQL_STMT_PARAM_END_LEN); + if (sds_tmp == NULL) { + flb_plg_debug(ctx->ins, "error concatenating stmt_sql: param end"); + return -1; + } + *stmt_sql = sds_tmp; + + return 0; +} + int flb_tail_db_file_set(struct flb_tail_file *file, struct flb_tail_config *ctx) { @@ -275,3 +311,121 @@ int flb_tail_db_file_delete(struct flb_tail_file *file, flb_plg_debug(ctx->ins, "db: file deleted from database: %s", file->name); return 0; } + +/* + * Delete stale file from database + */ +int flb_tail_db_stale_file_delete(struct flb_input_instance *ins, + struct flb_config *config, + struct flb_tail_config *ctx) +{ + int ret = -1; + size_t sql_size; + uint64_t i; + uint64_t file_count = ctx->files_static_count; + flb_sds_t stale_delete_sql; + flb_sds_t sds_tmp; + sqlite3_stmt *stmt_delete_inodes = NULL; + struct mk_list *tmp; + struct mk_list *head; + struct flb_tail_file *file; + + if (!ctx->db) { + return 0; + } + + /* Create a stmt sql buffer */ + sql_size = SQL_DELETE_STALE_FILE_START_LEN; + sql_size += SQL_DELETE_STALE_FILE_WHERE_LEN; + sql_size += SQL_STMT_START_PARAM_LEN; + sql_size += SQL_STMT_PARAM_END_LEN; + sql_size += SQL_STMT_END_LEN; + if (file_count > 0) { + sql_size += (SQL_STMT_ADD_PARAM_LEN * file_count); + } + + stale_delete_sql = flb_sds_create_size(sql_size + 1); + if (!stale_delete_sql) { + flb_plg_error(ctx->ins, "cannot allocate buffer for stale_delete_sql:" + " size: %zu", sql_size); + return -1; + } + + /* Create a stmt sql */ + sds_tmp = flb_sds_cat(stale_delete_sql, SQL_DELETE_STALE_FILE_START, + SQL_DELETE_STALE_FILE_START_LEN); + if (sds_tmp == NULL) { + flb_plg_error(ctx->ins, + "error concatenating stale_delete_sql: start"); + flb_sds_destroy(stale_delete_sql); + return -1; + } + stale_delete_sql = sds_tmp; + + if (file_count > 0) { + sds_tmp = flb_sds_cat(stale_delete_sql, SQL_DELETE_STALE_FILE_WHERE, + SQL_DELETE_STALE_FILE_WHERE_LEN); + if (sds_tmp == NULL) { + flb_plg_error(ctx->ins, + "error concatenating stale_delete_sql: where"); + flb_sds_destroy(stale_delete_sql); + return -1; + } + stale_delete_sql = sds_tmp; + + ret = stmt_add_param_concat(ctx, &stale_delete_sql, file_count); + if (ret == -1) { + flb_plg_error(ctx->ins, + "error concatenating stale_delete_sql: param"); + flb_sds_destroy(stale_delete_sql); + return -1; + } + } + + sds_tmp = flb_sds_cat(stale_delete_sql, SQL_STMT_END, SQL_STMT_END_LEN); + if (sds_tmp == NULL) { + flb_plg_error(ctx->ins, + "error concatenating stale_delete_sql: end"); + flb_sds_destroy(stale_delete_sql); + return -1; + } + stale_delete_sql = sds_tmp; + + /* Prepare stmt */ + ret = sqlite3_prepare_v2(ctx->db->handler, stale_delete_sql, -1, + &stmt_delete_inodes, 0); + if (ret != SQLITE_OK) { + flb_plg_error(ctx->ins, "error preparing database SQL statement:" + " stmt_delete_inodes sql:%s, ret=%d", stale_delete_sql, + ret); + flb_sds_destroy(stale_delete_sql); + return -1; + } + + /* Bind parameters */ + i = 1; + mk_list_foreach_safe(head, tmp, &ctx->files_static) { + file = mk_list_entry(head, struct flb_tail_file, _head); + sqlite3_bind_int64(stmt_delete_inodes, i, file->inode); + i++; + } + + /* Run the delete inodes */ + ret = sqlite3_step(stmt_delete_inodes); + if (ret != SQLITE_DONE) { + sqlite3_finalize(stmt_delete_inodes); + flb_sds_destroy(stale_delete_sql); + flb_plg_error(ctx->ins, "cannot execute delete stale inodes: ret=%d", + ret); + return -1; + } + + ret = sqlite3_changes(ctx->db->handler); + flb_plg_info(ctx->ins, "db: delete unmonitored stale inodes from the" + " database: count=%d", ret); + + sqlite3_finalize(stmt_delete_inodes); + flb_sds_destroy(stale_delete_sql); + + return 0; +} diff --git a/plugins/in_tail/tail_db.h b/plugins/in_tail/tail_db.h index 7b5355d229c..b1fde721d29 100644 --- a/plugins/in_tail/tail_db.h +++ b/plugins/in_tail/tail_db.h @@ -40,4 +40,7 @@ int flb_tail_db_file_rotate(const char *new_name, struct flb_tail_config *ctx); int flb_tail_db_file_delete(struct flb_tail_file *file, struct flb_tail_config *ctx); +int flb_tail_db_stale_file_delete(struct flb_input_instance *ins, + struct flb_config *config, + struct flb_tail_config *ctx); #endif diff --git a/plugins/in_tail/tail_sql.h b/plugins/in_tail/tail_sql.h index 855933a0149..bf724f318cd 100644 --- a/plugins/in_tail/tail_sql.h +++ b/plugins/in_tail/tail_sql.h @@ -53,6 +53,28 @@ #define SQL_DELETE_FILE \ "DELETE FROM in_tail_files WHERE id=@id;" +#define SQL_STMT_START_PARAM "(?" +#define SQL_STMT_START_PARAM_LEN (sizeof(SQL_STMT_START_PARAM) - 1) + +#define SQL_STMT_ADD_PARAM ",?" +#define SQL_STMT_ADD_PARAM_LEN (sizeof(SQL_STMT_ADD_PARAM) - 1) + +#define SQL_STMT_PARAM_END ")" +#define SQL_STMT_PARAM_END_LEN (sizeof(SQL_STMT_PARAM_END) - 1) + +#define SQL_STMT_END ";" +#define SQL_STMT_END_LEN (sizeof(SQL_STMT_END) - 1) + +#define SQL_DELETE_STALE_FILE_START \ + "DELETE FROM in_tail_files " +#define SQL_DELETE_STALE_FILE_START_LEN \ + (sizeof(SQL_DELETE_STALE_FILE_START) - 1) + +#define SQL_DELETE_STALE_FILE_WHERE \ + "WHERE inode NOT IN " +#define SQL_DELETE_STALE_FILE_WHERE_LEN \ + (sizeof(SQL_DELETE_STALE_FILE_WHERE) - 1) + #define SQL_PRAGMA_SYNC \ "PRAGMA synchronous=%i;" diff --git a/tests/runtime/in_tail.c b/tests/runtime/in_tail.c index ee5fba88744..756c3d22dc8 100644 --- a/tests/runtime/in_tail.c +++ b/tests/runtime/in_tail.c @@ -1545,6 +1545,195 @@ void flb_test_db() test_tail_ctx_destroy(ctx); unlink(db); } + +void flb_test_db_delete_stale_file() +{ + struct flb_lib_out_cb cb_data; + struct test_tail_ctx *ctx; + char *org_file[] = {"test_db.log", "test_db_stale.log"}; + char *tmp_file[] = {"test_db.log"}; + char *path = "test_db.log, test_db_stale.log"; + char *move_file[] = {"test_db_stale.log", "test_db_stale_new.log"}; + char *new_file[] = {"test_db.log", "test_db_stale_new.log"}; + char *new_path = "test_db.log, test_db_stale_new.log"; + char *empty_path = "empty.log"; + char *db = "test_db.db"; + char *msg_init = "hello world"; + char *msg_end = "hello db end"; + int i; + int ret; + int num; + int unused; + + unlink(db); + + clear_output_num(); + + cb_data.cb = cb_count_msgpack; + cb_data.data = &unused; + + ctx = test_tail_ctx_create(&cb_data, + &org_file[0], + sizeof(org_file)/sizeof(char *), + FLB_FALSE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->o_ffd, + "path", path, + "read_from_head", "true", + "db", db, + "db.sync", "full", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ret = write_msg(ctx, msg_init, strlen(msg_init)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + unlink(db); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no output"); + } + + if (ctx->fds != NULL) { + for (i=0; ifd_num; i++) { + close(ctx->fds[i]); + } + flb_free(ctx->fds); + } + flb_stop(ctx->flb); + flb_destroy(ctx->flb); + flb_free(ctx); + + /* re-init to use db */ + clear_output_num(); + + /* + * Changing the file name from 'test_db_stale.log' to + * 'test_db_stale_new.log.' In this scenario, it is assumed that the + * file was deleted after the FluentBit was terminated. However, since + * the FluentBit was shutdown, the inode remains in the database. + * The reason for renaming is to preserve the existing file for later use. + */ + ret = rename(move_file[0], move_file[1]); + TEST_CHECK(ret == 0); + + cb_data.cb = cb_count_msgpack; + cb_data.data = &unused; + + ctx = test_tail_ctx_create(&cb_data, + &tmp_file[0], + sizeof(tmp_file)/sizeof(char *), + FLB_FALSE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + unlink(db); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->o_ffd, + "path", path, + "read_from_head", "true", + "db", db, + "db.sync", "full", + NULL); + TEST_CHECK(ret == 0); + + /* + * Start the engine + * FluentBit will delete stale inodes. + */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* waiting to flush */ + flb_time_msleep(500); + + if (ctx->fds != NULL) { + for (i=0; ifd_num; i++) { + close(ctx->fds[i]); + } + flb_free(ctx->fds); + } + flb_stop(ctx->flb); + flb_destroy(ctx->flb); + flb_free(ctx); + + /* re-init to use db */ + clear_output_num(); + + cb_data.cb = cb_count_msgpack; + cb_data.data = &unused; + + ctx = test_tail_ctx_create(&cb_data, + &new_file[0], + sizeof(new_file)/sizeof(char *), + FLB_FALSE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + unlink(db); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->o_ffd, + "path", new_path, + "read_from_head", "true", + "db", db, + "db.sync", "full", + NULL); + TEST_CHECK(ret == 0); + + /* + * Start the engine + * 'test_db_stale_new.log.' is a new file. + * The inode of 'test_db_stale.log' was deleted previously. + * So, it reads from the beginning of the file. + */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* waiting to flush */ + flb_time_msleep(500); + + ret = write_msg(ctx, msg_end, strlen(msg_end)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + unlink(db); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(500); + + num = get_output_num(); + if (!TEST_CHECK(num == 3)) { + /* 3 = + * test_db.log : "hello db end" + * test_db_stale.log : "msg_init" + "hello db end" + */ + TEST_MSG("num error. expect=3 got=%d", num); + } + + test_tail_ctx_destroy(ctx); + unlink(db); +} #endif /* FLB_HAVE_SQLDB */ /* Test list */ @@ -1569,6 +1758,7 @@ TEST_LIST = { #ifdef FLB_HAVE_SQLDB {"db", flb_test_db}, + {"db_delete_stale_file", flb_test_db_delete_stale_file}, #endif #ifdef in_tail