Skip to content

Commit

Permalink
#204 stream analysis and load GC on rxjs subscribers
Browse files Browse the repository at this point in the history
* #204 debug

* #204 cleanup

* #204 cleanup

* #204 subscriber GC

* #204 cleanup
  • Loading branch information
arawinters authored Sep 8, 2021
1 parent c35f909 commit 37a92b9
Showing 1 changed file with 32 additions and 8 deletions.
40 changes: 32 additions & 8 deletions src/app/services/admin.bulk-data.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ export class AdminBulkDataService {

// read file contents as stream
// parse to array of records
this.parseRecordsFromFile(file, (status) => {
let _onStreamRecordParserListener = this.parseRecordsFromFile(file, (status) => {
// on stream complete, do thing
//console.log('SzBulkDataService.streamAnalyze: file stream read complete.', status);
readStreamComplete = true;
Expand All @@ -849,7 +849,7 @@ export class AdminBulkDataService {
);

// when ANYTHING changes, update the singleton "currentAnalysisResult" var so components can read status
retObs.subscribe((summary: AdminStreamAnalysisSummary) => {
let _onAdminStreamAnalysisSummaryChangedListener = retObs.subscribe((summary: AdminStreamAnalysisSummary) => {
this.currentAnalysisResult = summary;
});

Expand All @@ -867,7 +867,7 @@ export class AdminBulkDataService {
});

// on end of records queue double-check if whole thing is complete
this.onStreamAnalysisComplete.pipe(
let _onStreamAnalysisCompleteListener = this.onStreamAnalysisComplete.pipe(
takeUntil(this.streamAnalysisAbort$),
filter((summary: AdminStreamAnalysisSummary) => { return summary !== undefined;}),
take(1),
Expand All @@ -880,13 +880,23 @@ export class AdminBulkDataService {
//this.onAnalysisChange.next( this.currentAnalysisResult );
})
).subscribe((summary: AdminStreamAnalysisSummary) => {
//console.log('onStreamAnalysisComplete: ', summary);
if(readStreamComplete && summary) {
// set this to true to end batching loop Observeable
//console.warn('stream load complete 2', summary);
summary.complete = true;
} else {
//console.warn('stream analysis complete 2', readStreamComplete, summary);
}
if(_onStreamAnalysisCompleteListener && _onStreamAnalysisCompleteListener.unsubscribe) {
_onStreamAnalysisCompleteListener.unsubscribe();
}
if(_onStreamRecordParserListener && _onStreamRecordParserListener.unsubscribe) {
_onStreamRecordParserListener.unsubscribe();
}
if(_onAdminStreamAnalysisSummaryChangedListener && _onAdminStreamAnalysisSummaryChangedListener.unsubscribe) {
_onAdminStreamAnalysisSummaryChangedListener.unsubscribe();
}
});
return retObs;
}
Expand Down Expand Up @@ -976,7 +986,7 @@ export class AdminBulkDataService {
//} else {
// console.log('SzBulkDataService.streamLoad: websocket thinks its still connected: ', this.webSocketService.connected, this.streamConnectionProperties);
//}
this.webSocketService.onMessageRecieved.pipe(
let _onWSMessageRecievedListener = this.webSocketService.onMessageRecieved.pipe(
filter( data => { return data !== undefined}),
map( data => { return (data as AdminStreamLoadSummary) })
).subscribe((data: AdminStreamLoadSummary) => {
Expand All @@ -991,9 +1001,11 @@ export class AdminBulkDataService {
//if(data && data.status === 'COMPLETED' && summary.sentRecordCount === summary.recordCount) {
// all data sent
summary.complete = true;
} else {
summary.complete = false;
}
if(readStreamComplete && sendStreamComplete && summary.complete === true) {
//console.warn('sending _onStreamLoadComplete: ', summary);
//console.warn('sending _onStreamLoadComplete: ', summary, data);
this._onStreamLoadComplete.next(summary);
} else {
//console.log('stream not complete', readStreamComplete, sendStreamComplete, summary.complete);
Expand Down Expand Up @@ -1039,7 +1051,7 @@ export class AdminBulkDataService {

// read file contents as stream
// parse to array of records
this.parseRecordsFromFile(file, (status) => {
let _onStreamRecordParserListener = this.parseRecordsFromFile(file, (status) => {
// on stream complete, do thing
//console.warn('SzBulkDataService.streamLoad: file stream read complete.', status);
readStreamComplete = true;
Expand Down Expand Up @@ -1126,7 +1138,7 @@ export class AdminBulkDataService {
//}

// when ANYTHING changes, update the singleton "currentLoadResult" var so components can read status
retObs.subscribe((summary: AdminStreamLoadSummary) => {
let _onAdminStreamLoadSummaryChangedListener = retObs.subscribe((summary: AdminStreamLoadSummary) => {
this.currentLoadResult = summary;
});

Expand Down Expand Up @@ -1170,7 +1182,7 @@ export class AdminBulkDataService {
});*/

// on end of records queue double-check if whole thing is complete
this._onStreamLoadComplete.pipe(
let _onStreamLoadCompleteListener = this._onStreamLoadComplete.pipe(
takeUntil(this.streamLoadAbort$),
filter((summary: AdminStreamLoadSummary) => { return summary !== undefined;}),
take(5),
Expand All @@ -1180,6 +1192,18 @@ export class AdminBulkDataService {
//setTimeout(() => {
console.log('closing connection: all data sent', summary);
this.webSocketService.disconnect();
if(_onWSMessageRecievedListener && _onWSMessageRecievedListener.unsubscribe) {
_onWSMessageRecievedListener.unsubscribe();
}
if(_onStreamLoadCompleteListener && _onStreamLoadCompleteListener.unsubscribe) {
_onStreamLoadCompleteListener.unsubscribe();
}
if(_onStreamRecordParserListener && _onStreamRecordParserListener.unsubscribe) {
_onStreamRecordParserListener.unsubscribe();
}
if(_onAdminStreamLoadSummaryChangedListener && _onAdminStreamLoadSummaryChangedListener.unsubscribe) {
_onAdminStreamLoadSummaryChangedListener.unsubscribe();
}
//}, 3000)
});

Expand Down

0 comments on commit 37a92b9

Please sign in to comment.