Skip to content

Commit

Permalink
in_tail: Delete unmanaged inodes from db during startup (fluent#8025)…
Browse files Browse the repository at this point in the history
… (1/2)

To prevent incorrect inode references,
FluentBit automatically removes unmanaged inodes during startup.

Signed-off-by: jinyong.choi <[email protected]>
  • Loading branch information
jinyongchoi committed Oct 18, 2023
1 parent 672acb4 commit 15ba9ec
Show file tree
Hide file tree
Showing 5 changed files with 378 additions and 0 deletions.
9 changes: 9 additions & 0 deletions plugins/in_tail/tail.c
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,15 @@ static int in_tail_init(struct flb_input_instance *in,
/* Scan path */
flb_tail_scan(ctx->path_list, ctx);

#ifdef FLB_HAVE_SQLDB
/* Delete stale files that are not monitored from the database */
ret = flb_tail_db_stale_file_delete(in, config, ctx);
if (ret == -1) {
flb_tail_config_destroy(ctx);
return -1;
}
#endif

/*
* After the first scan (on start time), all new files discovered needs to be
* read from head, so we switch the 'read_from_head' flag to true so any
Expand Down
154 changes: 154 additions & 0 deletions plugins/in_tail/tail_db.c
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,42 @@ static int db_file_insert(struct flb_tail_file *file, struct flb_tail_config *ct
return flb_sqldb_last_id(ctx->db);
}

static int stmt_add_param_concat(struct flb_tail_config *ctx,
flb_sds_t *stmt_sql, uint64_t count)
{
uint64_t i;
flb_sds_t sds_tmp;

sds_tmp = flb_sds_cat(*stmt_sql, SQL_STMT_START_PARAM,
SQL_STMT_START_PARAM_LEN);
if (sds_tmp == NULL) {
flb_plg_debug(ctx->ins, "error concatenating stmt_sql: param start");
return -1;
}
*stmt_sql = sds_tmp;

for (i = 1; i < count; i++) {
sds_tmp = flb_sds_cat(*stmt_sql, SQL_STMT_ADD_PARAM,
SQL_STMT_ADD_PARAM_LEN);
if (sds_tmp == NULL) {
flb_plg_debug(ctx->ins, "error concatenating stmt_sql: add param");
return -1;
}

*stmt_sql = sds_tmp;
}

sds_tmp = flb_sds_cat(*stmt_sql, SQL_STMT_PARAM_END,
SQL_STMT_PARAM_END_LEN);
if (sds_tmp == NULL) {
flb_plg_debug(ctx->ins, "error concatenating stmt_sql: param end");
return -1;
}
*stmt_sql = sds_tmp;

return 0;
}

int flb_tail_db_file_set(struct flb_tail_file *file,
struct flb_tail_config *ctx)
{
Expand Down Expand Up @@ -275,3 +311,121 @@ int flb_tail_db_file_delete(struct flb_tail_file *file,
flb_plg_debug(ctx->ins, "db: file deleted from database: %s", file->name);
return 0;
}

/*
* Delete stale file from database
*/
int flb_tail_db_stale_file_delete(struct flb_input_instance *ins,
struct flb_config *config,
struct flb_tail_config *ctx)
{
int ret = -1;
size_t sql_size;
uint64_t i;
uint64_t file_count = ctx->files_static_count;
flb_sds_t stale_delete_sql;
flb_sds_t sds_tmp;
sqlite3_stmt *stmt_delete_inodes = NULL;
struct mk_list *tmp;
struct mk_list *head;
struct flb_tail_file *file;

if (!ctx->db) {
return 0;
}

/* Create a stmt sql buffer */
sql_size = SQL_DELETE_STALE_FILE_START_LEN;
sql_size += SQL_DELETE_STALE_FILE_WHERE_LEN;
sql_size += SQL_STMT_START_PARAM_LEN;
sql_size += SQL_STMT_PARAM_END_LEN;
sql_size += SQL_STMT_END_LEN;
if (file_count > 0) {
sql_size += (SQL_STMT_ADD_PARAM_LEN * file_count);
}

stale_delete_sql = flb_sds_create_size(sql_size + 1);
if (!stale_delete_sql) {
flb_plg_error(ctx->ins, "cannot allocate buffer for stale_delete_sql:"
" size: %zu", sql_size);
return -1;
}

/* Create a stmt sql */
sds_tmp = flb_sds_cat(stale_delete_sql, SQL_DELETE_STALE_FILE_START,
SQL_DELETE_STALE_FILE_START_LEN);
if (sds_tmp == NULL) {
flb_plg_error(ctx->ins,
"error concatenating stale_delete_sql: start");
flb_sds_destroy(stale_delete_sql);
return -1;
}
stale_delete_sql = sds_tmp;

if (file_count > 0) {
sds_tmp = flb_sds_cat(stale_delete_sql, SQL_DELETE_STALE_FILE_WHERE,
SQL_DELETE_STALE_FILE_WHERE_LEN);
if (sds_tmp == NULL) {
flb_plg_error(ctx->ins,
"error concatenating stale_delete_sql: where");
flb_sds_destroy(stale_delete_sql);
return -1;
}
stale_delete_sql = sds_tmp;

ret = stmt_add_param_concat(ctx, &stale_delete_sql, file_count);
if (ret == -1) {
flb_plg_error(ctx->ins,
"error concatenating stale_delete_sql: param");
flb_sds_destroy(stale_delete_sql);
return -1;
}
}

sds_tmp = flb_sds_cat(stale_delete_sql, SQL_STMT_END, SQL_STMT_END_LEN);
if (sds_tmp == NULL) {
flb_plg_error(ctx->ins,
"error concatenating stale_delete_sql: end");
flb_sds_destroy(stale_delete_sql);
return -1;
}
stale_delete_sql = sds_tmp;

/* Prepare stmt */
ret = sqlite3_prepare_v2(ctx->db->handler, stale_delete_sql, -1,
&stmt_delete_inodes, 0);
if (ret != SQLITE_OK) {
flb_plg_error(ctx->ins, "error preparing database SQL statement:"
" stmt_delete_inodes sql:%s, ret=%d", stale_delete_sql,
ret);
flb_sds_destroy(stale_delete_sql);
return -1;
}

/* Bind parameters */
i = 1;
mk_list_foreach_safe(head, tmp, &ctx->files_static) {
file = mk_list_entry(head, struct flb_tail_file, _head);
sqlite3_bind_int64(stmt_delete_inodes, i, file->inode);
i++;
}

/* Run the delete inodes */
ret = sqlite3_step(stmt_delete_inodes);
if (ret != SQLITE_DONE) {
sqlite3_finalize(stmt_delete_inodes);
flb_sds_destroy(stale_delete_sql);
flb_plg_error(ctx->ins, "cannot execute delete stale inodes: ret=%d",
ret);
return -1;
}

ret = sqlite3_changes(ctx->db->handler);
flb_plg_info(ctx->ins, "db: delete unmonitored stale inodes from the"
" database: count=%d", ret);

sqlite3_finalize(stmt_delete_inodes);
flb_sds_destroy(stale_delete_sql);

return 0;
}
3 changes: 3 additions & 0 deletions plugins/in_tail/tail_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,7 @@ int flb_tail_db_file_rotate(const char *new_name,
struct flb_tail_config *ctx);
int flb_tail_db_file_delete(struct flb_tail_file *file,
struct flb_tail_config *ctx);
int flb_tail_db_stale_file_delete(struct flb_input_instance *ins,
struct flb_config *config,
struct flb_tail_config *ctx);
#endif
22 changes: 22 additions & 0 deletions plugins/in_tail/tail_sql.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,28 @@
#define SQL_DELETE_FILE \
"DELETE FROM in_tail_files WHERE id=@id;"

#define SQL_STMT_START_PARAM "(?"
#define SQL_STMT_START_PARAM_LEN (sizeof(SQL_STMT_START_PARAM) - 1)

#define SQL_STMT_ADD_PARAM ",?"
#define SQL_STMT_ADD_PARAM_LEN (sizeof(SQL_STMT_ADD_PARAM) - 1)

#define SQL_STMT_PARAM_END ")"
#define SQL_STMT_PARAM_END_LEN (sizeof(SQL_STMT_PARAM_END) - 1)

#define SQL_STMT_END ";"
#define SQL_STMT_END_LEN (sizeof(SQL_STMT_END) - 1)

#define SQL_DELETE_STALE_FILE_START \
"DELETE FROM in_tail_files "
#define SQL_DELETE_STALE_FILE_START_LEN \
(sizeof(SQL_DELETE_STALE_FILE_START) - 1)

#define SQL_DELETE_STALE_FILE_WHERE \
"WHERE inode NOT IN "
#define SQL_DELETE_STALE_FILE_WHERE_LEN \
(sizeof(SQL_DELETE_STALE_FILE_WHERE) - 1)

#define SQL_PRAGMA_SYNC \
"PRAGMA synchronous=%i;"

Expand Down
Loading

0 comments on commit 15ba9ec

Please sign in to comment.