From d336d2e86c4da9f46c5491355093e3e89e3119c1 Mon Sep 17 00:00:00 2001 From: RevathiKotla Date: Fri, 2 Jul 2021 18:12:52 +0530 Subject: [PATCH] Issue SC-000: Fix the persist code issue --- .../exhaust/collection/BaseCollectionExhaustJob.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/data-products/src/main/scala/org/sunbird/analytics/exhaust/collection/BaseCollectionExhaustJob.scala b/data-products/src/main/scala/org/sunbird/analytics/exhaust/collection/BaseCollectionExhaustJob.scala index 876feea95..d9c4a7f97 100644 --- a/data-products/src/main/scala/org/sunbird/analytics/exhaust/collection/BaseCollectionExhaustJob.scala +++ b/data-products/src/main/scala/org/sunbird/analytics/exhaust/collection/BaseCollectionExhaustJob.scala @@ -280,9 +280,8 @@ trait BaseCollectionExhaustJob extends BaseReportsJob with IJob with OnDemandExh .join(userCachedDF, Seq("userid"), "inner") .withColumn("collectionName", lit(batch.collectionName)) .withColumn("batchName", lit(batch.batchName)) - val filteredDF = filterUsers(batch, userEnrolmentBatchDF) - .repartition(AppConf.getConfig("exhaust.user.parallelism").toInt, - col("userid"),col("courseid"),col("batchid")).persist(); + .repartition(AppConf.getConfig("exhaust.user.parallelism").toInt,col("userid"),col("courseid"),col("batchid")) + val filteredDF = filterUsers(batch, userEnrolmentBatchDF).persist() val res = CommonUtil.time(filteredDF.count); JobLogger.log("Time to fetch batch enrolment", Some(Map("timeTaken" -> res._1, "count" -> res._2)), INFO) try { @@ -297,14 +296,14 @@ trait BaseCollectionExhaustJob extends BaseReportsJob with IJob with OnDemandExh processedCount = processedCount + 1 processedSize = processedSize + newFileSize unpersistDFs(); - filteredDF.unpersist(true); - userEnrolmentDf.unpersist(true); + filteredDF.unpersist(true) } } else { CollectionBatchResponse("", "", "PROCESSING", "", 0, 0); } } + userEnrolmentDf.unpersist(true); batchResponseList } }.flatMap(f=>f)