diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9d22eb4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +*.o +*.so diff --git a/wal2json.c b/wal2json.c index e5dc672..ac9a58c 100644 --- a/wal2json.c +++ b/wal2json.c @@ -26,6 +26,7 @@ #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/pg_lsn.h" #include "utils/rel.h" #include "utils/relcache.h" #include "utils/syscache.h" @@ -40,6 +41,7 @@ typedef struct { MemoryContext context; bool include_xids; /* include transaction ids */ + bool include_lsn; /* include lsn for safer syncing */ bool include_timestamp; /* include transaction timestamp */ bool include_schemas; /* qualify tables */ bool include_types; /* include data types */ @@ -91,6 +93,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); data->include_xids = true; + data->include_lsn = true; data->include_timestamp = false; data->include_schemas = true; data->include_types = true; @@ -160,6 +163,20 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "include-lsn") == 0) + { + /* If option does not provide a value, it means its value is true */ + if (elem->arg == NULL) + { + elog(LOG, "include-lsn argument is null"); + data->include_xids = true; + } + else if (!parse_bool(strVal(elem->arg), &data->include_lsn)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else { elog(WARNING, "option %s = %s is unknown", @@ -194,6 +211,16 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) if (data->include_xids) appendStringInfo(ctx->out, "\t\"xid\": %u,\n", txn->xid); + if (data->include_lsn) { + // uint32 id, off; + // id = (uint32) (ctx->write_location >> 32); + // off = (uint32) ctx->write_location; + // appendStringInfo(ctx->out, "\t\"lsn\": %X/%X,\n", id, off); + char *lsn_str = DatumGetCString(DirectFunctionCall1(pg_lsn_out, ctx->write_location)); + appendStringInfo(ctx->out, "\t\"lsn\": \"%s\",\n", lsn_str); + pfree(lsn_str); + } + if (data->include_timestamp) appendStringInfo(ctx->out, "\t\"timestamp\": \"%s\",\n", timestamptz_to_str(txn->commit_time));