Skip to content

Commit

Permalink
Added optional lsn data to json output
Browse files Browse the repository at this point in the history
  • Loading branch information
Ethan committed Mar 31, 2015
1 parent 0ed1259 commit 909393d
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
*.o
*.so
27 changes: 27 additions & 0 deletions wal2json.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
Expand All @@ -40,6 +41,7 @@ typedef struct
{
MemoryContext context;
bool include_xids; /* include transaction ids */
bool include_lsn; /* include lsn for safer syncing */
bool include_timestamp; /* include transaction timestamp */
bool include_schemas; /* qualify tables */
bool include_types; /* include data types */
Expand Down Expand Up @@ -91,6 +93,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
data->include_xids = true;
data->include_lsn = true;
data->include_timestamp = false;
data->include_schemas = true;
data->include_types = true;
Expand Down Expand Up @@ -160,6 +163,20 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else if (strcmp(elem->defname, "include-lsn") == 0)
{
/* If option does not provide a value, it means its value is true */
if (elem->arg == NULL)
{
elog(LOG, "include-lsn argument is null");
data->include_xids = true;
}
else if (!parse_bool(strVal(elem->arg), &data->include_lsn))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else
{
elog(WARNING, "option %s = %s is unknown",
Expand Down Expand Up @@ -194,6 +211,16 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
if (data->include_xids)
appendStringInfo(ctx->out, "\t\"xid\": %u,\n", txn->xid);

if (data->include_lsn) {
// uint32 id, off;
// id = (uint32) (ctx->write_location >> 32);
// off = (uint32) ctx->write_location;
// appendStringInfo(ctx->out, "\t\"lsn\": %X/%X,\n", id, off);
char *lsn_str = DatumGetCString(DirectFunctionCall1(pg_lsn_out, ctx->write_location));
appendStringInfo(ctx->out, "\t\"lsn\": \"%s\",\n", lsn_str);
pfree(lsn_str);
}

if (data->include_timestamp)
appendStringInfo(ctx->out, "\t\"timestamp\": \"%s\",\n", timestamptz_to_str(txn->commit_time));

Expand Down

0 comments on commit 909393d

Please sign in to comment.