From 88fdec760137082e803e80b376ab26ed3f247ce7 Mon Sep 17 00:00:00 2001
From: Manjunath Davanam
Date: Tue, 6 Jul 2021 13:36:44 +0530
Subject: [PATCH 1/4] Issue DIK-5379 fix: Collection Summary V2 Job Issue fix

---
 .../job/report/CollectionSummaryJob.scala    |  4 +-
 .../job/report/CollectionSummaryJobV2.scala  | 38 ++++++++++---------
 .../sunbird/analytics/util/CourseUtils.scala | 13 ++++++-
 3 files changed, 33 insertions(+), 22 deletions(-)

diff --git a/data-products/src/main/scala/org/sunbird/analytics/job/report/CollectionSummaryJob.scala b/data-products/src/main/scala/org/sunbird/analytics/job/report/CollectionSummaryJob.scala
index fd1cdc62d..9ef17e476 100644
--- a/data-products/src/main/scala/org/sunbird/analytics/job/report/CollectionSummaryJob.scala
+++ b/data-products/src/main/scala/org/sunbird/analytics/job/report/CollectionSummaryJob.scala
@@ -120,7 +120,7 @@ object CollectionSummaryJob extends optional.Application with IJob with BaseRepo
     val searchFilter = config.modelParams.get.get("searchFilter").asInstanceOf[Option[Map[String, AnyRef]]];
     val reportDF = if (searchFilter.isEmpty) {
       val courseIds = processedBatches.select(col("courseid")).distinct().collect().map(_ (0)).toList.asInstanceOf[List[String]]
-      val courseInfo = CourseUtils.getCourseInfo(courseIds, None, config.modelParams.get.getOrElse("maxlimit", 500).asInstanceOf[Int]).toDF("framework", "identifier", "name", "channel", "batches", "organisation", "status", "keywords")
+      val courseInfo = CourseUtils.getCourseInfo(courseIds, None, config.modelParams.get.getOrElse("maxlimit", 500).asInstanceOf[Int], None, None).toDF("framework", "identifier", "name", "channel", "batches", "organisation", "status", "keywords")
       JobLogger.log(s"Total courseInfo records ${courseInfo.count()}", None, INFO)
       processedBatches.join(courseInfo, processedBatches.col("courseid") === courseInfo.col("identifier"), "inner")
         .withColumn("collectionName", col("name"))
@@ -195,7 +195,7 @@ object CollectionSummaryJob extends optional.Application with IJob with BaseRepo
     val courseBatchData = getCourseBatch(spark, fetchData)
     val filteredBatches = if (searchFilter.nonEmpty) {
       JobLogger.log("Generating reports only search query", None, INFO)
-      val collectionDF = CourseUtils.getCourseInfo(List(), Some(searchFilter.get), 0).toDF("framework", "identifier", "name", "channel", "batches", "organisation", "status", "keywords")
+      val collectionDF = CourseUtils.getCourseInfo(List(), Some(searchFilter.get), 0, None, None).toDF("framework", "identifier", "name", "channel", "batches", "organisation", "status", "keywords")
         .withColumnRenamed("name", "collectionName")
         .withColumn("publishedBy", concat_ws(", ", col("organisation")))
       courseBatchData.join(collectionDF, courseBatchData("courseid") === collectionDF("identifier"), "inner")
diff --git a/data-products/src/main/scala/org/sunbird/analytics/job/report/CollectionSummaryJobV2.scala b/data-products/src/main/scala/org/sunbird/analytics/job/report/CollectionSummaryJobV2.scala
index 2d3b55be3..9bc1ee939 100644
--- a/data-products/src/main/scala/org/sunbird/analytics/job/report/CollectionSummaryJobV2.scala
+++ b/data-products/src/main/scala/org/sunbird/analytics/job/report/CollectionSummaryJobV2.scala
@@ -39,7 +39,7 @@ object CollectionSummaryJobV2 extends optional.Application with IJob with BaseRe
     init()
     try {
       val res = CommonUtil.time(prepareReport(spark, fetchData))
-      saveToBlob(res._2, jobConfig) // Saving report to blob stroage
+      saveToBlob(res._2, jobConfig) // Saving report to blob storage
       JobLogger.log(s"Submitting Druid Ingestion Task", None, INFO)
       val ingestionSpecPath: String = jobConfig.modelParams.get.getOrElse("specPath", "").asInstanceOf[String]
       val druidIngestionUrl: String = jobConfig.modelParams.get.getOrElse("druidIngestionUrl", "http://localhost:8081/druid/indexer/v1/task").asInstanceOf[String]
@@ -84,10 +84,23 @@ object CollectionSummaryJobV2 extends optional.Application with IJob with BaseRe
         when(col("certificates").isNotNull && size(col("certificates").cast("array<map<string, string>>")) > 0
           || col("issued_certificates").isNotNull && size(col("issued_certificates").cast("array<map<string, string>>")) > 0, "Y").otherwise("N"))
       .withColumn("enrolleddate", UDFUtils.getLatestValue(col("enrolled_date"), col("enrolleddate")))
-      .select(col("batchid"), col("userid"), col("courseid"), col("enrolleddate"), col("completedon"), col("status"), col("isCertified"))
+      .select(col("batchid"), col("userid"), col("courseid"), col("enrolleddate"), col("completedon"), col("isCertified"))
       .persist()
   }
 
+  def getContentMetaData(processBatches: DataFrame, spark: SparkSession)(implicit fc: FrameworkContext, config: JobConfig): DataFrame = {
+    import spark.implicits._
+    val courseIds = processBatches.select(col("courseid")).distinct().map(f => f.getString(0)).collect.toList
+    JobLogger.log(s"Total distinct Course Id's ${courseIds.size}", None, INFO)
+    val courseInfo = CourseUtils.getCourseInfo(courseIds, None, config.modelParams.get.getOrElse("maxlimit", 50).asInstanceOf[Int], Option(config.modelParams.get.getOrElse("contentStatus", CourseUtils.defaultContentStatus.toList).asInstanceOf[List[String]].toArray), Option(config.modelParams.get.getOrElse("contentFields", CourseUtils.defaultContentFields.toList).asInstanceOf[List[String]].toArray)).toDF(contentFields: _*)
+    JobLogger.log(s"Total fetched records from content search ${courseInfo.count()}", None, INFO)
+    processBatches.join(courseInfo, processBatches.col("courseid") === courseInfo.col("identifier"), "inner")
+      .withColumn("collectionname", col("name"))
+      .withColumnRenamed("status", "contentstatus")
+      .withColumnRenamed("organisation", "contentorg")
+      .withColumnRenamed("createdFor", "createdfor")
+  }
+
   def prepareReport(spark: SparkSession, fetchData: (SparkSession, Map[String, String], String, StructType) => DataFrame)(implicit fc: FrameworkContext, config: JobConfig): DataFrame = {
     implicit val sparkSession: SparkSession = spark
     implicit val sqlContext: SQLContext = spark.sqlContext
@@ -96,21 +109,10 @@ object CollectionSummaryJobV2 extends optional.Application with IJob with BaseRe
     val processBatches: DataFrame = filterBatches(spark, fetchData, config)
       .join(getUserEnrollment(spark, fetchData), Seq("batchid", "courseid"), "left_outer")
       .join(userCachedDF, Seq("userid"), "inner")
-    val processedBatches = computeValues(processBatches)
-    val searchFilter = config.modelParams.get.get("searchFilter").asInstanceOf[Option[Map[String, AnyRef]]];
-    val reportDF = if (null == searchFilter || searchFilter.isEmpty) {
-      val courseIds = processedBatches.select(col("courseid")).distinct().collect().map(_ (0)).toList.asInstanceOf[List[String]]
-      val courseInfo = CourseUtils.getCourseInfo(courseIds, None, config.modelParams.get.getOrElse("maxlimit", 500).asInstanceOf[Int]).toDF(contentFields:_*)
-      JobLogger.log(s"Total courseInfo records ${courseInfo.count()}", None, INFO)
-      processedBatches.join(courseInfo, processedBatches.col("courseid") === courseInfo.col("identifier"), "inner")
-        .withColumn("collectionname", col("name"))
-        .withColumnRenamed("status", "contentstatus")
-        .withColumnRenamed("organisation", "contentorg")
-        .withColumnRenamed("createdFor", "createdfor")
-    } else {
-      processedBatches
-    }
-    reportDF.select(filterColumns.head, filterColumns.tail: _*).persist()
+    val searchFilter = config.modelParams.get.get("searchFilter").asInstanceOf[Option[Map[String, AnyRef]]]
+    val reportDF: DataFrame = if (null == searchFilter || searchFilter.isEmpty) getContentMetaData(processBatches, spark) else processBatches
+    val processedBatches = computeValues(reportDF)
+    processedBatches.select(filterColumns.head, filterColumns.tail: _*).persist()
   }
 
   def computeValues(transformedDF: DataFrame): DataFrame = {
@@ -157,7 +159,7 @@ object CollectionSummaryJobV2 extends optional.Application with IJob with BaseRe
     val courseBatchData = getCourseBatch(spark, fetchData)
     val filteredBatches = if (null != searchFilter && searchFilter.nonEmpty) {
       JobLogger.log("Generating reports only search query", None, INFO)
-      val collectionDF = CourseUtils.getCourseInfo(List(), Some(searchFilter.get), 0).toDF(contentFields: _*)
+      val collectionDF = CourseUtils.getCourseInfo(List(), Some(searchFilter.get), 0, None, None).toDF(contentFields: _*)
         .withColumnRenamed("name", "collectionname")
         .withColumnRenamed("status", "contentstatus")
         .withColumnRenamed("organisation", "contentorg")
diff --git a/data-products/src/main/scala/org/sunbird/analytics/util/CourseUtils.scala b/data-products/src/main/scala/org/sunbird/analytics/util/CourseUtils.scala
index 90c27e05f..68d1cc1f0 100644
--- a/data-products/src/main/scala/org/sunbird/analytics/util/CourseUtils.scala
+++ b/data-products/src/main/scala/org/sunbird/analytics/util/CourseUtils.scala
@@ -39,6 +39,8 @@ trait CourseReport {
 object CourseUtils {
 
   implicit val className: String = "org.sunbird.analytics.util.CourseUtils"
+  val defaultContentStatus: Array[String] = Array("Live", "Unlisted", "Retired")
+  val defaultContentFields: Array[String] = Array("identifier","name","organisation","channel","status","keywords","createdFor","medium","subject")
 
   def getCourse(config: Map[String, AnyRef])(implicit sc: SparkContext, fc: FrameworkContext, sqlContext: SQLContext): DataFrame = {
     import sqlContext.implicits._
@@ -129,12 +131,18 @@ object CourseUtils {
     }
   }
 
-  def getCourseInfo(courseIds: List[String], request: Option[Map[String, AnyRef]], maxSize: Int): List[CourseBatchInfo] = {
+  def getCourseInfo(courseIds: List[String],
+                    request: Option[Map[String, AnyRef]],
+                    maxSize: Int,
+                    status: Option[Array[String]],
+                    fields: Option[Array[String]]
+                   ): List[CourseBatchInfo] = {
     if (courseIds.nonEmpty) {
       val subCourseIds = courseIds.grouped(maxSize).toList
       val responses = Future.traverse(subCourseIds)(ids => {
         JobLogger.log(s"Batch Size Invoke ${ids.size}", None, INFO)
-        fetchContents(JSONUtils.serialize(Map("request" -> Map("filters" -> Map("identifier" -> ids, "status" -> Array("Live")), "fields" -> Array("channel", "identifier", "name", "organisation")))))
+        val query = JSONUtils.serialize(Map("request" -> Map("filters" -> Map("identifier" -> ids, "status" -> status.getOrElse(defaultContentStatus)), "fields" -> fields.getOrElse(defaultContentFields))))
+        fetchContents(query)
       })
       Await.result(responses, 60.seconds).flatten
     } else {
@@ -148,6 +156,7 @@ object CourseUtils {
     val apiUrl = Constants.COMPOSITE_SEARCH_URL
     val response = RestUtil.post[CourseResponse](apiUrl, query)
     if (null != response && response.responseCode.equalsIgnoreCase("ok") && null != response.result.content && response.result.content.nonEmpty) {
+      JobLogger.log(s"Total content Identifiers Response Size ${response.result.content.size}", None, INFO)
       response.result.content
     } else List[CourseBatchInfo]()
   }
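
Note: the composite-search request that the extended getCourseInfo serializes can be reproduced standalone. A minimal sketch under the defaults added in this patch; the object name CourseQuerySketch, the method buildQuery, and the sample ids are hypothetical, for illustration only:

object CourseQuerySketch {
  // Mirrors the defaults introduced in CourseUtils above.
  val defaultContentStatus: Array[String] = Array("Live", "Unlisted", "Retired")
  val defaultContentFields: Array[String] = Array("identifier", "name", "organisation",
    "channel", "status", "keywords", "createdFor", "medium", "subject")

  // Builds the request map that getCourseInfo passes to JSONUtils.serialize.
  def buildQuery(ids: List[String],
                 status: Option[Array[String]] = None,
                 fields: Option[Array[String]] = None): Map[String, Any] =
    Map("request" -> Map(
      "filters" -> Map(
        "identifier" -> ids,
        // Previously hard-coded to Array("Live"); now caller-overridable.
        "status" -> status.getOrElse(defaultContentStatus)),
      // Previously a fixed four-field list; now caller-overridable.
      "fields" -> fields.getOrElse(defaultContentFields)))

  def main(args: Array[String]): Unit = {
    // In the job, identifiers are first chunked with grouped(maxSize).
    println(buildQuery(List("do_123", "do_456")))
  }
}
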
From 0d28092dd2a25f90d799c99c138cd207ab4f4c55 Mon Sep 17 00:00:00 2001
From: Manjunath Davanam
Date: Tue, 6 Jul 2021 13:38:53 +0530
Subject: [PATCH 2/4] Issue DIK-5379 fix: Collection Summary V2 Job Issue fix

---
 .../org/sunbird/analytics/job/report/CollectionSummaryJob.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/data-products/src/main/scala/org/sunbird/analytics/job/report/CollectionSummaryJob.scala b/data-products/src/main/scala/org/sunbird/analytics/job/report/CollectionSummaryJob.scala
index 9ef17e476..585a38815 100644
--- a/data-products/src/main/scala/org/sunbird/analytics/job/report/CollectionSummaryJob.scala
+++ b/data-products/src/main/scala/org/sunbird/analytics/job/report/CollectionSummaryJob.scala
@@ -162,7 +162,7 @@ object CollectionSummaryJob extends optional.Application with IJob with BaseRepo
       basePath = modelParams.getOrElse("baseScriptPath", "/mount/data/analytics/tmp/").asInstanceOf[String],
       merge = MergeFiles(List(
         Map("deltaPath" -> s"${getReportName(keyword, reportPath, "summary-report-latest")}.csv", "reportPath" -> s"${getReportName(keyword, reportPath, "summary-report-latest")}.csv"),
-        Map("deltaPath" -> s"${getReportName(keyword, reportPath, s"summary-report-$getDate")}.csv", "reportPath" -> s"${getReportName(keyword, reportPath, s"summary-report-$getDate")}.csv")), List()
+        Map("deltaPath" -> s"${getReportName(keyword, reportPath s"summary-report-$getDate")}.csv", "reportPath" -> s"${getReportName(keyword, reportPath, s"summary-report-$getDate")}.csv")), List()
       ),
       container = container,
       postContainer = Some(container)

From 8d085c5301ed20662ac4ccfffc8f007754905f53 Mon Sep 17 00:00:00 2001
From: Manjunath Davanam
Date: Tue, 6 Jul 2021 13:39:02 +0530
Subject: [PATCH 3/4] Issue DIK-5379 fix: Collection Summary V2 Job Issue fix

---
 .../org/sunbird/analytics/job/report/CollectionSummaryJob.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/data-products/src/main/scala/org/sunbird/analytics/job/report/CollectionSummaryJob.scala b/data-products/src/main/scala/org/sunbird/analytics/job/report/CollectionSummaryJob.scala
index 585a38815..9ef17e476 100644
--- a/data-products/src/main/scala/org/sunbird/analytics/job/report/CollectionSummaryJob.scala
+++ b/data-products/src/main/scala/org/sunbird/analytics/job/report/CollectionSummaryJob.scala
@@ -162,7 +162,7 @@ object CollectionSummaryJob extends optional.Application with IJob with BaseRepo
       basePath = modelParams.getOrElse("baseScriptPath", "/mount/data/analytics/tmp/").asInstanceOf[String],
       merge = MergeFiles(List(
         Map("deltaPath" -> s"${getReportName(keyword, reportPath, "summary-report-latest")}.csv", "reportPath" -> s"${getReportName(keyword, reportPath, "summary-report-latest")}.csv"),
-        Map("deltaPath" -> s"${getReportName(keyword, reportPath s"summary-report-$getDate")}.csv", "reportPath" -> s"${getReportName(keyword, reportPath, s"summary-report-$getDate")}.csv")), List()
+        Map("deltaPath" -> s"${getReportName(keyword, reportPath, s"summary-report-$getDate")}.csv", "reportPath" -> s"${getReportName(keyword, reportPath, s"summary-report-$getDate")}.csv")), List()
      ),
       container = container,
       postContainer = Some(container)
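
Note: PATCH 4/4 below renames the configuration key that drives getCourseInfo's chunking from "maxlimit" to "batchSize". For reference, the batching semantics of grouped plus Future.traverse used there, as a minimal self-contained sketch; the ids and the fetchContents stub are made up for illustration:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BatchingSketch extends App {
  val courseIds = (1 to 7).map(i => s"do_$i").toList
  val batchSize = 3 // in the job, read from modelParams("batchSize") after PATCH 4/4

  // grouped() yields chunks of at most batchSize: sizes 3, 3, 1 here.
  val groups = courseIds.grouped(batchSize).toList

  // One (stubbed) search call per chunk; results are collected in order.
  def fetchContents(ids: List[String]): Future[List[String]] = Future(ids.map(_.toUpperCase))

  val contents = Await.result(Future.traverse(groups)(fetchContents), 60.seconds).flatten
  println(contents) // List(DO_1, DO_2, ..., DO_7)
}
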
From 2baf21cbc6387642b19874dcfa4370de2cd66d7d Mon Sep 17 00:00:00 2001
From: Manjunath Davanam
Date: Tue, 6 Jul 2021 13:52:17 +0530
Subject: [PATCH 4/4] Issue DIK-5379 fix: Added batchsize config

---
 .../org/sunbird/analytics/job/report/CollectionSummaryJob.scala   | 2 +-
 .../org/sunbird/analytics/job/report/CollectionSummaryJobV2.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/data-products/src/main/scala/org/sunbird/analytics/job/report/CollectionSummaryJob.scala b/data-products/src/main/scala/org/sunbird/analytics/job/report/CollectionSummaryJob.scala
index 9ef17e476..ccff22d56 100644
--- a/data-products/src/main/scala/org/sunbird/analytics/job/report/CollectionSummaryJob.scala
+++ b/data-products/src/main/scala/org/sunbird/analytics/job/report/CollectionSummaryJob.scala
@@ -120,7 +120,7 @@ object CollectionSummaryJob extends optional.Application with IJob with BaseRepo
     val searchFilter = config.modelParams.get.get("searchFilter").asInstanceOf[Option[Map[String, AnyRef]]];
     val reportDF = if (searchFilter.isEmpty) {
       val courseIds = processedBatches.select(col("courseid")).distinct().collect().map(_ (0)).toList.asInstanceOf[List[String]]
-      val courseInfo = CourseUtils.getCourseInfo(courseIds, None, config.modelParams.get.getOrElse("maxlimit", 500).asInstanceOf[Int], None, None).toDF("framework", "identifier", "name", "channel", "batches", "organisation", "status", "keywords")
+      val courseInfo = CourseUtils.getCourseInfo(courseIds, None, config.modelParams.get.getOrElse("batchSize", 500).asInstanceOf[Int], None, None).toDF("framework", "identifier", "name", "channel", "batches", "organisation", "status", "keywords")
       JobLogger.log(s"Total courseInfo records ${courseInfo.count()}", None, INFO)
       processedBatches.join(courseInfo, processedBatches.col("courseid") === courseInfo.col("identifier"), "inner")
         .withColumn("collectionName", col("name"))
diff --git a/data-products/src/main/scala/org/sunbird/analytics/job/report/CollectionSummaryJobV2.scala b/data-products/src/main/scala/org/sunbird/analytics/job/report/CollectionSummaryJobV2.scala
index 9bc1ee939..5a1488c6c 100644
--- a/data-products/src/main/scala/org/sunbird/analytics/job/report/CollectionSummaryJobV2.scala
+++ b/data-products/src/main/scala/org/sunbird/analytics/job/report/CollectionSummaryJobV2.scala
@@ -92,7 +92,7 @@ object CollectionSummaryJobV2 extends optional.Application with IJob with BaseRe
     import spark.implicits._
     val courseIds = processBatches.select(col("courseid")).distinct().map(f => f.getString(0)).collect.toList
     JobLogger.log(s"Total distinct Course Id's ${courseIds.size}", None, INFO)
-    val courseInfo = CourseUtils.getCourseInfo(courseIds, None, config.modelParams.get.getOrElse("maxlimit", 50).asInstanceOf[Int], Option(config.modelParams.get.getOrElse("contentStatus", CourseUtils.defaultContentStatus.toList).asInstanceOf[List[String]].toArray), Option(config.modelParams.get.getOrElse("contentFields", CourseUtils.defaultContentFields.toList).asInstanceOf[List[String]].toArray)).toDF(contentFields: _*)
+    val courseInfo = CourseUtils.getCourseInfo(courseIds, None, config.modelParams.get.getOrElse("batchSize", 50).asInstanceOf[Int], Option(config.modelParams.get.getOrElse("contentStatus", CourseUtils.defaultContentStatus.toList).asInstanceOf[List[String]].toArray), Option(config.modelParams.get.getOrElse("contentFields", CourseUtils.defaultContentFields.toList).asInstanceOf[List[String]].toArray)).toDF(contentFields: _*)
     JobLogger.log(s"Total fetched records from content search ${courseInfo.count()}", None, INFO)
     processBatches.join(courseInfo, processBatches.col("courseid") === courseInfo.col("identifier"), "inner")
       .withColumn("collectionname", col("name"))
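
Note: with the full series applied, the content-search step of both jobs is driven from modelParams. A hedged sketch of the relevant entries; the key names and defaults come from the diffs above, while the values, the spec path, and the ConfigSketch wrapper are illustrative only:

object ConfigSketch extends App {
  // Illustrative modelParams fragment; values are placeholders.
  val modelParams: Map[String, AnyRef] = Map(
    "batchSize" -> Int.box(50),                          // replaces the old "maxlimit" key (PATCH 4/4)
    "contentStatus" -> List("Live", "Unlisted"),         // else CourseUtils.defaultContentStatus applies
    "contentFields" -> List("identifier", "name", "status", "organisation", "createdFor"),
    "specPath" -> "/path/to/druid-ingestion-spec.json",  // hypothetical Druid ingestion spec location
    "druidIngestionUrl" -> "http://localhost:8081/druid/indexer/v1/task"
  )
  println(modelParams)
}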