Skip to content

Commit

Permalink
Mysql2 execute (#68)
Browse files Browse the repository at this point in the history
* capture Connection.execute for mysql2

* simplify code
  • Loading branch information
jafl authored and haotianw465 committed Oct 31, 2018
1 parent ef71b60 commit 160c775
Showing 1 changed file with 77 additions and 61 deletions.
138 changes: 77 additions & 61 deletions packages/mysql/lib/mysql_p.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ var PREPARED = 'statement';

/**
* Patches the Node MySQL client to automatically capture query information for the segment.
* Connection.query and pool.query calls are automatically captured.
* In manual mode, connection.query and pool.query require a segment or subsegment object
* as an additional argument as the last argument for the query.
* Connection.query, connection.execute, and pool.query calls are automatically captured.
* In manual mode, these functions require a segment or subsegment object as an additional,
* last argument.
* @param {mysql} module - The MySQL npm module.
* @returns {mysql}
* @see https://github.com/mysqljs/mysql
Expand All @@ -38,15 +38,11 @@ function patchCreateConnection(mysql) {
var connection = mysql[baseFcn].apply(connection, arguments);
if (connection instanceof Promise) {
connection = connection.then((result) => {
if (result.connection.query instanceof Function) {
result.connection.__query = result.connection.query;
result.connection.query = captureQuery;
}
patchObject(result.connection);
return result;
});
} else if (connection.query instanceof Function) {
connection.__query = connection.query;
connection.query = captureQuery;
patchObject(connection);
}
return connection;
};
Expand All @@ -58,13 +54,30 @@ function patchCreatePool(mysql) {

mysql['createPool'] = function patchedCreatePool() {
var pool = mysql[baseFcn].apply(pool, arguments);
pool.__query = pool.query;
pool.query = captureQuery;

if (pool instanceof Promise) {
pool = pool.then((result) => {
patchObject(result.pool);
return result;
});
} else if (pool.query instanceof Function) {
patchObject(pool);
}
return pool;
};
}

function patchObject(connection) {
if (connection.query instanceof Function) {
connection.__query = connection.query;
connection.query = captureOperation('query');
}

if (connection.execute instanceof Function) {
connection.__execute = connection.execute;
connection.execute = captureOperation('execute');
}
}

function resolveArguments(argsObj) {
var args = {};

Expand All @@ -86,75 +99,78 @@ function resolveArguments(argsObj) {
return args;
}

function captureQuery() {
var args = resolveArguments(arguments);
var parent = AWSXRay.resolveSegment(args.segment);
var query;
function captureOperation(name) {
return function() {
var args = resolveArguments(arguments);
var parent = AWSXRay.resolveSegment(args.segment);
var command;
var originalOperation = this['__'+name];

if (args.segment)
delete arguments[arguments.length-1];
if (args.segment)
delete arguments[arguments.length-1];

if (!parent) {
AWSXRay.getLogger().info('Failed to capture MySQL. Cannot resolve sub/segment.');
return this.__query.apply(this, arguments);
}
if (!parent) {
AWSXRay.getLogger().info('Failed to capture MySQL. Cannot resolve sub/segment.');
return originalOperation.apply(this, arguments);
}

var config = this.config.connectionConfig || this.config;
var subsegment = parent.addNewSubsegment(config.database + '@' + config.host);
var config = this.config.connectionConfig || this.config;
var subsegment = parent.addNewSubsegment(config.database + '@' + config.host);

if (args.callback) {
var cb = args.callback;
if (args.callback) {
var cb = args.callback;

if (AWSXRay.isAutomaticMode()) {
args.callback = function autoContext(err, data) {
var session = AWSXRay.getNamespace();
if (AWSXRay.isAutomaticMode()) {
args.callback = function autoContext(err, data) {
var session = AWSXRay.getNamespace();

session.run(function() {
AWSXRay.setSegment(subsegment);
cb(err, data);
});
session.run(function() {
AWSXRay.setSegment(subsegment);
cb(err, data);
});

subsegment.close(err);
};
} else {
args.callback = function wrappedCallback(err, data) {
cb(err, data);
subsegment.close(err);
};
subsegment.close(err);
};
} else {
args.callback = function wrappedCallback(err, data) {
cb(err, data);
subsegment.close(err);
};
}
}
}

query = this.__query.call(this, args.sql, args.values, args.callback);
command = originalOperation.call(this, args.sql, args.values, args.callback);

if (!args.callback) {
query.on('end', function() {
subsegment.close();
});
if (!args.callback) {
command.on('end', function() {
subsegment.close();
});

var errorCapturer = function (err) {
subsegment.close(err);
var errorCapturer = function (err) {
subsegment.close(err);

if (this._events && this._events.error && this._events.error.length === 1) {
this.removeListener('error', errorCapturer);
this.emit('error', err);
}
};
if (this._events && this._events.error && this._events.error.length === 1) {
this.removeListener('error', errorCapturer);
this.emit('error', err);
}
};

query.on('error', errorCapturer);
}
command.on('error', errorCapturer);
}

subsegment.addSqlData(createSqlData(config, query));
subsegment.namespace = 'remote';
subsegment.addSqlData(createSqlData(config, command));
subsegment.namespace = 'remote';

return query;
return command;
}
}

function createSqlData(config, query) {
var queryType = query.values ? PREPARED : null;
function createSqlData(config, command) {
var commandType = command.values ? PREPARED : null;

var data = new SqlData(DATABASE_VERS, DRIVER_VERS, config.user,
config.host + ':' + config.port + '/' + config.database,
queryType);
commandType);

return data;
}

0 comments on commit 160c775

Please sign in to comment.