SB-27408 | Assessment archival to update existing requests #499

Open
Wants to merge 25 commits into base: release-4.5.0
Changes from 9 commits

Commits (25)
d320064
Issue feat SB-27408: Initial commit of Base Archival Job Implementation
manjudr Nov 7, 2021
9a4cbe2
Issue SB-24793 feat: Assessment archived data:: Base Archival job Imp…
utk14 Nov 15, 2021
d9c6e57
Issue SB-24793 feat: Assessment archived data:: Base Archival job Imp…
utk14 Nov 15, 2021
a0cdd69
Issue SB-24793 feat: Assessment archived data implemetation
utk14 Dec 7, 2021
029125b
Issue SB-24793 feat: Assessment archived data implemetation
utk14 Dec 7, 2021
9139b7c
Merge pull request #497 from manjudr/assessment-archival-refactor-cha…
manjudr Dec 7, 2021
78797d1
Issue #SB-27408 | Assessment archival to update existing requests
kumarks1122 Dec 8, 2021
862e306
Issue #SB-27408 | Assessment archival to create and update requests
kumarks1122 Dec 8, 2021
9f1567e
Issue #SB-27408 | Assessment archival test case and fixes added
kumarks1122 Dec 9, 2021
0de04e7
Issue #SB-27408 | Assessment archival Base and sub class changes
kumarks1122 Dec 10, 2021
4d5cab2
Issue #SB-27408 | Assessment archival changes added
kumarks1122 Dec 10, 2021
a60875f
Issue #SB-27408 | Assessment archival changes added
kumarks1122 Dec 10, 2021
78ee433
Issue SB-24793 feat: Review comments resolved
utk14 Dec 13, 2021
23c1d7b
merge conflicts resolved
utk14 Dec 13, 2021
8c39db1
Issue #SB-27408 | Test case fixes added
kumarks1122 Dec 13, 2021
e9f71dd
Issue #SB-27408 | PR Review changes added
kumarks1122 Dec 13, 2021
970b857
Issue SB-24793 feat: Review comments resolved
utk14 Dec 13, 2021
ff71892
merge conflicts resolved
utk14 Dec 13, 2021
0401987
Issue #SB-27408 | Archival Metrics changes added
kumarks1122 Dec 13, 2021
6b3c627
Issue #SB-27408 | Fixes added
kumarks1122 Dec 13, 2021
6ae0f3f
Issue #SB-27408 | Testcase Fixes added
kumarks1122 Dec 14, 2021
22c88a5
Issue #SB-27408 | Testcase Fixes added
kumarks1122 Dec 14, 2021
c69817e
Issue SB-24793 feat: Added batchfilters and search query support
utk14 Dec 15, 2021
4d3fac1
Merge conflicts resolved
utk14 Dec 15, 2021
0af8df3
Issue SB-24793 feat: Added batchfilters and search query support
utk14 Dec 15, 2021
@@ -0,0 +1,19 @@
package org.sunbird.analytics.archival

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.ekstep.analytics.framework.{FrameworkContext, JobConfig}

object AssessmentArchivalJob extends optional.Application with BaseArchivalJob {

  override def getClassName = "org.sunbird.analytics.archival.AssessmentArchivalJob"
  override def jobName() = "AssessmentArchivalJob";
  override def jobId(): String = "assessment-archival";
  override def getReportPath() = "assessment-archival/";
  override def getReportKey() = "assessment";

  override def processArchival(archivalTableData: DataFrame, requestConfig: Request)(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): DataFrame = {
    println("Process Archival")
    generatePeriodInData(data = archivalTableData)
  }

}
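
For reference, here is a minimal sketch (not part of this PR) of the modelParams shape that BaseArchivalJob.execute reads when running this job: a "mode" of "archival" or "delete", plus a "request" map mirroring the Request case class. The table, keyspace, batch id and date values below are placeholders.

// Illustrative only; key names follow BaseArchivalJob.execute and the Request case class.
val modelParams: Map[String, AnyRef] = Map(
  "mode" -> "archival",                           // or "delete" (the deletion flow is stubbed in this PR)
  "request" -> Map(
    "archivalTable" -> "assessment_aggregator",   // placeholder table name
    "keyspace" -> "sunbird_courses",              // placeholder keyspace
    "batchId" -> "batch-001",                     // placeholder batch id
    "date" -> "2021-12-08"                        // placeholder date
  )
)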
@@ -0,0 +1,209 @@
package org.sunbird.analytics.archival

import java.util.concurrent.atomic.AtomicInteger

import com.datastax.spark.connector.cql.CassandraConnectorConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.types.StructType
import org.ekstep.analytics.framework.Level.ERROR
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils, JobLogger}
import org.ekstep.analytics.framework.{FrameworkContext, IJob, JobConfig, Level}
import org.sunbird.analytics.exhaust.BaseReportsJob
import org.ekstep.analytics.framework.util.DatasetUtil.extensions
import org.apache.spark.sql.functions._
import org.joda.time.DateTime
import org.sunbird.analytics.archival.util.{ArchivalMetaDataStoreJob, ArchivalRequest}

case class Period(year: Int, weekOfYear: Int)

case class BatchPartition(batchId: String, period: Period)
case class Request(archivalTable: String, keyspace: Option[String], query: Option[String] = Option(""), batchId: Option[String] = Option(""), collectionId: Option[String]=Option(""), date: Option[String] = Option(""))
case class ArchivalMetrics(batchId: Option[String],
period: Period,
totalArchivedRecords: Option[Long],
pendingWeeksOfYears: Option[Long],
totalDeletedRecords: Option[Long],
totalDistinctBatches: Long
)

trait BaseArchivalJob extends BaseReportsJob with IJob with ArchivalMetaDataStoreJob with Serializable {

private val partitionCols = List("batch_id", "year", "week_of_year")
private val columnWithOrder = List("course_id", "batch_id", "user_id", "content_id", "attempt_id", "created_on", "grand_total", "last_attempted_on", "total_max_score", "total_score", "updated_on", "question")
val cassandraUrl = "org.apache.spark.sql.cassandra"

def main(config: String)(implicit sc: Option[SparkContext] = None, fc: Option[FrameworkContext] = None): Unit = {
implicit val className: String = getClassName;
JobLogger.init(jobName)
JobLogger.start(s"$jobName started executing - ver3", Option(Map("config" -> config, "model" -> jobName)))
implicit val jobConfig: JobConfig = JSONUtils.deserialize[JobConfig](config)
implicit val spark: SparkSession = openSparkSession(jobConfig)
implicit val frameworkContext: FrameworkContext = getReportingFrameworkContext()

try {
val res = CommonUtil.time(execute());
JobLogger.end(s"$jobName completed execution", "SUCCESS", None)
} catch {
case ex: Exception => ex.printStackTrace()
JobLogger.log(ex.getMessage, None, ERROR);
JobLogger.end(jobName + " execution failed", "FAILED", Option(Map("model" -> jobName, "statusMsg" -> ex.getMessage)));
}
finally {
frameworkContext.closeContext();
spark.close()
}


}

def init()(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): Unit = {
spark.setCassandraConf("LMSCluster", CassandraConnectorConf.ConnectionHostParam.option(AppConf.getConfig("sunbird.courses.cluster.host")))
}

// def dataFilter(): Unit = {}
// def dateFormat(): String;
def getClassName: String;

def execute()(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): Unit = {
val modelParams = config.modelParams.getOrElse(Map[String, Option[AnyRef]]());
val requestConfig = JSONUtils.deserialize[Request](JSONUtils.serialize(modelParams.getOrElse("request", Request).asInstanceOf[Map[String,AnyRef]]))
val archivalTable = requestConfig.archivalTable
val archivalKeyspace = requestConfig.keyspace.getOrElse(AppConf.getConfig("sunbird.courses.keyspace"))

val batchId: String = requestConfig.batchId.getOrElse("")
val date: String = requestConfig.date.getOrElse("")
val mode: String = modelParams.getOrElse("mode","archive").asInstanceOf[String]

println("modelParams: " + modelParams)
println("archival request: " + requestConfig)
val archivalTableData: DataFrame = getArchivalData(archivalTable, archivalKeyspace,Option(batchId),Option(date))
println("archivalTableData ")
archivalTableData.show(false)

mode.toLowerCase() match {
case "archival" =>
archiveData(archivalTableData, requestConfig)
case "delete" =>
deleteArchivedData(archivalTableData,requestConfig)
}
}

def archiveData(data: DataFrame, requestConfig: Request)(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): Unit = {
val requests = getRequests(jobId, requestConfig.batchId)
println("requestLength: " + requests.length)
try {
var dataDF = processArchival(data, requestConfig)
if(requests.length > 0) {
for (request <- requests) {
if (request.archival_status.equals("SUCCESS")) {
val request_data = JSONUtils.deserialize[Map[String, AnyRef]](request.request_data)
dataDF = dataDF.filter(
col("batch_id").equalTo(request.batch_id) &&
concat(col("year"), lit("-"), col("week_of_year")) =!= lit(request_data.get("year").get + "-" + request_data.get("week").get)
)
}
}
}

val archiveBatchList = dataDF.groupBy(partitionCols.head, partitionCols.tail: _*).count().collect()

val batchesToArchive: Map[String, Array[BatchPartition]] = archiveBatchList.map(f => BatchPartition(f.get(0).asInstanceOf[String], Period(f.get(1).asInstanceOf[Int], f.get(2).asInstanceOf[Int]))).groupBy(_.batchId)
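// Illustration (placeholder batch id, not part of this PR): rows for "batch-001" spanning
// weeks 48 and 49 of 2021 would group into
// Map("batch-001" -> Array(BatchPartition("batch-001", Period(2021, 48)),
//                          BatchPartition("batch-001", Period(2021, 49))))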

archiveBatches(batchesToArchive, dataDF, requestConfig)
} catch {
case ex: Exception =>
ex.printStackTrace()
}
}

def generatePeriodInData(data: DataFrame): DataFrame = {
data.withColumn("updated_on", to_timestamp(col("updated_on")))
.withColumn("year", year(col("updated_on")))
.withColumn("week_of_year", weekofyear(col("updated_on")))
.withColumn("question", to_json(col("question")))
}
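// Illustration (not part of this PR): a record with updated_on = 2021-12-08 10:00:00 gets
// year = 2021 and week_of_year = 49 (ISO week), so it falls into the 2021-49 partition of
// its batch; the question column is serialised to a JSON string for archival.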

def archiveBatches(batchesToArchive: Map[String, Array[BatchPartition]], data: DataFrame, requestConfig: Request)(implicit config: JobConfig): Unit = {
batchesToArchive.foreach(batches => {
val processingBatch = new AtomicInteger(batches._2.length)
JobLogger.log(s"Started Processing to archive the data", Some(Map("batch_id" -> batches._1, "total_part_files_to_archive" -> processingBatch)))

// Loop through the week_num & year batch partition
batches._2.map((batch: BatchPartition) => {
val filteredDF = data.filter(col("batch_id") === batch.batchId && col("year") === batch.period.year && col("week_of_year") === batch.period.weekOfYear).select(columnWithOrder.head, columnWithOrder.tail: _*)
val collectionId = filteredDF.first().getAs[String]("course_id")
var archivalRequest = getRequest(collectionId, batch.batchId, batch.period.year, batch.period.weekOfYear)

if (archivalRequest == null) {
val request_data = JSONUtils.deserialize[Map[String, AnyRef]](JSONUtils.serialize(requestConfig)) ++ Map[String, Int](
"week" -> batch.period.weekOfYear,
"year"-> batch.period.year
)
archivalRequest = ArchivalRequest("", batch.batchId, collectionId, Option(getReportKey), jobId, None, None, null, null, None, Option(0), JSONUtils.serialize(request_data), None)
}

try {
val urls = upload(filteredDF, batch) // Upload the archived files into blob store
archivalRequest.blob_url = Option(urls)
JobLogger.log(s"Data is archived and Processing the remaining part files ", None, Level.INFO)
markRequestAsSuccess(archivalRequest, requestConfig)
} catch {
case ex: Exception => {
markArchivalRequestAsFailed(archivalRequest, ex.getLocalizedMessage)
}
}
}).foreach((archivalRequest: ArchivalRequest) => {
upsertRequest(archivalRequest)
})

JobLogger.log(s"${batches._1} is successfully archived", Some(Map("batch_id" -> batches._1)), Level.INFO)
})
}

def deleteArchivedData(data: DataFrame, archivalRequest: Request): Unit = {

}

def processArchival(archivalTableData: DataFrame, archiveRequest: Request)(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): DataFrame;

def getArchivalData(table: String, keyspace: String, batchId: Option[String], date: Option[String])(implicit spark: SparkSession, fc: FrameworkContext): DataFrame = {
val archivalTableSettings = Map("table" -> table, "keyspace" -> keyspace, "cluster" -> "LMSCluster")
val archivalDBDF = loadData(archivalTableSettings, cassandraUrl, new StructType())
val batchIdentifier = batchId.getOrElse(null)

if (batchIdentifier.nonEmpty) {
archivalDBDF.filter(col("batch_id") === batchIdentifier).persist()
} else {
archivalDBDF
}
}

def getWeekAndYearVal(date: String): Period = {
if (null != date && date.nonEmpty) {
val dt = new DateTime(date)
Period(year = dt.getYear, weekOfYear = dt.getWeekOfWeekyear)
} else {
Period(0, 0)
}
}

def upload(archivedData: DataFrame, batch: BatchPartition)(implicit jobConfig: JobConfig): List[String] = {
val blobConfig = jobConfig.modelParams.get("blobConfig").asInstanceOf[Map[String, AnyRef]]
val reportPath: String = blobConfig.getOrElse("reportPath", "archived-data/").asInstanceOf[String]
val container = AppConf.getConfig("cloud.container.reports")
val objectKey = AppConf.getConfig("course.metrics.cloud.objectKey")
val fileName = s"${batch.batchId}/${batch.period.year}-${batch.period.weekOfYear}"
val storageConfig = getStorageConfig(jobConfig, objectKey)
JobLogger.log(s"Uploading reports to blob storage", None, Level.INFO)
archivedData.saveToBlobStore(storageConfig, "csv", s"$reportPath$fileName-${System.currentTimeMillis()}", Option(Map("header" -> "true", "codec" -> "org.apache.hadoop.io.compress.GzipCodec")), None, fileExt=Some("csv.gz"))
}
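// Illustration with placeholder values: for BatchPartition("batch-001", Period(2021, 49)),
// fileName is "batch-001/2021-49" and the archive is written as
// "<reportPath>batch-001/2021-49-<timestamp>.csv.gz" under the configured container and objectKey.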

def jobId: String;
def jobName: String;
def getReportPath: String;
def getReportKey: String;

}
@@ -0,0 +1,157 @@
package org.sunbird.analytics.archival.util

import java.security.MessageDigest
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet, Timestamp}
import java.util.Properties
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions.{col, lit}
import org.ekstep.analytics.framework.{FrameworkContext, JobConfig}
import org.ekstep.analytics.framework.Level.INFO
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils, JobLogger}
import org.sunbird.analytics.archival.Request

case class ArchivalRequest(request_id: String, batch_id: String, collection_id: String, resource_type: Option[String], job_id: String,
var archival_date: Option[Long],var completion_date: Option[Long],var archival_status: String,var deletion_status: String,
var blob_url: Option[List[String]],var iteration: Option[Int], request_data: String, var err_message: Option[String])

trait ArchivalMetaDataStoreJob {

implicit val className: String = getClassName;
val connProperties: Properties = CommonUtil.getPostgresConnectionProps()
val db: String = AppConf.getConfig("postgres.db")
val url: String = AppConf.getConfig("postgres.url") + s"$db"
val requestsTable: String = AppConf.getConfig("postgres.table.archival_request")
val dbc: Connection = DriverManager.getConnection(url, connProperties.getProperty("user"), connProperties.getProperty("password"));
dbc.setAutoCommit(true);

def getClassName(): String;

def cleanUp() {
dbc.close();
}

def getRequests(jobId: String, batchId: Option[String])(implicit spark: SparkSession, fc: FrameworkContext): Array[ArchivalRequest] = {
println("jobid: " + jobId + " batchid: " + batchId)
val encoder = Encoders.product[ArchivalRequest]
val archivalConfigsDf = spark.read.jdbc(url, requestsTable, connProperties)
.where(col("job_id") === jobId && col("iteration") < 3)
println("archivalConfigDF:")
archivalConfigsDf.show(false)

val filteredReportConfigDf = if (batchId.isDefined) {
val filteredArchivalConfig = archivalConfigsDf.filter(col("batch_id").equalTo(batchId.get))
if (filteredArchivalConfig.count() > 0) filteredArchivalConfig else archivalConfigsDf
} else archivalConfigsDf
println("filteredtReportCOnfig: ")
filteredReportConfigDf.show(false)
JobLogger.log("fetched records count" + filteredReportConfigDf.count(), None, INFO)
val requests = filteredReportConfigDf.as[ArchivalRequest](encoder).collect()
requests
}

def getRequestID(collectionId: String, batchId: String, year: Int, week: Int): String = {
val requestComb = s"$collectionId:$batchId:$year:$week"
MessageDigest.getInstance("MD5").digest(requestComb.getBytes).map("%02X".format(_)).mkString
}
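// Illustration (placeholder ids, not part of this PR): getRequestID("do_123", "batch-001", 2021, 49)
// computes the MD5 of "do_123:batch-001:2021:49" and returns it as a 32-character upper-case
// hex string, which identifies the archival request.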

def getRequest(collectionId: String, batchId: String, year: Int, week: Int): ArchivalRequest = {
val requestId = getRequestID(collectionId, batchId, year, week)
val archivalRequest = s"""select * from $requestsTable where request_id = '$requestId' limit 1"""
val pstmt: PreparedStatement = dbc.prepareStatement(archivalRequest);
val resultSet = pstmt.executeQuery()

if (resultSet.next()) getArchivalRequest(resultSet) else null
}

private def getArchivalRequest(resultSet: ResultSet): ArchivalRequest = {
ArchivalRequest(
resultSet.getString("request_id"),
resultSet.getString("batch_id"),
resultSet.getString("collection_id"),
Option(resultSet.getString("resource_type")),
resultSet.getString("job_id"),
Option(resultSet.getTimestamp("archival_date").getTime),
if (resultSet.getTimestamp("completion_date") != null) Option(resultSet.getTimestamp("completion_date").getTime) else None,
resultSet.getString("archival_status"),
resultSet.getString("deletion_status"),
if (resultSet.getArray("blob_url") != null) Option(resultSet.getArray("blob_url").getArray().asInstanceOf[Array[String]].toList) else None,
Option(resultSet.getInt("iteration")),
resultSet.getString("request_data"),
Option(resultSet.getString("err_message"))
)
}

def markArchivalRequestAsFailed(request: ArchivalRequest, failedMsg: String): ArchivalRequest = {
request.archival_status = "FAILED";
request.archival_date = Option(System.currentTimeMillis());
request.iteration = Option(request.iteration.getOrElse(0) + 1);
request.err_message = Option(failedMsg);
request
}

def markDeletionRequestAsFailed(request: ArchivalRequest, failedMsg: String): ArchivalRequest = {
request.deletion_status = "FAILED";
request.archival_date = Option(System.currentTimeMillis());
request.iteration = Option(request.iteration.getOrElse(0) + 1);
request.err_message = Option(failedMsg);
request
}

def createRequest(request: ArchivalRequest) = {
val insertQry = s"INSERT INTO $requestsTable (request_id, batch_id, collection_id, resource_type, job_id, archival_date, completion_date, archival_status, " +
s"deletion_status, blob_url, iteration, request_data, err_message) VALUES (?,?,?,?,?,?,?,?,?,?,?,?::json,?)"
val pstmt: PreparedStatement = dbc.prepareStatement(insertQry);
val request_data = JSONUtils.deserialize[Map[String, AnyRef]](request.request_data)
val requestId = getRequestID(request.collection_id, request.batch_id, request_data("year").asInstanceOf[Int], request_data("week").asInstanceOf[Int])
pstmt.setString(1, requestId);
pstmt.setString(2, request.batch_id);
pstmt.setString(3, request.collection_id);
pstmt.setString(4, request.resource_type.getOrElse("assessment"));
pstmt.setString(5, request.job_id);
pstmt.setTimestamp(6, if (request.archival_date.isDefined) new Timestamp(request.archival_date.get) else null);
pstmt.setTimestamp(7, if (request.completion_date.isDefined) new Timestamp(request.completion_date.get) else null);
pstmt.setString(8, request.archival_status);
pstmt.setString(9, request.deletion_status);
val blobURLs = request.blob_url.getOrElse(List()).toArray.asInstanceOf[Array[Object]];
pstmt.setArray(10, dbc.createArrayOf("text", blobURLs))
pstmt.setInt(11, request.iteration.getOrElse(0))
pstmt.setString(12, request.request_data)
pstmt.setString(13, StringUtils.abbreviate(request.err_message.getOrElse(""), 300));

pstmt.execute()
}

def upsertRequest(request: ArchivalRequest): Unit = {
if (request.request_id.isEmpty) {
createRequest(request)
} else {
updateRequest(request)
}
}

def updateRequest(request: ArchivalRequest): Unit = {
val updateQry = s"UPDATE $requestsTable SET blob_url=?, iteration = ?, archival_date=?, completion_date=?, " +
s"archival_status=?, deletion_status=? WHERE request_id=?";
val pstmt: PreparedStatement = dbc.prepareStatement(updateQry)

val blobURLs = request.blob_url.getOrElse(List()).toArray.asInstanceOf[Array[Object]];
pstmt.setArray(1, dbc.createArrayOf("text", blobURLs))
pstmt.setInt(2, request.iteration.get);
pstmt.setTimestamp(3, if (request.archival_date.isDefined) new Timestamp(request.archival_date.get) else null);
pstmt.setTimestamp(4, if (request.completion_date.isDefined) new Timestamp(request.completion_date.get) else null);
pstmt.setString(5, request.archival_status);
pstmt.setString(6, request.deletion_status);
pstmt.setString(7, request.request_id);

pstmt.execute()
}

def markRequestAsSuccess(request: ArchivalRequest, requestConfig: Request): ArchivalRequest = {
request.archival_status = "SUCCESS";
request.archival_date = Option(System.currentTimeMillis())
request
}

}
3 changes: 3 additions & 0 deletions data-products/src/test/resources/application.conf
@@ -199,3 +199,6 @@ uci.conversation.postgres.pass="postgres"
uci.exhaust.store.prefix="src/test/resources/exhaust-reports/"
uci.encryption.secret="123443957398423479784298247982789428fldkssd"
// END OF UCI Related Job Configs

//START of Archival Config
postgres.table.archival_request="archival_metadata"
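
For reference, below is a minimal sketch of a Postgres table that would satisfy the reads and writes in ArchivalMetaDataStoreJob (getArchivalRequest, createRequest, updateRequest). The deployed schema is not part of this PR, so the column types and constraints here are assumptions.

// Hypothetical DDL, derived from the columns used by ArchivalMetaDataStoreJob.
val createArchivalMetadataTable: String =
  """CREATE TABLE IF NOT EXISTS archival_metadata (
    |  request_id TEXT PRIMARY KEY,
    |  batch_id TEXT,
    |  collection_id TEXT,
    |  resource_type TEXT,
    |  job_id TEXT,
    |  archival_date TIMESTAMP,
    |  completion_date TIMESTAMP,
    |  archival_status TEXT,
    |  deletion_status TEXT,
    |  blob_url TEXT[],
    |  iteration INT,
    |  request_data JSON,
    |  err_message TEXT
    |)""".stripMargin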