Issue #SB-24872: Validate invalid batchId in data-exhaust job #413

Merged

@@ -5,7 +5,7 @@ import java.util.concurrent.atomic.AtomicInteger

import com.datastax.spark.connector.cql.CassandraConnectorConf
import org.apache.spark.SparkContext
-import org.apache.spark.sql.{DataFrame, Encoders, SparkSession}
+import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession, Row}
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
@@ -244,7 +244,7 @@ trait BaseCollectionExhaustJob extends BaseReportsJob with IJob with OnDemandExh
val encoder = Encoders.product[CollectionBatch];
val collectionBatches = getCollectionBatchDF(false);
if (batchId.isDefined || batchFilter.isDefined) {
-val batches = if (batchId.isDefined) collectionBatches.filter(col("batchid") === batchId.get) else collectionBatches.filter(col("batchid").isin(batchFilter.get: _*))
+val batches = validateBatches(collectionBatches, batchId, batchFilter)
val collectionIds = batches.select("courseid").dropDuplicates().collect().map(f => f.get(0));
val collectionDF = searchContent(Map("request" -> Map("filters" -> Map("identifier" -> collectionIds, "status" -> Array("Live", "Unlisted", "Retired")), "fields" -> Array("channel", "identifier", "name", "userConsent"))));
val joinedDF = batches.join(collectionDF, batches("courseid") === collectionDF("identifier"), "inner");
@@ -264,14 +264,38 @@ }
}
}

/**
* Validates the requested batch ids against the coursebatch data.
*
* If batchFilter is defined
* Step 1: Drop the duplicate batches from the batchFilter list
* Common Step
* Step 2: Validate each batchid by checking that it exists in the coursebatch table
*
* @param collectionBatches DataFrame of batches from the coursebatch table
* @param batchId single batch id to validate, when provided
* @param batchFilter list of batch ids to validate, when provided
* @return Dataset[Row] of valid batches
*/
def validateBatches(collectionBatches: DataFrame, batchId: Option[String], batchFilter: Option[List[String]]): Dataset[Row] = {
if (batchId.isDefined) {
collectionBatches.filter(col("batchid") === batchId.get)
} else {
/**
* Filter out the duplicate batches from batchFilter
* e.g. Input: List("batch-001", "batch-002", "batch-001")
* Output: List("batch-001", "batch-002")
*/
val distinctBatch = batchFilter.get.distinct
if (batchFilter.get.size != distinctBatch.size) JobLogger.log("Duplicate Batches are filtered:: TotalDistinctBatches: " + distinctBatch.size)
collectionBatches.filter(col("batchid").isin(distinctBatch: _*))
}
}
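
For reference, a minimal sketch of the intended behaviour, assuming a local SparkSession, a stand-in coursebatch DataFrame, and a caller that mixes in BaseCollectionExhaustJob; the second course id and app name below are made up for illustration:

// Hypothetical usage sketch, not part of this PR: duplicates in batchFilter are
// dropped, and unknown batch ids simply match no rows.
val spark = SparkSession.builder().master("local[*]").appName("validate-batches-sketch").getOrCreate()
import spark.implicits._

// Stand-in for the coursebatch table with two known batches
val collectionBatches = Seq(
  ("batch-001", "do_1130928636168192001667"),
  ("batch-002", "do_made_up_course_id")
).toDF("batchid", "courseid")

// batchFilter carries a duplicate "batch-001" and an invalid "batch-01"
val valid = validateBatches(collectionBatches, None, Some(List("batch-01", "batch-001", "batch-001")))
valid.select("batchid").as[String].collect() // expected: Array("batch-001")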

def processBatches(userCachedDF: DataFrame, collectionBatches: List[CollectionBatch], storageConfig: StorageConfig, requestId: Option[String], requestChannel: Option[String], processedRequests: List[ProcessedRequest] )(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): List[CollectionBatchResponse] = {
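// Resume bookkeeping: how many batches are already completed for this channel and
// how much output has already been produced, checked by checkRequestProcessCriteria below.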

var processedCount = if(processedRequests.isEmpty) 0 else processedRequests.filter(f => f.channel.equals(requestChannel.getOrElse(""))).size
var processedSize = if(processedRequests.isEmpty) 0 else processedRequests.filter(f => f.channel.equals(requestChannel.getOrElse(""))).map(f => f.fileSize).sum
JobLogger.log("Channel details at processBatches", Some(Map("channel" -> requestChannel, "file size" -> processedSize, "completed batches" -> processedCount)), INFO)

var newFileSize: Long = 0

for (batch <- filterCollectionBatches(collectionBatches)) yield {
if (checkRequestProcessCriteria(processedCount, processedSize)) {
val userEnrolmentBatchDF = getUserEnrolmentDF(batch.collectionId, batch.batchId, false)
@@ -402,4 +402,82 @@ class TestProgressExhaustJob extends BaseReportSpec with MockFactory with BaseRe
batch1Results.filter(col("User UUID") === "user-002").collect().map(_ (1)).toList(0) should be("15/11/2019")
batch1Results.filter(col("User UUID") === "user-003").collect().map(_ (1)).toList(0) should be("15/11/2019")
}

it should "generate report validating and filtering duplicate batches" in {
EmbeddedPostgresql.execute(s"TRUNCATE $jobRequestTable")
EmbeddedPostgresql.execute("INSERT INTO job_request (tag, request_id, job_id, status, request_data, requested_by, requested_channel, dt_job_submitted, download_urls, dt_file_created, dt_job_completed, execution_time, err_message ,iteration, encryption_key) VALUES ('do_1130928636168192001667_batch-001:channel-01', '37564CF8F134EE7532F125651B51D17F', 'progress-exhaust', 'SUBMITTED', '{\"batchFilter\": [\"batch-01\", \"batch-001\", \"batch-001\"]}', 'user-002', 'b00bc992ef25f1a9a8d63291e20efc8d', '2020-10-19 05:58:18.666', '{}', NULL, NULL, 0, '' ,0, 'test12');")

implicit val fc = new FrameworkContext()
val strConfig = """{"search":{"type":"none"},"model":"org.sunbird.analytics.exhaust.collection.ProgressExhaustJob","modelParams":{"store":"local","mode":"OnDemand","batchFilters":["TPD"],"searchFilter":{},"sparkElasticsearchConnectionHost":"{{ sunbird_es_host }}","sparkRedisConnectionHost":"localhost","sparkUserDbRedisPort":6341,"sparkUserDbRedisIndex":"0","sparkCassandraConnectionHost":"localhost","fromDate":"","toDate":"","storageContainer":""},"parallelization":8,"appName":"Progress Exhaust"}"""
val jobConfig = JSONUtils.deserialize[JobConfig](strConfig)
implicit val config = jobConfig

ProgressExhaustJob.execute()

val outputLocation = AppConf.getConfig("collection.exhaust.store.prefix")
val outputDir = "progress-exhaust"
val batch1 = "batch-001"
val requestId = "37564CF8F134EE7532F125651B51D17F"
val filePath = ProgressExhaustJob.getFilePath(batch1, requestId)
val jobName = ProgressExhaustJob.jobName()

implicit val responseExhaustEncoder = Encoders.product[ProgressExhaustReport]
val batch1Results = spark.read.format("csv").option("header", "true")
.load(s"$outputLocation/$filePath.csv").as[ProgressExhaustReport].collectAsList().asScala

batch1Results.size should be (4)
batch1Results.map(f => f.`Collection Id`).toList should contain atLeastOneElementOf List("do_1130928636168192001667")
batch1Results.map(f => f.`Collection Name`).toList should contain atLeastOneElementOf List("24 aug course")
batch1Results.map(f => f.`Batch Id`).toList should contain atLeastOneElementOf List("BatchId_batch-001")
batch1Results.map(f => f.`Batch Name`).toList should contain atLeastOneElementOf List("Basic Java")
batch1Results.map {res => res.`User UUID`}.toList should contain theSameElementsAs List("user-001", "user-002", "user-003", "user-004")
batch1Results.map {res => res.`State`}.toList should contain theSameElementsAs List("Karnataka", "Andhra Pradesh", "Karnataka", "Delhi")
batch1Results.map {res => res.`District`}.toList should contain theSameElementsAs List("bengaluru", "bengaluru", "bengaluru", "babarpur")
batch1Results.map(f => f.`Enrolment Date`).toList should contain allElementsOf List("15/11/2019")
batch1Results.map(f => f.`Completion Date`).toList should contain allElementsOf List(null)
batch1Results.map(f => f.`Progress`).toList should contain allElementsOf List("100")
batch1Results.map(f => f.`Cluster Name`).toList should contain atLeastOneElementOf List("CLUSTER1")
batch1Results.map(f => f.`User Type`).toList should contain atLeastOneElementOf List("administrator")
batch1Results.map(f => f.`User Sub Type`).toList should contain atLeastOneElementOf List("deo")

val pResponse = EmbeddedPostgresql.executeQuery("SELECT * FROM job_request WHERE job_id='progress-exhaust'")
val reportDate = getDate("yyyyMMdd").format(Calendar.getInstance().getTime())

while(pResponse.next()) {
pResponse.getString("status") should be ("SUCCESS")
pResponse.getString("err_message") should be ("")
pResponse.getString("dt_job_submitted") should be ("2020-10-19 05:58:18.666")
pResponse.getString("download_urls") should be (s"""{reports/progress-exhaust/$requestId/batch-001_progress_${reportDate}.zip}""")
pResponse.getString("dt_file_created") should be (null)
pResponse.getString("iteration") should be ("0")
}

new HadoopFileUtil().delete(spark.sparkContext.hadoopConfiguration, outputLocation)
}

it should "mark request as failed if all batches are invalid in request_data" in {
EmbeddedPostgresql.execute(s"TRUNCATE $jobRequestTable")
EmbeddedPostgresql.execute("INSERT INTO job_request (tag, request_id, job_id, status, request_data, requested_by, requested_channel, dt_job_submitted, download_urls, dt_file_created, dt_job_completed, execution_time, err_message ,iteration, encryption_key) VALUES ('do_1130928636168192001667_batch-001:channel-01', '37564CF8F134EE7532F125651B51D17F', 'progress-exhaust', 'SUBMITTED', '{\"batchFilter\": [\"batch-01\", \"batch-02\"]}', 'user-002', 'b00bc992ef25f1a9a8d63291e20efc8d', '2020-10-19 05:58:18.666', '{}', NULL, NULL, 0, '' ,0, 'test12');")

implicit val fc = new FrameworkContext()
val strConfig = """{"search":{"type":"none"},"model":"org.sunbird.analytics.exhaust.collection.ProgressExhaustJob","modelParams":{"store":"local","mode":"OnDemand","batchFilters":["TPD"],"searchFilter":{},"sparkElasticsearchConnectionHost":"{{ sunbird_es_host }}","sparkRedisConnectionHost":"localhost","sparkUserDbRedisPort":6341,"sparkUserDbRedisIndex":"0","sparkCassandraConnectionHost":"localhost","fromDate":"","toDate":"","storageContainer":""},"parallelization":8,"appName":"Progress Exhaust"}"""
val jobConfig = JSONUtils.deserialize[JobConfig](strConfig)
implicit val config = jobConfig

ProgressExhaustJob.execute()

val pResponse = EmbeddedPostgresql.executeQuery("SELECT * FROM job_request WHERE job_id='progress-exhaust'")
val reportDate = getDate("yyyyMMdd").format(Calendar.getInstance().getTime())

while(pResponse.next()) {
pResponse.getString("status") should be ("FAILED")
pResponse.getString("request_data") should be ("""{"batchFilter": ["batch-01", "batch-02"]}""")
pResponse.getString("err_message") should be ("No data found")
pResponse.getString("dt_job_submitted") should be ("2020-10-19 05:58:18.666")
pResponse.getString("download_urls") should be (s"""{}""")
pResponse.getString("dt_file_created") should be (null)
pResponse.getString("iteration") should be ("1")
}

}
}