SB-27408 | Assessment archival to update existing requests #499

Open · wants to merge 25 commits into base: release-4.5.0
Commits (changes shown from 18 of 25 commits)
d320064
Issue feat SB-27408: Initial commit of Base Archival Job Implementation
manjudr Nov 7, 2021
9a4cbe2
Issue SB-24793 feat: Assessment archived data:: Base Archival job Imp…
utk14 Nov 15, 2021
d9c6e57
Issue SB-24793 feat: Assessment archived data:: Base Archival job Imp…
utk14 Nov 15, 2021
a0cdd69
Issue SB-24793 feat: Assessment archived data implemetation
utk14 Dec 7, 2021
029125b
Issue SB-24793 feat: Assessment archived data implemetation
utk14 Dec 7, 2021
9139b7c
Merge pull request #497 from manjudr/assessment-archival-refactor-cha…
manjudr Dec 7, 2021
78797d1
Issue #SB-27408 | Assessment archival to update existing requests
kumarks1122 Dec 8, 2021
862e306
Issue #SB-27408 | Assessment archival to create and update requests
kumarks1122 Dec 8, 2021
9f1567e
Issue #SB-27408 | Assessment archival test case and fixes added
kumarks1122 Dec 9, 2021
0de04e7
Issue #SB-27408 | Assessment archival Base and sub class changes
kumarks1122 Dec 10, 2021
4d5cab2
Issue #SB-27408 | Assessment archival changes added
kumarks1122 Dec 10, 2021
a60875f
Issue #SB-27408 | Assessment archival changes added
kumarks1122 Dec 10, 2021
78ee433
Issue SB-24793 feat: Review comments resolved
utk14 Dec 13, 2021
23c1d7b
merge conflicts resolved
utk14 Dec 13, 2021
8c39db1
Issue #SB-27408 | Test case fixes added
kumarks1122 Dec 13, 2021
e9f71dd
Issue #SB-27408 | PR Review changes added
kumarks1122 Dec 13, 2021
970b857
Issue SB-24793 feat: Review comments resolved
utk14 Dec 13, 2021
ff71892
merge conflicts resolved
utk14 Dec 13, 2021
0401987
Issue #SB-27408 | Archival Metrics changes added
kumarks1122 Dec 13, 2021
6b3c627
Issue #SB-27408 | Fixes added
kumarks1122 Dec 13, 2021
6ae0f3f
Issue #SB-27408 | Testcase Fixes added
kumarks1122 Dec 14, 2021
22c88a5
Issue #SB-27408 | Testcase Fixes added
kumarks1122 Dec 14, 2021
c69817e
Issue SB-24793 feat: Added batchfilters and search query support
utk14 Dec 15, 2021
4d3fac1
Merge conflicts resolved
utk14 Dec 15, 2021
0af8df3
Issue SB-24793 feat: Added batchfilters and search query support
utk14 Dec 15, 2021
@@ -0,0 +1,122 @@
package org.sunbird.analytics.archival

import org.apache.spark.sql.functions.{col, concat, lit, to_json, to_timestamp, weekofyear, year}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework.util.{JSONUtils, JobLogger}
import org.ekstep.analytics.framework.{FrameworkContext, JobConfig, Level}
import org.sunbird.analytics.archival.util.ArchivalRequest

import java.util.concurrent.atomic.AtomicInteger

object AssessmentArchivalJob extends optional.Application with BaseArchivalJob {

case class Period(year: Int, weekOfYear: Int)
case class BatchPartition(batchId: String, period: Period)

private val partitionCols = List("batch_id", "year", "week_of_year")
private val columnWithOrder = List("course_id", "batch_id", "user_id", "content_id", "attempt_id", "created_on", "grand_total", "last_attempted_on", "total_max_score", "total_score", "updated_on", "question")

override def getClassName = "org.sunbird.analytics.archival.AssessmentArchivalJob"
override def jobName = "AssessmentArchivalJob"
override def jobId: String = "assessment-archival"
override def getReportPath = "assessment-archival/"
override def getReportKey = "assessment"
override def dateColumn = "updated_on"

override def archivalFormat(batch: Map[String,AnyRef]): String = {
val formatDetails = JSONUtils.deserialize[BatchPartition](JSONUtils.serialize(batch))
s"${formatDetails.batchId}/${formatDetails.period.year}-${formatDetails.period.weekOfYear}"
}

override def dataFilter(requests: Array[ArchivalRequest], dataDF: DataFrame): DataFrame = {
// Exclude the (batch_id, year-week) partitions that were already archived successfully,
// so a re-run only processes the remaining periods.
var filteredDF = dataDF
for (request <- requests) {
if (request.archival_status.equals("SUCCESS")) {
val requestData = JSONUtils.deserialize[Map[String, AnyRef]](request.request_data)
val archivedPeriod = s"${requestData("year")}-${requestData("week")}"
filteredDF = filteredDF.filter(
!(col("batch_id").equalTo(request.batch_id) &&
concat(col("year"), lit("-"), col("week_of_year")) === lit(archivedPeriod))
)
}
}
filteredDF
}

// Loads the archival table from Cassandra, optionally narrows it to the requested batch/collection,
// derives the year/week partitions and archives everything that has not been archived yet.
override def archiveData(requestConfig: Request, requests: Array[ArchivalRequest])(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): List[ArchivalRequest] = {

val archivalKeyspace = requestConfig.keyspace.getOrElse(AppConf.getConfig("sunbird.courses.keyspace"))
val batchId: String = requestConfig.batchId.getOrElse("")
val collectionId: String = requestConfig.collectionId.getOrElse("")
val date: String = requestConfig.date.getOrElse(null)

var data = loadData(Map("table" -> requestConfig.archivalTable, "keyspace" -> archivalKeyspace, "cluster" -> "LMSCluster"), cassandraUrl, new StructType())

data = if (batchId.nonEmpty && collectionId.nonEmpty) {
data.filter(col("batch_id") === batchId && col("course_id") === collectionId).persist()
} else if (batchId.nonEmpty) {
data.filter(col("batch_id") === batchId).persist()
} else {
data
}

try {
val dataDF = generatePeriodInData(data)
val filteredDF = dataFilter(requests, dataDF)
val archiveBatchList = filteredDF.groupBy(partitionCols.head, partitionCols.tail: _*).count().collect()
val batchesToArchive: Map[String, Array[BatchPartition]] = archiveBatchList.map(f => BatchPartition(f.get(0).asInstanceOf[String], Period(f.get(1).asInstanceOf[Int], f.get(2).asInstanceOf[Int]))).groupBy(_.batchId)

archiveBatches(batchesToArchive, filteredDF, requestConfig)
} catch {
case ex: Exception =>
ex.printStackTrace()
List()
}
}

// Uploads each (batch_id, year, week_of_year) partition to blob storage and creates or updates
// the corresponding archival request with the outcome.
def archiveBatches(batchesToArchive: Map[String, Array[BatchPartition]], data: DataFrame, requestConfig: Request)(implicit config: JobConfig): List[ArchivalRequest] = {
batchesToArchive.flatMap(batches => {
val processingBatch = new AtomicInteger(batches._2.length)
JobLogger.log(s"Started Processing to archive the data", Some(Map("batch_id" -> batches._1, "total_part_files_to_archive" -> processingBatch)))

// Loop through the week_num & year batch partition
batches._2.map((batch: BatchPartition) => {
val filteredDF = data.filter(col("batch_id") === batch.batchId && col("year") === batch.period.year && col("week_of_year") === batch.period.weekOfYear).select(columnWithOrder.head, columnWithOrder.tail: _*)
val collectionId = filteredDF.first().getAs[String]("course_id")
var archivalRequest:ArchivalRequest = getRequest(collectionId, batch.batchId, List(batch.period.year, batch.period.weekOfYear))

if (archivalRequest == null) {
val request_data = JSONUtils.deserialize[Map[String, AnyRef]](JSONUtils.serialize(requestConfig)) ++ Map[String, Int](
"week" -> batch.period.weekOfYear,
"year"-> batch.period.year
)
archivalRequest = ArchivalRequest("", batch.batchId, collectionId, Option(getReportKey), jobId, None, None, null, null, None, Option(0), JSONUtils.serialize(request_data), None)
}

try {
val urls = upload(filteredDF, Map("batchId" -> batch.batchId, "period"-> Map("year" -> batch.period.year, "weekOfYear" -> batch.period.weekOfYear))) // Upload the archived files into blob store
archivalRequest.blob_url = Option(urls)
JobLogger.log(s"Data is archived and Processing the remaining part files ", Some(Map("remaining_part_files_to_archive" -> processingBatch.decrementAndGet())), Level.INFO)
markRequestAsSuccess(archivalRequest, requestConfig)
} catch {
case ex: Exception => {
markArchivalRequestAsFailed(archivalRequest, ex.getLocalizedMessage)
}
}
})
}).toList
}

def deleteArchivedData(archivalRequest: Request): List[ArchivalRequest] = {
// TODO: Deletion feature
List()
}

// Adds the year and week_of_year partition columns derived from updated_on and serializes
// the question column to JSON so it can be written to CSV.
def generatePeriodInData(data: DataFrame): DataFrame = {
data.withColumn("updated_on", to_timestamp(col(dateColumn)))
.withColumn("year", year(col("updated_on")))
.withColumn("week_of_year", weekofyear(col("updated_on")))
.withColumn("question", to_json(col("question")))
}
}
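
For reference, the standalone sketch below (not part of this PR) mirrors how archivalFormat and upload lay out the archived files per batch and week; the batch id and period used are made up.

// Standalone sketch mirroring archivalFormat/upload above; all values are illustrative.
object ArchivalPathSketch extends App {
  case class Period(year: Int, weekOfYear: Int)
  case class BatchPartition(batchId: String, period: Period)

  def archivalFormat(p: BatchPartition): String =
    s"${p.batchId}/${p.period.year}-${p.period.weekOfYear}"

  // Records of a hypothetical batch "0130929928739" updated in week 49 of 2021
  // end up under "<reportPath>/0130929928739/2021-49-<timestamp>.csv.gz".
  println(archivalFormat(BatchPartition("0130929928739", Period(2021, 49))))
}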
@@ -0,0 +1,91 @@
package org.sunbird.analytics.archival

import com.datastax.spark.connector.cql.CassandraConnectorConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.cassandra._
import org.ekstep.analytics.framework.Level.ERROR
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils, JobLogger}
import org.ekstep.analytics.framework.{FrameworkContext, IJob, JobConfig, Level}
import org.sunbird.analytics.exhaust.BaseReportsJob
import org.ekstep.analytics.framework.util.DatasetUtil.extensions
import org.sunbird.analytics.archival.util.{ArchivalMetaDataStoreJob, ArchivalRequest}

case class Request(archivalTable: String, keyspace: Option[String], query: Option[String] = Option(""), batchId: Option[String] = Option(""), collectionId: Option[String]=Option(""), date: Option[String] = Option(""))

trait BaseArchivalJob extends BaseReportsJob with IJob with ArchivalMetaDataStoreJob with Serializable {

val cassandraUrl = "org.apache.spark.sql.cassandra"
def dateColumn: String

def main(config: String)(implicit sc: Option[SparkContext] = None, fc: Option[FrameworkContext] = None): Unit = {
implicit val className: String = getClassName;
JobLogger.init(jobName)
JobLogger.start(s"$jobName started executing - ver3", Option(Map("config" -> config, "model" -> jobName)))
implicit val jobConfig: JobConfig = JSONUtils.deserialize[JobConfig](config)
implicit val spark: SparkSession = openSparkSession(jobConfig)
implicit val frameworkContext: FrameworkContext = getReportingFrameworkContext()

try {
val res = CommonUtil.time(execute());
JobLogger.end(s"$jobName completed execution", "SUCCESS", None)
} catch {
case ex: Exception => ex.printStackTrace()
JobLogger.log(ex.getMessage, None, ERROR);
JobLogger.end(jobName + " execution failed", "FAILED", Option(Map("model" -> jobName, "statusMsg" -> ex.getMessage)));
}
finally {
frameworkContext.closeContext();
spark.close()
}
}

def init()(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): Unit = {
spark.setCassandraConf("LMSCluster", CassandraConnectorConf.ConnectionHostParam.option(AppConf.getConfig("sunbird.courses.cluster.host")))
}

// Reads the mode and request from modelParams, fetches the existing archival requests for this job
// and dispatches to archiveData ("archival") or deleteArchivedData ("delete"); the resulting
// requests are upserted back into the metadata store.
def execute()(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): Unit = {
val modelParams = config.modelParams.getOrElse(Map[String, Option[AnyRef]]());
val requestConfig = JSONUtils.deserialize[Request](JSONUtils.serialize(modelParams.getOrElse("request", Request).asInstanceOf[Map[String,AnyRef]]))
val mode: String = modelParams.getOrElse("mode","archival").asInstanceOf[String]

val requests = getRequests(jobId, requestConfig.batchId)

val archivalRequests = mode.toLowerCase() match {
case "archival" =>
archiveData(requestConfig, requests)
case "delete" =>
deleteArchivedData(requestConfig)
}
for (archivalRequest <- archivalRequests) {
upsertRequest(archivalRequest)
}
}

// Writes the archived partition as a gzipped CSV to cloud storage under
// <reportPath><batchId>/<year>-<weekOfYear>-<timestamp>.csv.gz and returns the uploaded file paths.
def upload(archivedData: DataFrame, batch: Map[String,AnyRef])(implicit jobConfig: JobConfig): List[String] = {
val blobConfig = jobConfig.modelParams.get("blobConfig").asInstanceOf[Map[String, AnyRef]]
val reportPath: String = blobConfig.getOrElse("reportPath", "archived-data/").asInstanceOf[String]
val container = AppConf.getConfig("cloud.container.reports")
val objectKey = AppConf.getConfig("course.metrics.cloud.objectKey")
val fileName = archivalFormat(batch)
val storageConfig = getStorageConfig(jobConfig, objectKey)
JobLogger.log(s"Uploading reports to blob storage", None, Level.INFO)
archivedData.saveToBlobStore(storageConfig, "csv", s"$reportPath$fileName-${System.currentTimeMillis()}", Option(Map("header" -> "true", "codec" -> "org.apache.hadoop.io.compress.GzipCodec")), None, fileExt=Some("csv.gz"))
}

// Overriding methods START:
def jobId: String;
def jobName: String;
def getReportPath: String;
def getReportKey: String;
def getClassName: String;

def archiveData(requestConfig: Request, requests: Array[ArchivalRequest])(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): List[ArchivalRequest];
def deleteArchivedData(archivalRequest: Request): List[ArchivalRequest];
def archivalFormat(batch: Map[String,AnyRef]): String;
def dataFilter(requests: Array[ArchivalRequest], dataDF: DataFrame): DataFrame;

//Overriding methods END:

}
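
As a rough guide, the modelParams this trait reads might look like the sketch below; the key names (mode, request, blobConfig, reportPath) come from the code above, while every value is a placeholder.

// Sketch of the modelParams read in execute() and upload(); values are illustrative only.
object ArchivalConfigSketch {
  val modelParams: Map[String, AnyRef] = Map(
    "mode" -> "archival",                         // "archival" runs archiveData, "delete" runs deleteArchivedData
    "request" -> Map(                             // deserialized into the Request case class
      "archivalTable" -> "assessment_aggregator", // hypothetical table name
      "keyspace"      -> "sunbird_courses",       // hypothetical; falls back to sunbird.courses.keyspace when absent
      "batchId"       -> "batch-001",
      "collectionId"  -> "do_1234"
    ),
    "blobConfig" -> Map(
      "reportPath" -> "assessment-archived-data/" // defaults to "archived-data/" when absent
    )
  )
}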
@@ -0,0 +1,153 @@
package org.sunbird.analytics.archival.util

import java.security.MessageDigest
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet, Timestamp}
import java.util.Properties
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions.col
import org.ekstep.analytics.framework.FrameworkContext
import org.ekstep.analytics.framework.Level.INFO
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils, JobLogger}
import org.sunbird.analytics.archival.Request

case class ArchivalRequest(request_id: String, batch_id: String, collection_id: String, resource_type: Option[String], job_id: String,
var archival_date: Option[Long],var completion_date: Option[Long],var archival_status: String,var deletion_status: String,
var blob_url: Option[List[String]],var iteration: Option[Int], request_data: String, var err_message: Option[String])

trait ArchivalMetaDataStoreJob {

implicit val className: String = getClassName;
val connProperties: Properties = CommonUtil.getPostgresConnectionProps()
val db: String = AppConf.getConfig("postgres.db")
val url: String = AppConf.getConfig("postgres.url") + s"$db"
val requestsTable: String = AppConf.getConfig("postgres.table.archival_request")
val dbc: Connection = DriverManager.getConnection(url, connProperties.getProperty("user"), connProperties.getProperty("password"));
dbc.setAutoCommit(true);

def getClassName(): String;

def cleanUp() {
dbc.close();
}

// Returns the archival requests recorded for this job with fewer than 3 iterations,
// narrowed to the given batch when matching rows exist.
def getRequests(jobId: String, batchId: Option[String])(implicit spark: SparkSession, fc: FrameworkContext): Array[ArchivalRequest] = {
val encoder = Encoders.product[ArchivalRequest]
val archivalConfigsDf = spark.read.jdbc(url, requestsTable, connProperties)
.where(col("job_id") === jobId && col("iteration") < 3)

val filteredReportConfigDf = if (batchId.isDefined) {
val filteredArchivalConfig = archivalConfigsDf.filter(col("batch_id").equalTo(batchId.get))
if (filteredArchivalConfig.count() > 0) filteredArchivalConfig else archivalConfigsDf
} else archivalConfigsDf

JobLogger.log("fetched records count" + filteredReportConfigDf.count(), None, INFO)
val requests = filteredReportConfigDf.as[ArchivalRequest](encoder).collect()
requests
}

// The request id is the MD5 hash of "collectionId:batchId:<partition values>", so the same
// collection, batch and period always resolve to the same row.
def getRequestID(collectionId: String, batchId: String, partitionCols: List[Int]): String = {
val requestComb = s"$collectionId:$batchId:" + partitionCols.mkString(":")
MessageDigest.getInstance("MD5").digest(requestComb.getBytes).map("%02X".format(_)).mkString
}

def getRequest(collectionId: String, batchId: String, partitionCols: List[Int]): ArchivalRequest = {
val requestId = getRequestID(collectionId, batchId, partitionCols)
val archivalRequest = s"""select * from $requestsTable where request_id = '$requestId' limit 1"""
val pstmt: PreparedStatement = dbc.prepareStatement(archivalRequest);
val resultSet = pstmt.executeQuery()

if (resultSet.next()) getArchivalRequest(resultSet) else null
}

private def getArchivalRequest(resultSet: ResultSet): ArchivalRequest = {
ArchivalRequest(
resultSet.getString("request_id"),
resultSet.getString("batch_id"),
resultSet.getString("collection_id"),
Option(resultSet.getString("resource_type")),
resultSet.getString("job_id"),
Option(resultSet.getTimestamp("archival_date").getTime),
if (resultSet.getTimestamp("completion_date") != null) Option(resultSet.getTimestamp("completion_date").getTime) else None,
resultSet.getString("archival_status"),
resultSet.getString("deletion_status"),
if (resultSet.getArray("blob_url") != null) Option(resultSet.getArray("blob_url").getArray().asInstanceOf[Array[String]].toList) else None,
Option(resultSet.getInt("iteration")),
resultSet.getString("request_data"),
Option(resultSet.getString("err_message"))
)
}

def markArchivalRequestAsFailed(request: ArchivalRequest, failedMsg: String): ArchivalRequest = {
request.archival_status = "FAILED";
request.archival_date = Option(System.currentTimeMillis());
request.iteration = Option(request.iteration.getOrElse(0) + 1);
request.err_message = Option(failedMsg);
request
}

def markDeletionRequestAsFailed(request: ArchivalRequest, failedMsg: String): ArchivalRequest = {
request.deletion_status = "FAILED";
request.archival_date = Option(System.currentTimeMillis());
request.iteration = Option(request.iteration.getOrElse(0) + 1);
request.err_message = Option(failedMsg);
request
}

// Inserts a new request row; the request id is derived from the collection, batch and the
// year/week stored in request_data.
def createRequest(request: ArchivalRequest) = {
val insertQry = s"INSERT INTO $requestsTable (request_id, batch_id, collection_id, resource_type, job_id, archival_date, completion_date, archival_status, " +
s"deletion_status, blob_url, iteration, request_data, err_message) VALUES (?,?,?,?,?,?,?,?,?,?,?,?::json,?)"
val pstmt: PreparedStatement = dbc.prepareStatement(insertQry);
val request_data = JSONUtils.deserialize[Map[String, AnyRef]](request.request_data)
val requestId = getRequestID(request.collection_id, request.batch_id, List(request_data("year").asInstanceOf[Int], request_data("week").asInstanceOf[Int]))
pstmt.setString(1, requestId);
pstmt.setString(2, request.batch_id);
pstmt.setString(3, request.collection_id);
pstmt.setString(4, request.resource_type.getOrElse("assessment"));
pstmt.setString(5, request.job_id);
pstmt.setTimestamp(6, if (request.archival_date.isDefined) new Timestamp(request.archival_date.get) else null);
pstmt.setTimestamp(7, if (request.completion_date.isDefined) new Timestamp(request.completion_date.get) else null);
pstmt.setString(8, request.archival_status);
pstmt.setString(9, request.deletion_status);
val blobURLs = request.blob_url.getOrElse(List()).toArray.asInstanceOf[Array[Object]];
pstmt.setArray(10, dbc.createArrayOf("text", blobURLs))
pstmt.setInt(11, request.iteration.getOrElse(0))
pstmt.setString(12, request.request_data)
pstmt.setString(13, StringUtils.abbreviate(request.err_message.getOrElse(""), 300));

pstmt.execute()
}

// Requests without an id are freshly discovered partitions and get inserted;
// existing requests are updated in place.
def upsertRequest(request: ArchivalRequest): Unit = {
if (request.request_id.isEmpty) {
createRequest(request)
} else {
updateRequest(request)
}
}

def updateRequest(request: ArchivalRequest): Unit = {
val updateQry = s"UPDATE $requestsTable SET blob_url=?, iteration = ?, archival_date=?, completion_date=?, " +
s"archival_status=?, deletion_status=? WHERE request_id=?";
val pstmt: PreparedStatement = dbc.prepareStatement(updateQry)

val blobURLs = request.blob_url.getOrElse(List()).toArray.asInstanceOf[Array[Object]];
pstmt.setArray(1, dbc.createArrayOf("text", blobURLs))
pstmt.setInt(2, request.iteration.get);
pstmt.setTimestamp(3, if (request.archival_date.isDefined) new Timestamp(request.archival_date.get) else null);
pstmt.setTimestamp(4, if (request.completion_date.isDefined) new Timestamp(request.completion_date.get) else null);
pstmt.setString(5, request.archival_status);
pstmt.setString(6, request.deletion_status);
pstmt.setString(7, request.request_id);

pstmt.execute()
}

def markRequestAsSuccess(request: ArchivalRequest, requestConfig: Request): ArchivalRequest = {
request.archival_status = "SUCCESS";
request.archival_date = Option(System.currentTimeMillis())
request
}

}
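
To make the deduplication concrete, the standalone sketch below reproduces the id scheme used by getRequestID above; the collection and batch ids are made up.

// Standalone sketch of the request-id scheme in getRequestID above:
// MD5 over "collectionId:batchId:year:week", rendered as uppercase hex.
import java.security.MessageDigest

object RequestIdSketch extends App {
  def requestId(collectionId: String, batchId: String, year: Int, week: Int): String =
    MessageDigest.getInstance("MD5")
      .digest(s"$collectionId:$batchId:$year:$week".getBytes)
      .map("%02X".format(_)).mkString

  // The same (collection, batch, year, week) always yields the same id, which is what
  // lets archiveBatches find and update an existing request instead of creating a duplicate.
  println(requestId("do_1234", "batch-001", 2021, 49))
}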
3 changes: 3 additions & 0 deletions data-products/src/test/resources/application.conf
@@ -199,3 +199,6 @@ uci.conversation.postgres.pass="postgres"
uci.exhaust.store.prefix="src/test/resources/exhaust-reports/"
uci.encryption.secret="123443957398423479784298247982789428fldkssd"
// END OF UCI Related Job Configs

//START of Archival Config
postgres.table.archival_request="archival_metadata"
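
Besides the new archival_request key, the jobs above also read the keys sketched below through AppConf.getConfig; they are presumably already defined for the other jobs, and the values shown here are purely illustrative.

// Related keys read by the archival jobs (illustrative values only)
postgres.db="analytics"
postgres.url="jdbc:postgresql://localhost:5432/"
sunbird.courses.keyspace="sunbird_courses"
sunbird.courses.cluster.host="localhost"
cloud.container.reports="reports"
course.metrics.cloud.objectKey="reports/"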