diff --git a/data-products/src/main/scala/org/sunbird/analytics/exhaust/collection/BaseCollectionExhaustJob.scala b/data-products/src/main/scala/org/sunbird/analytics/exhaust/collection/BaseCollectionExhaustJob.scala index 876feea95..d9c4a7f97 100644 --- a/data-products/src/main/scala/org/sunbird/analytics/exhaust/collection/BaseCollectionExhaustJob.scala +++ b/data-products/src/main/scala/org/sunbird/analytics/exhaust/collection/BaseCollectionExhaustJob.scala @@ -280,9 +280,8 @@ trait BaseCollectionExhaustJob extends BaseReportsJob with IJob with OnDemandExh .join(userCachedDF, Seq("userid"), "inner") .withColumn("collectionName", lit(batch.collectionName)) .withColumn("batchName", lit(batch.batchName)) - val filteredDF = filterUsers(batch, userEnrolmentBatchDF) - .repartition(AppConf.getConfig("exhaust.user.parallelism").toInt, - col("userid"),col("courseid"),col("batchid")).persist(); + .repartition(AppConf.getConfig("exhaust.user.parallelism").toInt,col("userid"),col("courseid"),col("batchid")) + val filteredDF = filterUsers(batch, userEnrolmentBatchDF).persist() val res = CommonUtil.time(filteredDF.count); JobLogger.log("Time to fetch batch enrolment", Some(Map("timeTaken" -> res._1, "count" -> res._2)), INFO) try { @@ -297,14 +296,14 @@ trait BaseCollectionExhaustJob extends BaseReportsJob with IJob with OnDemandExh processedCount = processedCount + 1 processedSize = processedSize + newFileSize unpersistDFs(); - filteredDF.unpersist(true); - userEnrolmentDf.unpersist(true); + filteredDF.unpersist(true) } } else { CollectionBatchResponse("", "", "PROCESSING", "", 0, 0); } } + userEnrolmentDf.unpersist(true); batchResponseList } }.flatMap(f=>f)