DIK-5379 - Missing records in the collection summary v2 report issue fix #417

Merged
@@ -120,7 +120,7 @@ object CollectionSummaryJob extends optional.Application with IJob with BaseRepo
val searchFilter = config.modelParams.get.get("searchFilter").asInstanceOf[Option[Map[String, AnyRef]]];
val reportDF = if (searchFilter.isEmpty) {
val courseIds = processedBatches.select(col("courseid")).distinct().collect().map(_ (0)).toList.asInstanceOf[List[String]]
-val courseInfo = CourseUtils.getCourseInfo(courseIds, None, config.modelParams.get.getOrElse("maxlimit", 500).asInstanceOf[Int]).toDF("framework", "identifier", "name", "channel", "batches", "organisation", "status", "keywords")
+val courseInfo = CourseUtils.getCourseInfo(courseIds, None, config.modelParams.get.getOrElse("batchSize", 500).asInstanceOf[Int], None, None).toDF("framework", "identifier", "name", "channel", "batches", "organisation", "status", "keywords")
JobLogger.log(s"Total courseInfo records ${courseInfo.count()}", None, INFO)
processedBatches.join(courseInfo, processedBatches.col("courseid") === courseInfo.col("identifier"), "inner")
.withColumn("collectionName", col("name"))
@@ -195,7 +195,7 @@ object CollectionSummaryJob extends optional.Application with IJob with BaseRepo
val courseBatchData = getCourseBatch(spark, fetchData)
val filteredBatches = if (searchFilter.nonEmpty) {
JobLogger.log("Generating reports only search query", None, INFO)
-val collectionDF = CourseUtils.getCourseInfo(List(), Some(searchFilter.get), 0).toDF("framework", "identifier", "name", "channel", "batches", "organisation", "status", "keywords")
+val collectionDF = CourseUtils.getCourseInfo(List(), Some(searchFilter.get), 0, None, None).toDF("framework", "identifier", "name", "channel", "batches", "organisation", "status", "keywords")
.withColumnRenamed("name", "collectionName")
.withColumn("publishedBy", concat_ws(", ", col("organisation")))
courseBatchData.join(collectionDF, courseBatchData("courseid") === collectionDF("identifier"), "inner")
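Both call sites above now read the chunk size from the "batchSize" model parameter (formerly "maxlimit") and pass explicit None for the new status and fields arguments. The chunking itself is the grouped/Future.traverse pattern in CourseUtils (last file below); a minimal standalone sketch, with fetchChunk as a stand-in for the real composite-search call:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    // Placeholder for CourseUtils.fetchContents: one search request per id chunk.
    def fetchChunk(ids: List[String]): Future[List[String]] =
      Future.successful(ids.map(id => s"meta-for-$id"))

    val courseIds = (1 to 1200).map(i => s"course_$i").toList
    val batchSize = 500 // read via config.modelParams.get.getOrElse("batchSize", 500) in the job
    val responses = Future.traverse(courseIds.grouped(batchSize).toList)(fetchChunk)
    val allMeta = Await.result(responses, 60.seconds).flatten // here: 3 requests of at most 500 ids
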
@@ -39,7 +39,7 @@ object CollectionSummaryJobV2 extends optional.Application with IJob with BaseRe
init()
try {
val res = CommonUtil.time(prepareReport(spark, fetchData))
-saveToBlob(res._2, jobConfig) // Saving report to blob stroage
+saveToBlob(res._2, jobConfig) // Saving report to blob storage
JobLogger.log(s"Submitting Druid Ingestion Task", None, INFO)
val ingestionSpecPath: String = jobConfig.modelParams.get.getOrElse("specPath", "").asInstanceOf[String]
val druidIngestionUrl: String = jobConfig.modelParams.get.getOrElse("druidIngestionUrl", "http://localhost:8081/druid/indexer/v1/task").asInstanceOf[String]
@@ -84,10 +84,23 @@ object CollectionSummaryJobV2 extends optional.Application with IJob with BaseRe
when(col("certificates").isNotNull && size(col("certificates").cast("array<map<string, string>>")) > 0
|| col("issued_certificates").isNotNull && size(col("issued_certificates").cast("array<map<string, string>>")) > 0, "Y").otherwise("N"))
.withColumn("enrolleddate", UDFUtils.getLatestValue(col("enrolled_date"), col("enrolleddate")))
.select(col("batchid"), col("userid"), col("courseid"), col("enrolleddate"), col("completedon"), col("status"), col("isCertified"))
.select(col("batchid"), col("userid"), col("courseid"), col("enrolleddate"), col("completedon"), col("isCertified"))
.persist()
}

+def getContentMetaData(processBatches: DataFrame, spark: SparkSession)(implicit fc: FrameworkContext, config: JobConfig): DataFrame = {
+import spark.implicits._
+val courseIds = processBatches.select(col("courseid")).distinct().map(f => f.getString(0)).collect.toList
+JobLogger.log(s"Total distinct Course Id's ${courseIds.size}", None, INFO)
+val courseInfo = CourseUtils.getCourseInfo(courseIds, None, config.modelParams.get.getOrElse("batchSize", 50).asInstanceOf[Int], Option(config.modelParams.get.getOrElse("contentStatus", CourseUtils.defaultContentStatus.toList).asInstanceOf[List[String]].toArray), Option(config.modelParams.get.getOrElse("contentFields", CourseUtils.defaultContentFields.toList).asInstanceOf[List[String]].toArray)).toDF(contentFields: _*)
+JobLogger.log(s"Total fetched records from content search ${courseInfo.count()}", None, INFO)
+processBatches.join(courseInfo, processBatches.col("courseid") === courseInfo.col("identifier"), "inner")
+.withColumn("collectionname", col("name"))
+.withColumnRenamed("status", "contentstatus")
+.withColumnRenamed("organisation", "contentorg")
+.withColumnRenamed("createdFor", "createdfor")
+}

def prepareReport(spark: SparkSession, fetchData: (SparkSession, Map[String, String], String, StructType) => DataFrame)(implicit fc: FrameworkContext, config: JobConfig): DataFrame = {
implicit val sparkSession: SparkSession = spark
implicit val sqlContext: SQLContext = spark.sqlContext
@@ -96,21 +109,10 @@ object CollectionSummaryJobV2 extends optional.Application with IJob with BaseRe
val processBatches: DataFrame = filterBatches(spark, fetchData, config)
.join(getUserEnrollment(spark, fetchData), Seq("batchid", "courseid"), "left_outer")
.join(userCachedDF, Seq("userid"), "inner")
-val processedBatches = computeValues(processBatches)
-val searchFilter = config.modelParams.get.get("searchFilter").asInstanceOf[Option[Map[String, AnyRef]]];
-val reportDF = if (null == searchFilter || searchFilter.isEmpty) {
-val courseIds = processedBatches.select(col("courseid")).distinct().collect().map(_ (0)).toList.asInstanceOf[List[String]]
-val courseInfo = CourseUtils.getCourseInfo(courseIds, None, config.modelParams.get.getOrElse("maxlimit", 500).asInstanceOf[Int]).toDF(contentFields:_*)
-JobLogger.log(s"Total courseInfo records ${courseInfo.count()}", None, INFO)
-processedBatches.join(courseInfo, processedBatches.col("courseid") === courseInfo.col("identifier"), "inner")
-.withColumn("collectionname", col("name"))
-.withColumnRenamed("status", "contentstatus")
-.withColumnRenamed("organisation", "contentorg")
-.withColumnRenamed("createdFor", "createdfor")
-} else {
-processedBatches
-}
-reportDF.select(filterColumns.head, filterColumns.tail: _*).persist()
+val searchFilter = config.modelParams.get.get("searchFilter").asInstanceOf[Option[Map[String, AnyRef]]]
+val reportDF: DataFrame = if (null == searchFilter || searchFilter.isEmpty) getContentMetaData(processBatches, spark) else processBatches
+val processedBatches = computeValues(reportDF)
+processedBatches.select(filterColumns.head, filterColumns.tail: _*).persist()
}

def computeValues(transformedDF: DataFrame): DataFrame = {
@@ -157,7 +159,7 @@ object CollectionSummaryJobV2 extends optional.Application with IJob with BaseRe
val courseBatchData = getCourseBatch(spark, fetchData)
val filteredBatches = if (null != searchFilter && searchFilter.nonEmpty) {
JobLogger.log("Generating reports only search query", None, INFO)
-val collectionDF = CourseUtils.getCourseInfo(List(), Some(searchFilter.get), 0).toDF(contentFields: _*)
+val collectionDF = CourseUtils.getCourseInfo(List(), Some(searchFilter.get), 0, None, None).toDF(contentFields: _*)
.withColumnRenamed("name", "collectionname")
.withColumnRenamed("status", "contentstatus")
.withColumnRenamed("organisation", "contentorg")
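The inner join in the new getContentMetaData is the crux of the missing-records bug: any course the search API does not return is silently dropped from the report, which is presumably why this PR widens the default status filter beyond "Live". A minimal standalone sketch of that join (toy data and a local Spark session are assumptions; column names follow the diff):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    val processBatches = Seq(("batch-1", "do_123"), ("batch-2", "do_999")).toDF("batchid", "courseid")
    // Search response: do_999 is absent (e.g. Retired content under the old "Live"-only filter).
    val courseInfo = Seq(("do_123", "Course A", "Live", "Org A"))
      .toDF("identifier", "name", "status", "organisation")

    val report = processBatches
      .join(courseInfo, processBatches.col("courseid") === courseInfo.col("identifier"), "inner")
      .withColumn("collectionname", col("name"))
      .withColumnRenamed("status", "contentstatus")
      .withColumnRenamed("organisation", "contentorg")
    report.show(false) // batch-2 vanishes in the inner join: the reported bug in miniature
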
@@ -39,6 +39,8 @@ trait CourseReport {
object CourseUtils {

implicit val className: String = "org.sunbird.analytics.util.CourseUtils"
+val defaultContentStatus: Array[String] = Array("Live", "Unlisted", "Retired")
+val defaultContentFields: Array[String] = Array("identifier","name","organisation","channel","status","keywords","createdFor","medium","subject")

def getCourse(config: Map[String, AnyRef])(implicit sc: SparkContext, fc: FrameworkContext, sqlContext: SQLContext): DataFrame = {
import sqlContext.implicits._
@@ -129,12 +131,18 @@ object CourseUtils {
}
}

-def getCourseInfo(courseIds: List[String], request: Option[Map[String, AnyRef]], maxSize: Int): List[CourseBatchInfo] = {
+def getCourseInfo(courseIds: List[String],
+request: Option[Map[String, AnyRef]],
+maxSize: Int,
+status: Option[Array[String]],
+fields: Option[Array[String]]
+): List[CourseBatchInfo] = {
if (courseIds.nonEmpty) {
val subCourseIds = courseIds.grouped(maxSize).toList
val responses = Future.traverse(subCourseIds)(ids => {
JobLogger.log(s"Batch Size Invoke ${ids.size}", None, INFO)
-fetchContents(JSONUtils.serialize(Map("request" -> Map("filters" -> Map("identifier" -> ids, "status" -> Array("Live")), "fields" -> Array("channel", "identifier", "name", "organisation")))))
+val query = JSONUtils.serialize(Map("request" -> Map("filters" -> Map("identifier" -> ids, "status" -> status.getOrElse(defaultContentStatus)), "fields" -> fields.getOrElse(defaultContentFields))))
+fetchContents(query)
})
Await.result(responses, 60.seconds).flatten
} else {
@@ -148,6 +156,7 @@ object CourseUtils {
val apiUrl = Constants.COMPOSITE_SEARCH_URL
val response = RestUtil.post[CourseResponse](apiUrl, query)
if (null != response && response.responseCode.equalsIgnoreCase("ok") && null != response.result.content && response.result.content.nonEmpty) {
JobLogger.log(s"Total content Identifiers Response Size ${response.result.content.size}", None, INFO)
response.result.content
} else List[CourseBatchInfo]()
}
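For reference, the request body the reworked getCourseInfo serializes, rendered as plain Scala from the diff above. The searchRequest helper and its default arguments are illustrative only (the repo method has no defaults and goes through JSONUtils.serialize); the filter and field values are the new CourseUtils constants:

    val defaultContentStatus = Array("Live", "Unlisted", "Retired")
    val defaultContentFields = Array("identifier", "name", "organisation", "channel",
      "status", "keywords", "createdFor", "medium", "subject")

    // Hypothetical helper mirroring the query built inside getCourseInfo.
    def searchRequest(ids: List[String],
                      status: Option[Array[String]] = None,
                      fields: Option[Array[String]] = None): Map[String, Any] =
      Map("request" -> Map(
        "filters" -> Map("identifier" -> ids, "status" -> status.getOrElse(defaultContentStatus)),
        "fields" -> fields.getOrElse(defaultContentFields)))

    // searchRequest(List("do_123")) now matches Unlisted and Retired content as well;
    // the old hard-coded filter ("status" -> Array("Live")) silently excluded those courses.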