Skip to content

Commit

Permalink
Add support for static imports
Browse files Browse the repository at this point in the history
  • Loading branch information
vlad-ignatov committed Jul 30, 2021
1 parent 1f5e46a commit 1dfd91a
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 59 deletions.
26 changes: 20 additions & 6 deletions build/BulkDataClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,7 @@ class BulkDataClient {
if (!this.accessToken) {
this.accessToken = await this.authorize();
}
const { body, statusCode, headers } = await this.request(contentLocation, {
responseType: "json",
headers: {
authorization: `Bearer ${this.accessToken}`
}
});
const { body, statusCode, headers } = await this.fetchExportManifest(contentLocation);
if (statusCode !== 200) {
onProgress && await onProgress(parseFloat(headers["x-progress"] + "" || "0"));
await lib_1.wait(1000);
Expand All @@ -204,6 +199,25 @@ class BulkDataClient {
onProgress && await onProgress(100);
return body;
}
/**
* This is used for both static and dynamic imports.
* - For static, `location` is the URL of the already available export
* manifest json.
* - For dynamic, `location` is the URL of the job status endpoint. If
* export is still in progress this will resolve with 202 responses and
* should be called again until status 200 is received
*/
async fetchExportManifest(location) {
if (!this.accessToken) {
this.accessToken = await this.authorize();
}
return this.request(location, {
responseType: "json",
headers: {
authorization: `Bearer ${this.accessToken}`
}
});
}
downloadFile(descriptor) {
const out = {
stream: () => {
Expand Down
53 changes: 37 additions & 16 deletions build/ImportJob.js
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ function getImportPingParameters(body) {
};
return {
exportUrl,
exportType,
exportType: exportType,
exportParams
};
}
Expand All @@ -140,23 +140,25 @@ class ImportJob {
validateKickOffBody(req);
const { exportType, exportUrl, exportParams } = getImportPingParameters(req.body);
const job = await ImportJob.create();
job.state.set("exportType", exportType);
await job.state.save();
// If the exportType is static, the Data Consumer will issue a GET
// request to the exportUrl to retrieve a Bulk Data Manifest file with
// the location of the Bulk Data files. In this abbreviated export flow,
// the Data Provider SHALL respond to the GET request with the Complete
// Status response described in the Bulk Data Export IG.
if (exportType === "static") {
job.startStaticImport(exportUrl);
await job.startStaticImport(exportUrl);
}
// If the exportType is dynamic the Data Consumer will issue a POST
// request to the exportUrl to obtain a dataset from the Data Provider,
// following the Bulk Data Export flow described in the Bulk Data Export IG.
await job.startDynamicImport(exportUrl, exportParams);
if (job.state.get("exportStatusLocation")) {
res.set("content-location", `${lib_1.getRequestBaseURL(req)}/job/${job.state.id}`);
res.status(202);
return res.end();
else {
await job.startDynamicImport(exportUrl, exportParams);
}
res.set("content-location", `${lib_1.getRequestBaseURL(req)}/job/${job.state.id}`);
res.status(202);
return res.end();
}
static async importOutcome(req, res) {
const job = await ImportJob.byId(req.params.id);
Expand All @@ -169,10 +171,14 @@ class ImportJob {
}
static async status(req, res) {
const job = await ImportJob.byId(req.params.id);
const exportProgress = job.state.get("exportProgress");
const exportType = job.state.get("exportType");
const importProgress = job.state.get("importProgress");
const status = job.state.get("status");
const progress = (exportProgress + importProgress) / 2;
let progress = importProgress;
if (exportType == "dynamic") {
const exportProgress = job.state.get("exportProgress");
progress = (exportProgress + importProgress) / 2;
}
res.set({
"Cache-Control": "no-store",
"Pragma": "no-cache"
Expand Down Expand Up @@ -300,17 +306,23 @@ class ImportJob {
});
}
// private methods ---------------------------------------------------------
getBulkDataClient() {
if (!this.client) {
this.client = new BulkDataClient_1.BulkDataClient({
clientId: config_1.default.exportClient.clientId,
privateKey: config_1.default.exportClient.privateKey,
tokenUrl: config_1.default.exportClient.tokenURL,
accessTokenLifetime: 3600,
verbose: false
});
}
return this.client;
}
async startDynamicImport(kickOffUrl, params = {}) {
this.state.set("exportUrl", kickOffUrl);
this.state.set("exportParams", params);
await this.state.save();
const bdClient = new BulkDataClient_1.BulkDataClient({
clientId: config_1.default.exportClient.clientId,
privateKey: config_1.default.exportClient.privateKey,
tokenUrl: config_1.default.exportClient.tokenURL,
accessTokenLifetime: 3600,
verbose: false
});
const bdClient = this.getBulkDataClient();
const kickOffResponse = await bdClient.kickOff(kickOffUrl, params);
const contentLocation = kickOffResponse.headers["content-location"];
this.state.set("exportStatusLocation", contentLocation);
Expand All @@ -320,6 +332,12 @@ class ImportJob {
}
async startStaticImport(manifestUrl) {
console.log(`Starting static import from ${manifestUrl}`);
const bdClient = this.getBulkDataClient();
const { body } = await bdClient.fetchExportManifest(manifestUrl);
this.state.set("exportProgress", 100);
this.state.set("manifest", body);
await this.state.save();
setImmediate(() => this.waitForImport(bdClient));
}
async waitForExport(kickOffResponse, client) {
const manifest = await client.waitForExport(kickOffResponse, async (pct) => {
Expand All @@ -334,6 +352,7 @@ class ImportJob {
this.state.set("importProgress", 0);
await this.state.save();
const manifest = this.state.get("manifest");
const outcomes = this.state.get("outcome");
const len = manifest.output?.length;
let done = 0;
const now = new Date();
Expand Down Expand Up @@ -369,10 +388,12 @@ class ImportJob {
});
await upload.promise();
}
outcomes.push(new OperationOutcome_1.OperationOutcome(`File from ${file.url} imported successfully`, 200, "information"));
}
catch (ex) {
console.log(`Error handling file ${file.url}`);
console.error(ex);
outcomes.push(ex);
}
this.state.set("importProgress", Math.round((++done / len) * 100));
await this.state.save();
Expand Down
4 changes: 1 addition & 3 deletions build/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,7 @@ const config = {
destination: {
type: env.DESTINATION_TYPE,
options: {
bucketName: env.AWS_S3_BUCKET_NAME,
accessKeyId: env.AWS_ACCESS_KEY_ID,
accessKeySecret: env.AWS_SECRET_ACCESS_KEY
bucketName: env.AWS_S3_BUCKET_NAME
}
},
aws: {
Expand Down
5 changes: 2 additions & 3 deletions build/generator.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@ async function default_1(req, res) {
const store = node_jose_1.JWK.createKeyStore();
const settings = config[alg];
const key = await store.generate(settings.kty, settings.size, { alg });
const json = {
res.json({
jwks: store.toJSON(true),
publicAsJWK: key.toJSON(false),
publicAsPEM: key.toPEM(false),
privateAsJWK: key.toJSON(true),
privateAsPEM: key.toPEM(true)
};
res.json(json);
});
}
exports.default = default_1;
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"build": "tsc",
"build:watch": "tsc --watch",
"prestart": "npm run build",
"dev": "concurrently npm:build:watch npm:start:watch",
"dev": "npm run clean && npm run build && concurrently npm:build:watch npm:start:watch",
"clean": "rm -rf ./build"
},
"keywords": ["Bulk Data", "FHIR", "import", "SMART", "SMART Backend Services", "OAuth"],
Expand Down
31 changes: 24 additions & 7 deletions src/BulkDataClient.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import jose from "node-jose"
import got, { Got } from "got"
import got, { Got, Response } from "got"
import jwt from "jsonwebtoken"
import { Writable } from "stream"
import { Parameters } from "fhir/r4"
Expand Down Expand Up @@ -250,12 +250,7 @@ export class BulkDataClient
this.accessToken = await this.authorize()
}

const { body, statusCode, headers } = await this.request<BulkData.ExportManifest>(contentLocation, {
responseType: "json",
headers: {
authorization: `Bearer ${ this.accessToken }`
}
});
const { body, statusCode, headers } = await this.fetchExportManifest(contentLocation);

if (statusCode !== 200) {
onProgress && await onProgress(parseFloat(headers["x-progress"] + "" || "0"))
Expand All @@ -268,6 +263,28 @@ export class BulkDataClient
return body
}

/**
 * Fetches an export manifest, authorizing first if no access token is
 * available yet. Used for both static and dynamic imports:
 * - For static, `location` is the URL of the already available export
 *   manifest json.
 * - For dynamic, `location` is the URL of the job status endpoint. If the
 *   export is still in progress this will resolve with 202 responses and
 *   should be called again until status 200 is received.
 */
async fetchExportManifest(location: string): Promise<Response<BulkData.ExportManifest>>
{
    // Lazily obtain an access token the first time we need one
    this.accessToken = this.accessToken || await this.authorize()

    return this.request<BulkData.ExportManifest>(location, {
        responseType: "json",
        headers: {
            authorization: `Bearer ${ this.accessToken }`
        }
    });
}

downloadFile(descriptor: BulkData.ExportManifestFile)
{
const out = {
Expand Down
Loading

0 comments on commit 1dfd91a

Please sign in to comment.