Skip to content
This repository has been archived by the owner on May 27, 2020. It is now read-only.

Allow creation of CSV files with newlines in fields #9

Open
wants to merge 31 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
f1c0a2b
npm test should actually run the tests
mkopinsky Jul 3, 2017
eba6fca
Don't pin an old version of mocha and chai
mkopinsky Jul 3, 2017
4f4e33f
We don't need to tell json2csv what our fields are, it can figure it …
mkopinsky Jul 3, 2017
eaa3122
Upgrade to a newer version of json2csv, and pass the preserveNewLines…
mkopinsky Jul 3, 2017
2a21f55
0.2.0
mkopinsky Jul 3, 2017
0779b0d
Switch to my fork of json2csv until they accept my PR (https://github…
mkopinsky Jul 25, 2017
7a72a75
Switch back to npm-published version of json2csv
mkopinsky Aug 22, 2017
aaaed42
Allow a 'removeOlderThan' value of 0
mkopinsky Nov 22, 2017
d2fa555
Bump dependencies
mkopinsky Nov 26, 2017
5b320d0
0.2.1
mkopinsky Nov 26, 2017
8989244
Whitespace
mkopinsky Nov 26, 2017
282e3ed
Add eslint, and do automatic --fix'es
mkopinsky Nov 26, 2017
9d04d1d
Manual fixes based on eslint, disabling some rules that would be hard…
mkopinsky Nov 26, 2017
3b8913a
0.2.2
mkopinsky Nov 26, 2017
87a31c7
Fix syntax error
mkopinsky Feb 9, 2018
2be02bf
Fix issue where we were matching file names poorly for deletion.
benirose Feb 16, 2018
11c0fd9
Merge pull request #1 from benirose/master
mkopinsky Feb 17, 2018
0e84fd2
Switch package.json to use waytohealth namespace in URLs and package …
mkopinsky Feb 22, 2018
c43e24a
0.2.3
mkopinsky Feb 22, 2018
e2346b8
Stream query results
adamleko Feb 20, 2019
c64d9b8
Return error and remove empty file when queries have no results
adamleko Feb 20, 2019
691f388
Call rmDir only after a successful export
adamleko Feb 25, 2019
52d9db8
Merge pull request #2 from LucidCardinality/stream
skipcoon Mar 5, 2019
cb43a04
Update albedo.js
skipcoon Apr 10, 2019
32398a6
Update albedo.js
skipcoon Apr 11, 2019
fc082dc
Update albedo.js
skipcoon Apr 11, 2019
935c833
Will keep the error forwarding to a min
skipcoon Apr 12, 2019
21ab00d
lint picker
skipcoon Apr 12, 2019
6592220
Merge pull request #3 from waytohealth/HF-4421
benirose Apr 17, 2019
b2ab2cf
0.3.1
Apr 30, 2019
179c93e
Update package lock (no version changes, just new npm version doing d…
Apr 30, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module.exports = {
"extends": "airbnb-base",
rules: {
"no-console": "off",
"no-prototype-builtins": "off",
"no-use-before-define": ["error", "nofunc"],
"consistent-return": "off"
}
};
187 changes: 108 additions & 79 deletions albedo.js
Original file line number Diff line number Diff line change
@@ -1,116 +1,145 @@
var json2csv = require('json2csv');
var mysql = require('mysql');
var fs = require('fs');
var moment = require('moment');
var _ = require('underscore');
const { Transform } = require('stream');
const CSVTransform = require('json2csv').Transform;
const mysql = require('mysql');
const fs = require('fs');
const moment = require('moment');
const _ = require('underscore');

module.exports = {
/**
/**
* Processes a report for the given options
*
* @param {obj} options
* @param {function} callback
*/
processReport: function(options, callback) {

processReport(options, callback) {
if (!options) {
return callback('processReport requires options, see documentation');
}

if (options.connection.type != "mysql") {
if (options.connection.type !== 'mysql') {
return callback('The selected database type is not yet supported');
}

var connection = mysql.createConnection({
const connection = mysql.createConnection({
host: options.connection.host,
user: options.connection.user,
password: options.connection.password,
database: options.connection.database,
insecureAuth: true
insecureAuth: true,
});

connection.query(options.query, function (err, rows, fields) {

if (err) {
return callback(err);
}
if (rows.length == 0) {
return callback('No records for query');
}
// set up input, csv, and output streams
const fileName = `${options.name}_${moment().format('YYYY-MM-DD_HH-mm-ss')}.csv`;
const outputPath = `${options.location}/${fileName}`;

// Allow calling code to pass either a function or an array of functions
// with which to process each row of data
if (_.isArray(options.process_row)) {
_.each(options.process_row, function(func) {
rows = _.map(rows, func);
});
} else if (_.isFunction(options.process_row)) {
rows = _.map(rows, options.process_row);
}
const input = connection.query(options.query).stream({ highWaterMark: 64 });
const output = fs.createWriteStream(outputPath, { encoding: 'utf8' });
const json2csv = new CSVTransform(null, { objectMode: true });

var fields = Object.keys(rows[0]);
// handle row transformations
let empty = true;
const rowTransform = new Transform({
writableObjectMode: true,
readableObjectMode: true,

return json2csv(
{
data: rows,
fields: fields
},
function (err, csv) {
if (err) {
return callback(err);
transform(chunk, enc, handler) {
try {
let row = chunk;
if (_.isArray(options.process_row)) {
_.each(options.process_row, (func) => {
row = func(row);
});
} else if (_.isFunction(options.process_row)) {
row = options.process_row(row);
}
handler(null, row);
empty = false;
} catch (e) {
handler(e);
}
},
});

if(options.removeOlderThan) {
rmDir(options.location, options);
}
//make the new report
var fileName = options.name + "_" + moment().format("YYYY-MM-DD_HH-mm-ss") + '.csv';
fs.writeFile(options.location + "/" + fileName, csv, function (err) {
if (err) {
return callback(err);
}
var reportInfo = {
name: fileName,
path: options.location + '/'
};
// route errors from streams to callback
let wasError = false;
function forwardError(e) {
wasError = true;
callback(e);
}
input.on('error', forwardError);
rowTransform.on('error', forwardError);
json2csv.on('error', forwardError);
output.on('error', forwardError);

callback(null, reportInfo);
});
// route output stream finish event to callback
output.on('close', () => {
if (empty) {
// don't leave empty report files if there were no results
fs.unlink(outputPath, (e) => {
if (e) {
callback(`Failed to remove empty report file: ${e}`);
}
});

if (!wasError) {
callback('No records for query');
}
return;
}
if (wasError) {
// suppress final report info callback on errors
return;
}
// prune older reports only after successful export
if (options.hasOwnProperty('removeOlderThan')) {
rmDir(options.location, options, outputPath);
}
// finally return new report info
const reportInfo = {
name: fileName,
path: `${options.location}/`,
};
callback(null, reportInfo);
});

// stream query results through processing pipeline
input.pipe(rowTransform).pipe(json2csv).pipe(output);
connection.end();

}
},
};

/**
 * Recursively prunes old reports from a directory.
 *
 * A file is deleted when its name, minus the trailing
 * `_YYYY-MM-DD_HH-mm-ss.csv` part, equals `options.name` and its mtime is more
 * than `options.removeOlderThan` days in the past. Sub-directories are walked
 * with the same rules. The freshly written report (`outputPath`) is never
 * touched.
 *
 * @param {string} dirPath - Directory to scan.
 * @param {Object} options - Report options; reads `name` and `removeOlderThan`.
 * @param {string} outputPath - Path of the report that was just written.
 */
function rmDir(dirPath, options, outputPath) {
  // TODO: rejigger this whole thing to operate async
  let files;
  try {
    files = fs.readdirSync(dirPath);
  } catch (e) {
    // console.err does not exist; calling it threw a TypeError from this
    // handler instead of logging the problem.
    console.error('Could not delete files', e);
    return;
  }

  // Length of the timestamp + extension that processReport appends to a name.
  const suffixLength = '_YYYY-MM-DD_HH-mm-ss.csv'.length;
  // Unix time (seconds) before which a matching report counts as expired.
  const cutoff = moment().unix() - (parseInt(options.removeOlderThan, 10) * 86400);

  files.forEach((fileName) => {
    const filePath = `${dirPath}/${fileName}`;
    if (filePath === outputPath) {
      console.log(`ignoring: ${filePath}`);
      return;
    }

    const stats = fs.statSync(filePath);
    if (!stats.isFile()) {
      rmDir(filePath, options, outputPath);
      return;
    }

    // get the full name of the report from the file by removing the datetime
    // and extension; only exact matches belong to this report
    if (fileName.slice(0, -suffixLength) !== options.name) {
      return;
    }

    if (moment(stats.mtime).unix() < cutoff) {
      fs.unlinkSync(filePath);
      console.log(`deleted: ${filePath}`);
    } else {
      console.log(`kept: ${filePath}`);
    }
  });
}
Loading