diff --git a/data-products/src/main/scala/org/sunbird/analytics/audit/AssessmentScoreCorrectionJob.scala b/data-products/src/main/scala/org/sunbird/analytics/audit/AssessmentScoreCorrectionJob.scala new file mode 100644 index 000000000..d68e05a62 --- /dev/null +++ b/data-products/src/main/scala/org/sunbird/analytics/audit/AssessmentScoreCorrectionJob.scala @@ -0,0 +1,201 @@ +package org.sunbird.analytics.audit + +import com.datastax.spark.connector.cql.CassandraConnectorConf +import com.datastax.spark.connector.{SomeColumns, toRDDFunctions} +import org.apache.commons.lang3.StringUtils +import org.apache.spark.SparkContext +import org.apache.spark.sql.cassandra._ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.ekstep.analytics.framework.Level.{ERROR, INFO} +import org.ekstep.analytics.framework.conf.AppConf +import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils, JobLogger, RestUtil} +import org.ekstep.analytics.framework.{FrameworkContext, IJob, JobConfig} +import org.sunbird.analytics.exhaust.BaseReportsJob + +import java.io._ +import scala.collection.immutable.List + +case class ContentResponse(result: ContentResult, responseCode: String) + +case class ContentResult(content: Map[String, AnyRef]) + +case class AssessmentCorrectionMetrics(batchId: String, contentId: String, invalidRecords: Long, totalAffectedUsers: Long, contentTotalQuestions: Long) + +case class ContentMeta(totalQuestions: Int, contentType: String, contentId: String) + + +object AssessmentScoreCorrectionJob extends optional.Application with IJob with BaseReportsJob { + implicit val className: String = "org.sunbird.analytics.audit.AssessmentScoreCorrectionJob" + val cassandraFormat = "org.apache.spark.sql.cassandra" + private val assessmentAggDBSettings = Map("table" -> "assessment_aggregator", "keyspace" -> AppConf.getConfig("sunbird.courses.keyspace"), "cluster" -> "LMSCluster") + private val userEnrolmentDBSettings = Map("table" -> "user_enrolments", "keyspace" -> AppConf.getConfig("sunbird.courses.keyspace"), "cluster" -> "ReportCluster") + + // $COVERAGE-OFF$ Disabling scoverage for main and execute method + override def main(config: String)(implicit sc: Option[SparkContext], fc: Option[FrameworkContext]): Unit = { + val jobName: String = "AssessmentScoreCorrectionJob" + implicit val jobConfig: JobConfig = JSONUtils.deserialize[JobConfig](config) + JobLogger.init(jobName) + JobLogger.start(s"$jobName started executing", Option(Map("config" -> config, "model" -> jobName))) + implicit val frameworkContext: FrameworkContext = getReportingFrameworkContext() + implicit val spark: SparkSession = openSparkSession(jobConfig) + implicit val sc: SparkContext = spark.sparkContext + try { + spark.setCassandraConf("LMSCluster", CassandraConnectorConf.ConnectionHostParam.option(AppConf.getConfig("sunbird.courses.cluster.host"))) + spark.setCassandraConf("ReportCluster", CassandraConnectorConf.ConnectionHostParam.option(AppConf.getConfig("sunbird.report.cluster.host"))) + val res = CommonUtil.time(processBatches()) + JobLogger.end(s"$jobName completed execution", "SUCCESS", Option(Map("time_taken" -> res._1, "processed_batches" -> res._2))) + } catch { + case ex: Exception => + JobLogger.log(ex.getMessage, None, ERROR) + JobLogger.end(s"$jobName execution failed", "FAILED", Option(Map("model" -> jobName, "statusMsg" -> ex.getMessage))); + } + finally { + frameworkContext.closeContext() + spark.close() + } + } + + // 
$COVERAGE-ON$ Enabling scoverage + def processBatches()(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig, sc: SparkContext): List[List[AssessmentCorrectionMetrics]] = { + val modelParams = config.modelParams.getOrElse(Map[String, Option[AnyRef]]()) + val batchIds: List[String] = modelParams.getOrElse("assessment.score.correction.batches", List()).asInstanceOf[List[String]].filter(x => x.nonEmpty) + val isDryRunMode = modelParams.getOrElse("isDryRunMode", true).asInstanceOf[Boolean] + for (batchId <- batchIds) yield { + JobLogger.log("Started processing the batch", Option(Map("batch_id" -> batchId, "isDryRunMode" -> isDryRunMode)), INFO) + process(batchId = batchId, isDryRunMode = isDryRunMode) + } + } + + def process(batchId: String, + isDryRunMode: Boolean)(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig, sc: SparkContext): List[AssessmentCorrectionMetrics] = { + val modelParams = config.modelParams.getOrElse(Map[String, Option[AnyRef]]()) + // Compute the attempted question count per attempt from the raw assessment data + val assessmentData: DataFrame = getAssessmentAggData(batchId = batchId) + .withColumn("question_data", explode_outer(col("question"))) + .withColumn("question_id", col("question_data.id")) + .select("course_id", "batch_id", "content_id", "attempt_id", "user_id", "question_id") + .groupBy("course_id", "batch_id", "content_id", "attempt_id", "user_id").agg(size(collect_list("question_id")).as("total_question")).persist() + val userEnrolmentDF = getUserEnrolment(batchId = batchId).persist() + val contentIds: List[String] = assessmentData.select("content_id").distinct().collect().map(_ (0)).toList.asInstanceOf[List[String]] + val res = for (contentId <- contentIds) yield { + val contentMetaURL: String = modelParams.getOrElse("contentReadAPI", "https://diksha.gov.in/api/content/v1/read/").asInstanceOf[String] + val supportedContentType: String = modelParams.getOrElse("supportedContentType", "SelfAssess").asInstanceOf[String] + val contentMeta: ContentMeta = getTotalQuestions(contentId, contentMetaURL) + JobLogger.log("Fetched the content meta for the batch being processed", Option(contentMeta), INFO) + if (StringUtils.equals(contentMeta.contentType, supportedContentType)) { + correctData(assessmentData, userEnrolmentDF, batchId, contentMeta.contentId, contentMeta.totalQuestions, isDryRunMode) + } else { + JobLogger.log("The content type is not supported; skipping the data removal process", Some(Map("contentId" -> contentId, "contentType" -> contentMeta.contentType)), INFO) + AssessmentCorrectionMetrics(batchId = batchId, contentId = contentId, invalidRecords = 0, totalAffectedUsers = 0, contentTotalQuestions = contentMeta.totalQuestions) + } + } + assessmentData.unpersist() + userEnrolmentDF.unpersist() + res + } + + def correctData(assessmentDF: DataFrame, + userEnrolDF: DataFrame, + batchId: String, + contentId: String, + totalQuestions: Int, + isDryRunMode: Boolean + )(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig, sc: SparkContext): AssessmentCorrectionMetrics = { + val incorrectRecords: DataFrame = assessmentDF.filter(col("content_id") === contentId).filter(col("total_question") =!= totalQuestions) + .select("course_id", "batch_id", "content_id", "attempt_id", "user_id") + val incorrectRecordsSize: Long = incorrectRecords.count() + val metrics = AssessmentCorrectionMetrics(batchId = batchId, contentId = contentId, invalidRecords = incorrectRecordsSize, totalAffectedUsers = incorrectRecords.select("user_id").distinct().count(), contentTotalQuestions = totalQuestions) + JobLogger.log("Total Incorrect Records",
Option(metrics), INFO) + if (incorrectRecordsSize > 0) { + removeAssessmentRecords(incorrectRecords, batchId, contentId, isDryRunMode) + generateInstructionEvents(incorrectRecords, batchId, contentId) + saveRevokingCertIds(incorrectRecords, userEnrolDF, batchId, contentId) + } + metrics + } + + + def saveRevokingCertIds(incorrectRecords: DataFrame, + userEnrolmentDF: DataFrame, + batchId: String, + contentId: String)(implicit config: JobConfig): Unit = { + val certIdsDF: DataFrame = incorrectRecords.select("course_id", "batch_id", "content_id", "user_id").distinct() + .join(userEnrolmentDF, incorrectRecords.col("user_id") === userEnrolmentDF.col("userid") && incorrectRecords.col("course_id") === userEnrolmentDF.col("courseid") && + incorrectRecords.col("batch_id") === userEnrolmentDF.col("batchid"), "left_outer") + .withColumn("certificate_data", explode_outer(col("issued_certificates"))) + .withColumn("certificate_id", col("certificate_data.identifier")) + .select("courseid", "batchid", "userid", "certificate_id").filter(col("certificate_id") =!= "") + saveLocal(certIdsDF, batchId, contentId = contentId, "revoked-cert-data") + } + + def saveLocal(data: DataFrame, + batchId: String, + contentId: String, + folderName: String)(implicit config: JobConfig): Unit = { + JobLogger.log("Generating the CSV file", Option(Map("batch_id" -> batchId, "content_id" -> contentId)), INFO) + val outputPath = config.modelParams.getOrElse(Map[String, Option[AnyRef]]()).getOrElse("csvPath", "").asInstanceOf[String] + data.repartition(1).write.option("header", value = true).format("com.databricks.spark.csv").save(outputPath.concat(s"/$folderName-$batchId-$contentId-${System.currentTimeMillis()}.csv")) + } + + + def removeAssessmentRecords(incorrectRecords: DataFrame, + batchId: String, + contentId: String, + isDryRunMode: Boolean)(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig, sc: SparkContext): Unit = { + if (isDryRunMode) { + saveLocal(incorrectRecords, batchId, contentId, "assessment-invalid-attempts-records") + } else { + JobLogger.log("Deleting the records from the table", None, INFO) + saveLocal(incorrectRecords, batchId, contentId, "assessment-invalid-attempts-records") // Keep a local copy for cross-verification + incorrectRecords.select("course_id", "batch_id", "user_id", "content_id", "attempt_id").rdd.deleteFromCassandra(AppConf.getConfig("sunbird.courses.keyspace"), "assessment_aggregator", keyColumns = SomeColumns("course_id", "batch_id", "user_id", "content_id", "attempt_id")) + } + } + + def generateInstructionEvents(incorrectRecordsDF: DataFrame, + batchId: String, + contentId: String)(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig, sc: SparkContext): Unit = { + val outputPath = config.modelParams.getOrElse(Map[String, Option[AnyRef]]()).getOrElse("csvPath", "").asInstanceOf[String] + val file = new File(outputPath.concat(s"/instruction-events-$batchId-$contentId-${System.currentTimeMillis()}.json")) + val writer: BufferedWriter = new BufferedWriter(new FileWriter(file)) + try { + val userIds: List[String] = incorrectRecordsDF.select("user_id").distinct().collect().map(_ (0)).toList.asInstanceOf[List[String]] + val courseId: String = incorrectRecordsDF.select("course_id").distinct().head.getString(0) + for (userId <- userIds) { + val event = s"""{"assessmentTs":${System.currentTimeMillis()},"batchId":"$batchId","courseId":"$courseId","userId":"$userId","contentId":"$contentId"}""" + writer.write(event) + writer.newLine() + } + writer.flush() + } catch {
+ case ex: IOException => JobLogger.log(ex.getMessage, None, ERROR) + } finally { + writer.close() + } + } + + // Method to fetch the totalQuestions count from the content meta + def getTotalQuestions(contentId: String, + apiUrl: String): ContentMeta = { + val response = RestUtil.get[ContentResponse](apiUrl.concat(contentId)) + if (null != response && response.responseCode.equalsIgnoreCase("ok") && null != response.result.content && response.result.content.nonEmpty) { + val totalQuestions: Int = response.result.content.getOrElse("totalQuestions", 0).asInstanceOf[Int] + val contentType: String = response.result.content.getOrElse("contentType", null).asInstanceOf[String] + ContentMeta(totalQuestions, contentType, contentId) + } else { + throw new Exception(s"Failed to fetch the content meta for the content ID: $contentId") // The job should stop if the API call fails + } + } + + // Start of fetch logic from the DB + def getAssessmentAggData(batchId: String)(implicit spark: SparkSession): DataFrame = { + loadData(assessmentAggDBSettings, cassandraFormat, new StructType()) + .filter(col("batch_id") === batchId) + } + + def getUserEnrolment(batchId: String)(implicit spark: SparkSession): DataFrame = { + loadData(userEnrolmentDBSettings, cassandraFormat, new StructType()).select("batchid", "courseid", "userid", "issued_certificates") + .filter(col("batchid") === batchId) + } + + // End of fetch logic from the DB +} diff --git a/data-products/src/main/scala/org/sunbird/analytics/exhaust/BaseReportsJob.scala b/data-products/src/main/scala/org/sunbird/analytics/exhaust/BaseReportsJob.scala index a4d475718..45434a051 100644 --- a/data-products/src/main/scala/org/sunbird/analytics/exhaust/BaseReportsJob.scala +++ b/data-products/src/main/scala/org/sunbird/analytics/exhaust/BaseReportsJob.scala @@ -37,7 +37,8 @@ trait BaseReportsJob { val sparkUserDbRedisPort = modelParams.get("sparkUserDbRedisPort") JobContext.parallelization = CommonUtil.getParallelization(config) val readConsistencyLevel = modelParams.getOrElse("cassandraReadConsistency", "LOCAL_QUORUM").asInstanceOf[String]; - val sparkSession = CommonUtil.getSparkSession(JobContext.parallelization, config.appName.getOrElse(config.model), sparkCassandraConnectionHost, sparkElasticsearchConnectionHost, Option(readConsistencyLevel), sparkRedisConnectionHost, sparkUserDbRedisIndex, sparkUserDbRedisPort) + val writeConsistencyLevel = modelParams.getOrElse("cassandraWriteConsistency", "LOCAL_QUORUM").asInstanceOf[String] + val sparkSession = CommonUtil.getSparkSession(JobContext.parallelization, config.appName.getOrElse(config.model), sparkCassandraConnectionHost, sparkElasticsearchConnectionHost, Option(readConsistencyLevel), sparkRedisConnectionHost, sparkUserDbRedisIndex, sparkUserDbRedisPort, writeConsistencyLevel) setReportsStorageConfiguration(config)(sparkSession) sparkSession; diff --git a/data-products/src/main/scala/org/sunbird/analytics/util/TextBookUtils.scala b/data-products/src/main/scala/org/sunbird/analytics/util/TextBookUtils.scala index 624b3cfe4..80636947a 100644 --- a/data-products/src/main/scala/org/sunbird/analytics/util/TextBookUtils.scala +++ b/data-products/src/main/scala/org/sunbird/analytics/util/TextBookUtils.scala @@ -38,7 +38,7 @@ object TextBookUtils { def getTextBooks(config: Map[String, AnyRef])(implicit sc:SparkContext,fc: FrameworkContext): List[TextbookData] = { val request = JSONUtils.serialize(config.get("druidConfig").get) val druidQuery = JSONUtils.deserialize[DruidQueryModel](request) - val druidResponse =
DruidDataFetcher.getDruidData(druidQuery) + val druidResponse = DruidDataFetcher.getDruidData(druidQuery, true) val result = druidResponse.map(f => { JSONUtils.deserialize[TextbookData](f) diff --git a/data-products/src/test/resources/assessment-score-correction/assessment.cql b/data-products/src/test/resources/assessment-score-correction/assessment.cql new file mode 100644 index 000000000..4ec0ec435 --- /dev/null +++ b/data-products/src/test/resources/assessment-score-correction/assessment.cql @@ -0,0 +1,87 @@ +CREATE KEYSPACE IF NOT EXISTS sunbird_courses WITH replication = { +'class': 'SimpleStrategy', +'replication_factor': '1' +}; + + +CREATE TYPE IF NOT EXISTS sunbird_courses.question ( + id text, + assess_ts timestamp, + max_score double, + score double, + type text, + title text, + resvalues list<frozen<map<text, text>>>, + params list<frozen<map<text, text>>>, + description text, + duration decimal +); + +CREATE TABLE IF NOT EXISTS sunbird_courses.assessment_aggregator ( + user_id text, + course_id text, + batch_id text, + content_id text, + attempt_id text, + created_on timestamp, + grand_total text, + last_attempted_on timestamp, + question list<frozen<question>>, + total_max_score double, + total_score double, + updated_on timestamp, + PRIMARY KEY ((user_id, course_id), batch_id, content_id, attempt_id) +); +TRUNCATE sunbird_courses.assessment_aggregator; + +INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, total_score, question, updated_on) VALUES ('do_1130928636168192001667', 'batch-00001', 'user-001', 'do_11307972307046400011917', 'attempat-001', '20', 20, 20, [{id: 'do_213019475454476288155', assess_ts: '2021-06-18T20:15:56.490+0000', max_score: 1, score: 1, type: 'mcq', title: 'testQuestiontextandformula', resvalues: [{'1': '{"text":"A=\\\\pi r^2\n"}'}], params: [{'1': '{"text":"A=\\\\pi r^2\n"}'}, {'2': '{"text":"no\n"}'}, {'answer': '{"correct":["1"]}'}], description: 'testQuestiontextandformula', duration: 1.0}, {id: 'do_213019970118279168165', assess_ts: '2020-06-18T20:15:56.490+0000', max_score: 1, score: 1, type: 'mcq', title: 'test with formula', resvalues: [{'1': '{"text":"1\nA=\\\\pi r^2A=\\\\pi r^2\n"}'}], params: [{'1': '{"text":"1\nA=\\\\pi r^2A=\\\\pi r^2\n"}'}, {'2': '{"text":"2\n"}'}, {'answer': '{"correct":["1"]}'}], description: '', duration: 1.0}, {id: 'do_213019972814823424168', assess_ts: '2020-06-17T18:15:56.490+0000', max_score: 1, score: 0.33, type: 'mtf', title: 'Copy of - Match the following:\n\nx=\\frac{-b\\pm\\sqrt{b^2-4ac}}{2a}\nArrange the following equations in correct order.\n', resvalues: [{'lhs': '[{"1":"{\"text\":\"A=\\\\\\\\pi r^2\\n\"}"},{"2":"{\"text\":\"\\\\\\\\frac{4}{3}\\\\\\\\pi r^3\\n\"}"},{"3":"{\"text\":\"a^n\\\\\\\\times a^m=a^{n+m}\\n\"}"}]'}, {'rhs': '[{"1":"{\"text\":\"Volume of sphere\\n\"}"},{"2":"{\"text\":\"Area of Circle\\n\"}"},{"3":"{\"text\":\"Product Rule\\n\"}"}]'}], params: [{'lhs': '[{"1":"{\"text\":\"A=\\\\\\\\pi r^2\\n\"}"},{"2":"{\"text\":\"\\\\\\\\frac{4}{3}\\\\\\\\pi r^3\\n\"}"},{"3":"{\"text\":\"a^n\\\\\\\\times a^m=a^{n+m}\\n\"}"}]'}, {'rhs': '[{"1":"{\"text\":\"Volume of sphere\\n\"}"},{"2":"{\"text\":\"Product Rule\\n\"}"},{"3":"{\"text\":\"Area of Circle\\n\"}"}]'}, {'answer': '{"lhs":["1","2","3"],"rhs":["3","1","2"]}'}], description: '', duration: 2.0}, {id: 'do_2130256513760624641171', assess_ts: '2020-06-18T21:15:56.490+0000', max_score: 10, score: 10, type: 'mcq', title: '2 +2 is..?
mark ia 10\n', resvalues: [{'1': '{"text":"4\n"}'}], params: [{'1': '{"text":"4\n"}'}, {'2': '{"text":"3\n"}'}, {'3': '{"text":"8\n"}'}, {'4': '{"text":"10\n"}'}, {'answer': '{"correct":["1"]}'}], description: '', duration: 12.0},{id: 'do_2130256513760624641171', assess_ts: '2020-06-11T21:15:56.490+0000', max_score: 10, score: 10, type: 'mcq', title: '2 +2 is..? mark ia 10\n', resvalues: [{'1': '{"text":"4\n"}'}], params: [{'1': '{"text":"4\n"}'}, {'2': '{"text":"3\n"}'}, {'3': '{"text":"8\n"}'}, {'4': '{"text":"10\n"}'}, {'answer': '{"correct":["1"]}'}], description: '', duration: 12.0},{id: 'do_2130256513760624641171', assess_ts: '2020-06-21T21:15:56.490+0000', max_score: 10, score: 10, type: 'mcq', title: '2 +2 is..? mark ia 10\n', resvalues: [{'1': '{"text":"4\n"}'}], params: [{'1': '{"text":"4\n"}'}, {'2': '{"text":"3\n"}'}, {'3': '{"text":"8\n"}'}, {'4': '{"text":"10\n"}'}, {'answer': '{"correct":["1"]}'}], description: '', duration: 12.0},{id: 'do_2130256513760624641171', assess_ts: '2020-06-22T21:15:56.490+0000', max_score: 10, score: 10, type: 'mcq', title: '2 +2 is..? mark ia 10\n', resvalues: [{'1': '{"text":"4\n"}'}], params: [{'1': '{"text":"4\n"}'}, {'2': '{"text":"3\n"}'}, {'3': '{"text":"8\n"}'}, {'4': '{"text":"10\n"}'}, {'answer': '{"correct":["1"]}'}], description: '', duration: 12.0}], toTimeStamp(toDate(now()))); +INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, total_score, question, updated_on) VALUES ('do_1130928636168192001667', 'batch-00001', 'user-001', 'do_11307972307046400011917', 'attempat-002', '20', 6, 20, [{id: 'do_2130194754544762881551', assess_ts: '2021-08-01T10:15:56.490+0000', max_score: 1, score: 1, type: 'mcq', title: 'testQuestiontextandformula', resvalues: [{'1': '{"text":"A=\\\\pi r^2\n"}'}], params: [{'1': '{"text":"A=\\\\pi r^2\n"}'}, {'2': '{"text":"no\n"}'}, {'answer': '{"correct":["1"]}'}], description: 'testQuestiontextandformula', duration: 1.0}, {id: 'do_213019970118279168165', assess_ts: '2021-08-01T11:15:56.490+0000', max_score: 1, score: 1, type: 'mcq', title: 'test with formula', resvalues: [{'1': '{"text":"1\nA=\\\\pi r^2A=\\\\pi r^2\n"}'}], params: [{'1': '{"text":"1\nA=\\\\pi r^2A=\\\\pi r^2\n"}'}, {'2': '{"text":"2\n"}'}, {'answer': '{"correct":["1"]}'}], description: '', duration: 1.0}, {id: 'do_213019972814823424168', assess_ts: '2021-08-01T11:00:56.490+0000', max_score: 1, score: 0.33, type: 'mtf', title: 'Copy of - Match the following:\n\nx=\\frac{-b\\pm\\sqrt{b^2-4ac}}{2a}\nArrange the following equations in correct order.\n', resvalues: [{'lhs': '[{"1":"{\"text\":\"A=\\\\\\\\pi r^2\\n\"}"},{"2":"{\"text\":\"\\\\\\\\frac{4}{3}\\\\\\\\pi r^3\\n\"}"},{"3":"{\"text\":\"a^n\\\\\\\\times a^m=a^{n+m}\\n\"}"}]'}, {'rhs': '[{"1":"{\"text\":\"Volume of sphere\\n\"}"},{"2":"{\"text\":\"Area of Circle\\n\"}"},{"3":"{\"text\":\"Product Rule\\n\"}"}]'}], params: [{'lhs': '[{"1":"{\"text\":\"A=\\\\\\\\pi r^2\\n\"}"},{"2":"{\"text\":\"\\\\\\\\frac{4}{3}\\\\\\\\pi r^3\\n\"}"},{"3":"{\"text\":\"a^n\\\\\\\\times a^m=a^{n+m}\\n\"}"}]'}, {'rhs': '[{"1":"{\"text\":\"Volume of sphere\\n\"}"},{"2":"{\"text\":\"Product Rule\\n\"}"},{"3":"{\"text\":\"Area of Circle\\n\"}"}]'}, {'answer': '{"lhs":["1","2","3"],"rhs":["3","1","2"]}'}], description: '', duration: 2.0}, {id: 'do_2130256513760624641171', assess_ts: '2021-08-01T11:15:56.490+0000', max_score: 10, score: 10, type: 'mcq', title: '2 +2 is..? 
mark ia 10\n', resvalues: [{'1': '{"text":"4\n"}'}], params: [{'1': '{"text":"4\n"}'}, {'2': '{"text":"3\n"}'}, {'3': '{"text":"8\n"}'}, {'4': '{"text":"10\n"}'}, {'answer': '{"correct":["1"]}'}], description: '', duration: 12.0},{id: 'do_213025651376062464117111', assess_ts: '2021-08-01T11:15:56.490+0000', max_score: 10, score: 10, type: 'mcq', title: '2 +2 is..? mark ia 10\n', resvalues: [{'1': '{"text":"4\n"}'}], params: [{'1': '{"text":"4\n"}'}, {'2': '{"text":"3\n"}'}, {'3': '{"text":"8\n"}'}, {'4': '{"text":"10\n"}'}, {'answer': '{"correct":["1"]}'}], description: '', duration: 12.0}], toTimeStamp(toDate(now()))); +INSERT INTO sunbird_courses.assessment_aggregator (course_id, batch_id, user_id, content_id, attempt_id, grand_total, total_max_score, total_score, question, updated_on) VALUES ('do_1130928636168192001667', 'batch-00001', 'user-001', 'do_11307972307046400011917', 'attempat-003', '20', 4, 20, [{id: 'do_213019475454476288155', assess_ts: '2021-08-01T10:15:56.490+0000', max_score: 1, score: 1, type: 'mcq', title: 'testQuestiontextandformula', resvalues: [{'1': '{"text":"A=\\\\pi r^2\n"}'}], params: [{'1': '{"text":"A=\\\\pi r^2\n"}'}, {'2': '{"text":"no\n"}'}, {'answer': '{"correct":["1"]}'}], description: 'testQuestiontextandformula', duration: 1.0}, {id: 'do_213019970118279168165', assess_ts: '2021-08-01T11:15:56.490+0000', max_score: 1, score: 1, type: 'mcq', title: 'test with formula', resvalues: [{'1': '{"text":"1\nA=\\\\pi r^2A=\\\\pi r^2\n"}'}], params: [{'1': '{"text":"1\nA=\\\\pi r^2A=\\\\pi r^2\n"}'}, {'2': '{"text":"2\n"}'}, {'answer': '{"correct":["1"]}'}], description: '', duration: 1.0}, {id: 'do_213019972814823424168', assess_ts: '2021-08-01T11:00:56.490+0000', max_score: 1, score: 0.33, type: 'mtf', title: 'Copy of - Match the following:\n\nx=\\frac{-b\\pm\\sqrt{b^2-4ac}}{2a}\nArrange the following equations in correct order.\n', resvalues: [{'lhs': '[{"1":"{\"text\":\"A=\\\\\\\\pi r^2\\n\"}"},{"2":"{\"text\":\"\\\\\\\\frac{4}{3}\\\\\\\\pi r^3\\n\"}"},{"3":"{\"text\":\"a^n\\\\\\\\times a^m=a^{n+m}\\n\"}"}]'}, {'rhs': '[{"1":"{\"text\":\"Volume of sphere\\n\"}"},{"2":"{\"text\":\"Area of Circle\\n\"}"},{"3":"{\"text\":\"Product Rule\\n\"}"}]'}], params: [{'lhs': '[{"1":"{\"text\":\"A=\\\\\\\\pi r^2\\n\"}"},{"2":"{\"text\":\"\\\\\\\\frac{4}{3}\\\\\\\\pi r^3\\n\"}"},{"3":"{\"text\":\"a^n\\\\\\\\times a^m=a^{n+m}\\n\"}"}]'}, {'rhs': '[{"1":"{\"text\":\"Volume of sphere\\n\"}"},{"2":"{\"text\":\"Product Rule\\n\"}"},{"3":"{\"text\":\"Area of Circle\\n\"}"}]'}, {'answer': '{"lhs":["1","2","3"],"rhs":["3","1","2"]}'}], description: '', duration: 2.0}, {id: 'do_2130256513760624641171', assess_ts: '2021-08-01T11:15:56.490+0000', max_score: 10, score: 10, type: 'mcq', title: '2 +2 is..? 
mark ia 10\n', resvalues: [{'1': '{"text":"4\n"}'}], params: [{'1': '{"text":"4\n"}'}, {'2': '{"text":"3\n"}'}, {'3': '{"text":"8\n"}'}, {'4': '{"text":"10\n"}'}, {'answer': '{"correct":["1"]}'}], description: '', duration: 12.0}], toTimeStamp(toDate(now()))); + +INSERT INTO sunbird_courses.user_activity_agg (activity_type, activity_id, user_id, context_id, agg, agg_last_updated) VALUES('Course', 'do_1130928636168192001667', 'user-001', 'cb:batch-00001', {'completedCount': 1}, {'completedCount': '2020-08-17'}); +INSERT INTO sunbird_courses.user_activity_agg (activity_type, activity_id, user_id, context_id, agg, agg_last_updated) VALUES('Course', 'do_1130928636168192001667', 'user-002', 'cb:batch-00001', {'completedCount': 0}, {'completedCount': '2020-09-17'}); + +DROP TABLE sunbird_courses.user_enrolments; +CREATE TABLE sunbird_courses.user_enrolments ( + batchid text, + courseid text, + userid text, + "active" boolean, + addedby text, + certificates frozen<list<frozen<map<text, text>>>>, + certstatus int, + completedon timestamp, + completionpercentage int, + contentstatus frozen<map<text, int>>, + datetime timestamp, + enrolled_date timestamp, + enrolleddate text, + issued_certificates frozen<list<frozen<map<text, text>>>>, + lastreadcontentid text, + lastreadcontentstatus int, + progress int, + status int, + PRIMARY KEY (batchid, courseid, userid) +) WITH CLUSTERING ORDER BY (courseid ASC, userid ASC); + +INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, issued_certificates) VALUES('user-001', 'do_1130928636168192001667', 'batch-00001', [{'identifier': 'e08017de-3cb7-47d2-a375-6dd3f8575806', 'lastIssuedOn': '2020-05-01T11:27:35.130+0000', 'name': 'Acknowledgement Certificate', 'token': 'U4M3F6', 'url': 'https://diksha.gov.in/certs/0126684405014528002_0130107203983114243/e08017de-3cb7-47d2-a375-6dd3f8575806.pdf'}]); +INSERT INTO sunbird_courses.user_enrolments (userid, courseid, batchid, issued_certificates) VALUES('user-002', 'do_1130928636168192001667', 'batch-00001', [{'identifier': '0e045c27-5c9f-4d8d-9f2e-6c123eed63a7', 'lastIssuedOn': '2020-11-25T00:12:32.211+0000', 'name': 'Completion Certificate', 'token': 'N1C9D5'}]); + +CREATE TABLE IF NOT EXISTS sunbird.cert_registry ( + id text PRIMARY KEY, + accesscode text, + createdat timestamp, + createdby text, + data text, + isrevoked boolean, + jsonurl text, + pdfurl text, + reason text, + recipient text, + related text, + updatedat timestamp, + updatedby text +); + +INSERT INTO sunbird.cert_registry(id, accesscode, data) VALUES('e08017de-3cb7-47d2-a375-6dd3f8575806', 'Z3N7W6', '{"id":"https://loadtest.ntp.net.in/certs/0126796199493140480_0131495768541675525/7e8197e9-0a9c-4cb5-8abc-ea5eac38fa5a","type":["Assertion","Extension","extensions:CertificateExtension"],"issuedOn":"2020-11-19T00:00:00Z","recipient":{"identity":"7b5cc280-ec07-44e3-a5cc-e6fe54d9b79d","type":["id"],"hashed":false,"name":"1597748348-41199","@context":"https://loadtest.ntp.net.in/certs/v1/context.json"},"badge":{"id":"https://loadtest.ntp.net.in/certs/0126796199493140480_0131495768541675525/Badge.json","type":["BadgeClass"],"name":"Course 27012021
R21","criteria":{"type":["Criteria"],"narrative":"UPNISHTASVGTemplate"},"issuer":{"context":"https://loadtest.ntp.net.in/certs/v1/context.json","id":"https://loadtest.ntp.net.in/certs/Issuer.json","type":["Issuer"],"name":"up","url":"https://diksha.gov.in/up","publicKey":[]},"@context":"https://loadtest.ntp.net.in/certs/v1/context.json"},"evidence":{"id":"https://loadtest.ntp.net.in/certs/Evidence.json","type":["Evidence","Extension","extensions:TrainingEvidence"],"name":"Course 27012021 R21","@context":"https://loadtest.ntp.net.in/certs/v1/context.json"},"verification":{"type":["hosted"]},"revoked":false,"signatory":[{"identity":"up","type":["Extension","extensions:SignatoryExtension"],"hashed":false,"designation":"Uttar Pradesh","image":"https://diksha.gov.in/images/logo-sahaj.png","@context":"https://loadtest.ntp.net.in/certs/v1/extensions/SignatoryExtension/context.json"}],"@context":"https://loadtest.ntp.net.in/certs/v1/context.json"}'); +INSERT INTO sunbird.cert_registry(id, accesscode, data) VALUES('0e045c27-5c9f-4d8d-9f2e-6c123eed63a7', 'C6B2J6', '{"id":"https://loadtest.ntp.net.in/certs/0126796199493140480_0131495768541675525/7e8197e9-0a9c-4cb5-8abc-ea5eac38fa5a","type":["Assertion","Extension","extensions:CertificateExtension"],"issuedOn":"2020-11-19T00:00:00Z","recipient":{"identity":"7b5cc280-ec07-44e3-a5cc-e6fe54d9b79d","type":["id"],"hashed":false,"name":"1597748348-41199","@context":"https://loadtest.ntp.net.in/certs/v1/context.json"},"badge":{"id":"https://loadtest.ntp.net.in/certs/0126796199493140480_0131495768541675525/Badge.json","type":["BadgeClass"],"name":"Course 27012021 R21","criteria":{"type":["Criteria"],"narrative":"UPNISHTASVGTemplate"},"issuer":{"context":"https://loadtest.ntp.net.in/certs/v1/context.json","id":"https://loadtest.ntp.net.in/certs/Issuer.json","type":["Issuer"],"name":"up","url":"https://diksha.gov.in/up","publicKey":[]},"@context":"https://loadtest.ntp.net.in/certs/v1/context.json"},"evidence":{"id":"https://loadtest.ntp.net.in/certs/Evidence.json","type":["Evidence","Extension","extensions:TrainingEvidence"],"name":"Course 27012021 R21","@context":"https://loadtest.ntp.net.in/certs/v1/context.json"},"verification":{"type":["hosted"]},"revoked":false,"signatory":[{"identity":"up","type":["Extension","extensions:SignatoryExtension"],"hashed":false,"designation":"Uttar Pradesh","image":"https://diksha.gov.in/images/logo-sahaj.png","@context":"https://loadtest.ntp.net.in/certs/v1/extensions/SignatoryExtension/context.json"}],"@context":"https://loadtest.ntp.net.in/certs/v1/context.json"}'); diff --git a/data-products/src/test/scala/org/sunbird/analytics/audit/TestAssessmentScoreCorrectionJob.scala b/data-products/src/test/scala/org/sunbird/analytics/audit/TestAssessmentScoreCorrectionJob.scala new file mode 100644 index 000000000..b1892c0aa --- /dev/null +++ b/data-products/src/test/scala/org/sunbird/analytics/audit/TestAssessmentScoreCorrectionJob.scala @@ -0,0 +1,114 @@ +package org.sunbird.analytics.audit + +import org.apache.spark.SparkContext +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.ekstep.analytics.framework.util.{HadoopFileUtil, JSONUtils} +import org.ekstep.analytics.framework.{FrameworkContext, JobConfig} +import org.scalamock.scalatest.MockFactory +import org.sunbird.analytics.exhaust.BaseReportsJob +import org.sunbird.analytics.job.report.BaseReportSpec +import org.sunbird.analytics.util.EmbeddedCassandra + +import scala.collection.immutable.List + +class TestAssessmentScoreCorrectionJob extends BaseReportSpec 
with MockFactory { + implicit var spark: SparkSession = _ + + var assessmentAggDF: DataFrame = _ + var reporterMock: BaseReportsJob = mock[BaseReportsJob] + val sunbirdCoursesKeyspace = "sunbird_courses" + + override def beforeAll(): Unit = { + super.beforeAll() + spark = getSparkSession(); + EmbeddedCassandra.loadData("src/test/resources/assessment-score-correction/assessment.cql") // Load test data into the embedded Cassandra server + } + + override def afterAll(): Unit = { + super.afterAll() + new HadoopFileUtil().delete(spark.sparkContext.hadoopConfiguration, "src/test/resources/score-metrics-migration-job/") + } + + it should "be able to correct assessment raw data records when dryRunMode is true" in { + implicit val mockFc: FrameworkContext = mock[FrameworkContext] + implicit val sc: SparkContext = spark.sparkContext + val strConfig = """{"search":{"type":"none"},"model":"org.sunbird.analytics.audit.AssessmentScoreCorrectionJob","modelParams":{"contentReadAPI":"https://dev.sunbirded.org/api/content/v1/read/","assessment.score.correction.batches":["batch-00001"],"isDryRunMode":true,"csvPath":"src/test/resources/score-metrics-migration-job/","store":"local","sparkCassandraConnectionHost":"{{ core_cassandra_host }}","fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')"},"parallelization":8,"appName":"Assessment Score Correction"}""".stripMargin + implicit val jobConfig: JobConfig = JSONUtils.deserialize[JobConfig](strConfig) + val reportData = AssessmentScoreCorrectionJob.processBatches() + reportData.foreach(report => { + report.foreach(data => { + data.batchId should be("batch-00001") + data.invalidRecords should be(2) + data.contentId should be("do_11307972307046400011917") + data.contentTotalQuestions should be(4) + data.totalAffectedUsers should be(1) + }) + }) + + + // Validate the deleted records + val batch_id = "batch-00001" + val content_id = "do_11307972307046400011917" + val deleted_records = spark.read.format("com.databricks.spark.csv").option("header", "true") + .load(s"src/test/resources/score-metrics-migration-job/assessment-invalid-attempts-records-$batch_id-$content_id-*.csv/*.csv").cache() + deleted_records.count() should be(2) + + val contentIds: List[String] = deleted_records.select("content_id").distinct().collect().map(_ (0)).toList.asInstanceOf[List[String]] + contentIds.distinct.size should be(1) + contentIds.head should be("do_11307972307046400011917") + + val attempt_ids: List[String] = deleted_records.select("attempt_id").distinct().collect().map(_ (0)).toList.asInstanceOf[List[String]] + attempt_ids.distinct.size should be(2) + attempt_ids(0) should be("attempat-001") + attempt_ids(1) should be("attempat-002") + + val user_id: List[String] = deleted_records.select("user_id").distinct().collect().map(_ (0)).toList.asInstanceOf[List[String]] + user_id.distinct.size should be(1) + user_id.head should be("user-001") + + // Validate the instruction events + + val instructionEvents = spark.read.json(s"src/test/resources/score-metrics-migration-job/instruction-events-$batch_id-*.json").cache() + + val contentId: String = instructionEvents.select("contentId").distinct().collect().map(_ (0)).toList.head.asInstanceOf[String] + val courseId: String = instructionEvents.select("courseId").distinct().collect().map(_ (0)).toList.head.asInstanceOf[String] + val userId: String = instructionEvents.select("userId").distinct().collect().map(_ (0)).toList.head.asInstanceOf[String] + + contentId should
be("do_11307972307046400011917") + courseId should be("do_1130928636168192001667") + userId should be("user-001") + + // Validate the cert data + + val cert_data = spark.read.format("com.databricks.spark.csv").option("header", "true") + .load(s"src/test/resources/score-metrics-migration-job/revoked-cert-data-$batch_id-$course_id-*.csv/*.csv").cache() + + cert_data.show(false) + cert_data.count() should be(1) + cert_data.select("courseid").distinct().collect().map(_ (0)).toList.head.asInstanceOf[String] should be("do_1130928636168192001667") + cert_data.select("batchid").distinct().collect().map(_ (0)).toList.head.asInstanceOf[String] should be("batch-00001") + cert_data.select("userid").distinct().collect().map(_ (0)).toList.head.asInstanceOf[String] should be("user-001") + cert_data.select("certificate_id").distinct().collect().map(_ (0)).toList.head.asInstanceOf[String] should be("e08017de-3cb7-47d2-a375-6dd3f8575806") + + + } + + + it should "Should able correct assessment raw data records when dryRunMode is false" in { + implicit val mockFc: FrameworkContext = mock[FrameworkContext] + implicit val sc: SparkContext = spark.sparkContext + val strConfig = """{"search":{"type":"none"},"model":"org.sunbird.analytics.job.report.AssessmentScoreCorrectionJob","modelParams":{"correctRawAssessment":true,"contentReadAPI":"https://dev.sunbirded.org/api/content/v1/read/","assessment.score.correction.batches":["batch-00001"],"isDryRunMode":false,"csvPath":"src/test/resources/score-metrics-migration-job/","store":"local","sparkCassandraConnectionHost":"{{ core_cassandra_host }}","fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')"},"parallelization":8,"appName":"Assessment Score Correction"}""".stripMargin + implicit val jobConfig: JobConfig = JSONUtils.deserialize[JobConfig](strConfig) + val reportData = AssessmentScoreCorrectionJob.processBatches() + reportData.foreach(report => { + report.foreach(data => { + data.batchId should be("batch-00001") + data.invalidRecords should be(2) + data.contentId should be("do_11307972307046400011917") + data.contentTotalQuestions should be(4) + data.totalAffectedUsers should be(1) + }) + }) + } + +} diff --git a/data-products/src/test/scala/org/sunbird/analytics/model/report/TestETBMetricsJobModel.scala b/data-products/src/test/scala/org/sunbird/analytics/model/report/TestETBMetricsJobModel.scala index 6844de885..159db3a1a 100644 --- a/data-products/src/test/scala/org/sunbird/analytics/model/report/TestETBMetricsJobModel.scala +++ b/data-products/src/test/scala/org/sunbird/analytics/model/report/TestETBMetricsJobModel.scala @@ -2,6 +2,8 @@ package org.sunbird.analytics.model.report import java.time.{ZoneOffset, ZonedDateTime} +import akka.actor.ActorSystem +import akka.stream.scaladsl.Source import cats.syntax.either._ import ing.wbaa.druid._ import ing.wbaa.druid.client.DruidClient @@ -18,7 +20,6 @@ import org.ekstep.analytics.framework.util.{HTTPClient, JSONUtils, RestUtil} import org.sunbird.cloud.storage.BaseStorageService import scala.concurrent.Future -import scala.io.Source class TestETBMetricsJobModel extends SparkSpec with Matchers with MockFactory { @@ -114,11 +115,11 @@ class TestETBMetricsJobModel extends SparkSpec with Matchers with MockFactory { val doc: Json = parse(json).getOrElse(Json.Null) val results = List(DruidResult.apply(Some(ZonedDateTime.of(2020, 1, 23, 17, 10, 3, 0, ZoneOffset.UTC)), doc)) - val druidResponse = DruidResponseTimeseriesImpl.apply(results, QueryType.GroupBy) - + val druidResponse 
= DruidResult.apply(Some(ZonedDateTime.of(2019, 11, 28, 17, 0, 0, 0, ZoneOffset.UTC)), doc) implicit val mockDruidConfig = DruidConfig.DefaultConfig val mockDruidClient = mock[DruidClient] - (mockDruidClient.doQuery[DruidResponse](_: DruidQuery)(_: DruidConfig)).expects(*, mockDruidConfig).returns(Future(druidResponse)).anyNumberOfTimes() + (mockDruidClient.actorSystem _).expects().returning(ActorSystem("TestQuery")).anyNumberOfTimes() + (mockDruidClient.doQueryAsStream(_: DruidQuery)(_: DruidConfig)).expects(*, mockDruidConfig).returns(Source(List(druidResponse))).anyNumberOfTimes() (mockFc.getDruidRollUpClient _).expects().returns(mockDruidClient).anyNumberOfTimes() val resultRDD = ETBMetricsModel.execute(sc.emptyRDD, Option(jobConfig)) @@ -152,14 +153,13 @@ class TestETBMetricsJobModel extends SparkSpec with Matchers with MockFactory { val doc: Json = parse(json).getOrElse(Json.Null) val results = List(DruidResult.apply(Some(ZonedDateTime.of(2020, 1, 23, 17, 10, 3, 0, ZoneOffset.UTC)), doc)) - val druidResponse = DruidResponseTimeseriesImpl.apply(results, QueryType.GroupBy) - + val druidResponse = DruidResult.apply(Some(ZonedDateTime.of(2019, 11, 28, 17, 0, 0, 0, ZoneOffset.UTC)), doc) implicit val mockDruidConfig = DruidConfig.DefaultConfig val mockDruidClient = mock[DruidClient] - (mockDruidClient.doQuery[DruidResponse](_: DruidQuery)(_: DruidConfig)).expects(*, mockDruidConfig).returns(Future(druidResponse)).anyNumberOfTimes() + (mockDruidClient.actorSystem _).expects().returning(ActorSystem("TestQuery")).anyNumberOfTimes() + (mockDruidClient.doQueryAsStream(_: DruidQuery)(_: DruidConfig)).expects(*, mockDruidConfig).returns(Source(List(druidResponse))).anyNumberOfTimes() (mockFc.getDruidRollUpClient _).expects().returns(mockDruidClient).anyNumberOfTimes() - ETBMetricsModel.execute(sc.emptyRDD, Option(jobConfig)) @@ -169,10 +169,10 @@ class TestETBMetricsJobModel extends SparkSpec with Matchers with MockFactory { implicit val mockFc = mock[FrameworkContext] val mockRestUtil = mock[HTTPClient] - val hierarchyData = JSONUtils.deserialize[ContentDetails](Source.fromInputStream + val hierarchyData = JSONUtils.deserialize[ContentDetails](scala.io.Source.fromInputStream (getClass.getResourceAsStream("/reports/hierarchyData.json")).getLines().mkString).result.content - val textBookData = JSONUtils.deserialize[TextbookData](Source.fromInputStream + val textBookData = JSONUtils.deserialize[TextbookData](scala.io.Source.fromInputStream (getClass.getResourceAsStream("/reports/textbookDetails.json")).getLines().mkString) implicit val sqlContext = new SQLContext(sc)