From a5586020db7bfa9b029b510766f145e6bd96fc18 Mon Sep 17 00:00:00 2001 From: Gary Arora Date: Tue, 22 Jan 2019 18:49:31 -0500 Subject: [PATCH] v3.0 with utility query support --- README.md | 15 +++- lib/athenaExpress.js | 200 +++++++++++++++++++++++++++---------------- package.json | 2 +- 3 files changed, 142 insertions(+), 75 deletions(-) diff --git a/README.md b/README.md index b279d29..956ecac 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Athena-Express: Simplifying SQL queries on Amazon Athena +# New Document# Athena-Express: Simplifying SQL queries on Amazon Athena [![NPM](https://nodei.co/npm/athena-express.png?compact=true)](https://nodei.co/npm/athena-express/) @@ -298,6 +298,19 @@ exports.handler = async (event, context, callback) => { Athena-Express result +## More Examples +###### UTILITY queries - Added in v3.0 + ```javascript + const results = await athenaExpress.query("SHOW TABLES"); +console.log(results); + +//Output: +{ Items: + [ { row: 'default' }, + { row: 'sampledb' } ] } + ``` + + ## Contributors [Gary Arora](https://twitter.com/AroraGary) diff --git a/lib/athenaExpress.js b/lib/athenaExpress.js index 8664596..1231e44 100644 --- a/lib/athenaExpress.js +++ b/lib/athenaExpress.js @@ -4,12 +4,15 @@ const COST_PER_MB = 0.000004768, //Based on $5/TB BYTES_IN_MB = 1048576, COST_FOR_10MB = COST_PER_MB * 10; +const readline = require("readline"); + module.exports = class AthenaExpress { constructor(init) { validateConstructor(init); this.config = { athena: new init.aws.Athena({ apiVersion: "2017-05-18" }), - s3: + s3: new init.aws.S3({ apiVersion: "2006-03-01" }), + s3Bucket: init.s3 || `s3://athena-express-${init.aws.config.credentials.accessKeyId .substring(0, 10) @@ -22,34 +25,35 @@ module.exports = class AthenaExpress { } async query(query) { + const config = this.config; let results = {}; - if (!this.config) + if (!config) throw new TypeError("Config object not present in the constructor"); if (!query) throw new TypeError("SQL query is 
missing"); try { - const queryExecutionId = await startQueryExecution( - query, - this.config - ); - const stats = await checkIfExecutionCompleted( - queryExecutionId, - this.config - ); - const queryResults = await getQueryResults( + const queryExecutionId = await startQueryExecution(query, config); + const queryStatus = await checkIfExecutionCompleted( queryExecutionId, - this.config + config ); - results.Items = this.config.formatJson - ? cleanUpResults(queryResults) - : queryResults; + const s3Output = + queryStatus.QueryExecution.ResultConfiguration + .OutputLocation, + statementType = queryStatus.QueryExecution.StatementType; + + results.Items = await getQueryResultsFromS3({ + s3Output, + statementType, + config + }); - if (this.config.getStats) { + if (config.getStats) { const dataInMb = Math.round( - stats.QueryExecution.Statistics.DataScannedInBytes / + queryStatus.QueryExecution.Statistics.DataScannedInBytes / BYTES_IN_MB ); @@ -57,7 +61,7 @@ module.exports = class AthenaExpress { results.QueryCostInUSD = dataInMb > 10 ? 
dataInMb * COST_PER_MB : COST_FOR_10MB; results.EngineExecutionTimeInMillis = - stats.QueryExecution.Statistics.EngineExecutionTimeInMillis; + queryStatus.QueryExecution.Statistics.EngineExecutionTimeInMillis; results.Count = results.Items.length; } @@ -68,25 +72,27 @@ module.exports = class AthenaExpress { } }; -async function startQueryExecution(query, config) { +function startQueryExecution(query, config) { + const QueryString = query.sql || query; + const params = { - QueryString: query.sql || query, + QueryString, ResultConfiguration: { - OutputLocation: config.s3 + OutputLocation: config.s3Bucket }, QueryExecutionContext: { Database: query.db || config.db } }; return new Promise(function(resolve, reject) { - let startQueryExecutionRecursively = async function() { + const startQueryExecutionRecursively = async function() { try { let data = await config.athena .startQueryExecution(params) .promise(); resolve(data.QueryExecutionId); } catch (err) { - commonAthenaErrors(err) + isCommonAthenaError(err.code) ? 
setTimeout(() => { startQueryExecutionRecursively(); }, 2000) @@ -97,10 +103,10 @@ async function startQueryExecution(query, config) { }); } -async function checkIfExecutionCompleted(QueryExecutionId, config) { +function checkIfExecutionCompleted(QueryExecutionId, config) { let retry = config.retry; return new Promise(function(resolve, reject) { - let keepCheckingRecursively = async function() { + const keepCheckingRecursively = async function() { try { let data = await config.athena .getQueryExecution({ QueryExecutionId }) @@ -116,7 +122,7 @@ async function checkIfExecutionCompleted(QueryExecutionId, config) { }, retry); } } catch (err) { - if (commonAthenaErrors(err)) { + if (isCommonAthenaError(err.code)) { retry = 2000; setTimeout(() => { keepCheckingRecursively(); @@ -128,50 +134,21 @@ async function checkIfExecutionCompleted(QueryExecutionId, config) { }); } -async function getQueryResults(QueryExecutionId, config) { - return new Promise(function(resolve, reject) { - let gettingQueryResultsRecursively = async function() { - try { - let queryResults = await config.athena - .getQueryResults({ QueryExecutionId }) - .promise(); - resolve(queryResults.ResultSet.Rows); - } catch (err) { - commonAthenaErrors(err) - ? 
setTimeout(() => { - gettingQueryResultsRecursively(); - }, 2000) - : reject(err); - } - }; - gettingQueryResultsRecursively(); - }); -} - -function cleanUpResults(results) { - if (!results.length) return results; - - let rowIterator = 1, - columnIterator = 0, - cleanedUpObject = {}, - cleanedUpResults = []; - - const rowCount = results.length, - fieldNames = results[0].Data, - columnCount = fieldNames.length; - - for (; rowIterator < rowCount; rowIterator++) { - for (; columnIterator < columnCount; columnIterator++) { - cleanedUpObject[ - Object.values(fieldNames[columnIterator])[0] - ] = Object.values(results[rowIterator].Data[columnIterator])[0]; - } - cleanedUpResults.push(cleanedUpObject); - cleanedUpObject = {}; - columnIterator = 0; +function getQueryResultsFromS3(params) { + const s3Params = { + Bucket: params.s3Output.split("/")[2], + Key: params.s3Output.split("/")[3] + }, + input = params.config.s3.getObject(s3Params).createReadStream(), + lineReader = readline.createInterface({ input }); + + if (params.config.formatJson) { + return params.statementType === "DML" + ? cleanUpDML(lineReader) + : cleanUpNonDML(lineReader); + } else { + return getRawResultsFromS3(lineReader); } - - return cleanedUpResults; } function validateConstructor(init) { @@ -188,10 +165,87 @@ function validateConstructor(init) { } } -function commonAthenaErrors(err) { - return err.code === "TooManyRequestsException" || - err.code === "ThrottlingException" || - err.code === "NetworkingError" +function isCommonAthenaError(err) { + return err === "TooManyRequestsException" || + err === "ThrottlingException" || + err === "NetworkingError" || + err === "UnknownEndpoint" + ? true + : false; +} + +function isCommonS3Error(err) { + return err === "NetworkingError" || + err === "StreamContentLengthMismatch" || + err === "NoSuchKey" ? 
true : false; } + +function getRawResultsFromS3(lineReader) { + let rawJson = []; + return new Promise(function(resolve, reject) { + lineReader + .on("line", line => { + rawJson.push(line.trim()); + }) + .on("close", function() { + resolve(rawJson); + }); + }); +} + +function cleanUpDML(lineReader) { + let headerList = [], + isFirstRecord = true, + cleanJson = [], + noOfColumns = 0, + singleJsonRow = {}; + + return new Promise(function(resolve, reject) { + lineReader + .on("line", line => { + line = line.substring(1, line.length - 1).split('","'); + + if (isFirstRecord) { + headerList = line; + isFirstRecord = false; + } else { + singleJsonRow = {}; + noOfColumns = line.length; + for (let i = 0; i < noOfColumns; i++) { + singleJsonRow[[headerList[i]]] = line[i]; + } + cleanJson.push(singleJsonRow); + } + }) + .on("close", function() { + resolve(cleanJson); + }); + }); +} + +function cleanUpNonDML(lineReader) { + let cleanJson = []; + return new Promise(function(resolve, reject) { + lineReader + .on("line", line => { + switch (true) { + case line.indexOf("\t") > 0: + line = line.split("\t"); + cleanJson.push({ + [line[0].trim()]: line[1].trim() + }); + break; + default: + cleanJson.push({ + row: line.trim() + }); + } + }) + .on("close", function() { + resolve(cleanJson); + }); + }); +} diff --git a/package.json b/package.json index 0da98ea..e99ae93 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "athena-express", - "version": "2.0.6", + "version": "3.0.0", "description": "Athena-Express makes it easier to execute SQL queries on Amazon Athena by consolidating & astracting several methods in the AWS SDK", "main": "./lib/index.js", "scripts": {