From 91efd912c0bb8afde0321ad6283c02b544278b7b Mon Sep 17 00:00:00 2001 From: Ramachandran G Date: Thu, 19 Sep 2024 19:07:24 +0530 Subject: [PATCH 1/7] * Add change for unique tag names --- .scalafmt.conf | 2 +- .../kusto/spark/datasink/KustoWriter.scala | 304 ++++++++---------- 2 files changed, 141 insertions(+), 165 deletions(-) diff --git a/.scalafmt.conf b/.scalafmt.conf index 202f35df..6eef5b04 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -24,7 +24,7 @@ optIn = { } danglingParentheses.preset = false docstrings.style = Asterisk -maxColumn = 98 +maxColumn = 120 runner.dialect = scala212 fileOverride { "glob:**/src/**/scala-2.13/**.scala" { diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala index f7f01f5d..f4851e5a 100644 --- a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala +++ b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala @@ -19,14 +19,7 @@ import com.microsoft.kusto.spark.common.KustoCoordinates import com.microsoft.kusto.spark.datasink.FinalizeHelper.finalizeIngestionWhenWorkersSucceeded import com.microsoft.kusto.spark.utils.CslCommandsGenerator.generateTableGetSchemaAsRowsCommand import com.microsoft.kusto.spark.utils.KustoConstants.{IngestSkippedTrace, MaxIngestRetryAttempts} -import com.microsoft.kusto.spark.utils.{ - ExtendedKustoClient, - KustoClientCache, - KustoIngestionUtils, - KustoQueryUtils, - KustoConstants => KCONST, - KustoDataSourceUtils => KDSU -} +import com.microsoft.kusto.spark.utils.{ExtendedKustoClient, KustoClientCache, KustoIngestionUtils, KustoQueryUtils, KustoConstants => KCONST, KustoDataSourceUtils => KDSU} import io.github.resilience4j.retry.RetryConfig import org.apache.spark.TaskContext import org.apache.spark.sql.DataFrame @@ -47,6 +40,7 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.util.{Failure, Success, Try} import java.time.ZoneId import java.time.format.DateTimeFormatter +import java.util.concurrent.ConcurrentHashMap object KustoWriter { private val className = this.getClass.getSimpleName @@ -86,10 +80,7 @@ object KustoWriter { val stagingTableIngestionProperties = getSparkIngestionProperties(writeOptions) val schemaShowCommandResult = kustoClient - .executeEngine( - tableCoordinates.database, - generateTableGetSchemaAsRowsCommand(tableCoordinates.table.get), - crp) + .executeEngine(tableCoordinates.database, generateTableGetSchemaAsRowsCommand(tableCoordinates.table.get), crp) .getPrimaryResults val targetSchema = @@ -105,11 +96,8 @@ object KustoWriter { writeOptions.copy(ingestionProperties = Some(stagingTableIngestionProperties.toString())) val tableExists = schemaShowCommandResult.count() > 0 - val shouldIngest = kustoClient.shouldIngestData( - tableCoordinates, - writeOptions.ingestionProperties, - tableExists, - crp) + val shouldIngest = + kustoClient.shouldIngestData(tableCoordinates, writeOptions.ingestionProperties, tableExists, crp) if (!shouldIngest) { KDSU.logInfo(className, s"$IngestSkippedTrace '$table'") @@ -151,9 +139,7 @@ object KustoWriter { } if (stagingTableIngestionProperties.flushImmediately) { - KDSU.logWarn( - className, - "It's not recommended to set flushImmediately to true on production") + KDSU.logWarn(className, "It's not recommended to set flushImmediately to true on production") } val cloudInfo = CloudInfo.retrieveCloudInfoForCluster(kustoClient.ingestKcsb.getClusterUrl) val rdd = 
data.queryExecution.toRdd @@ -200,9 +186,7 @@ object KustoWriter { tableCoordinates.database, table, shouldNotThrow = true) - KDSU.logError( - className, - "The exception is not visible in the driver since we're in async mode") + KDSU.logError(className, "The exception is not visible in the driver since we're in async mode") } } } else { @@ -214,10 +198,7 @@ object KustoWriter { case exception: Exception => if (writeOptions.isTransactionalMode) { if (writeOptions.userTempTableName.isEmpty) { - kustoClient.cleanupIngestionByProducts( - tableCoordinates.database, - tmpTableName, - crp) + kustoClient.cleanupIngestionByProducts(tableCoordinates.database, tmpTableName, crp) } } /* Throwing the exception will abort the job (explicitly on the driver) */ @@ -263,9 +244,8 @@ object KustoWriter { } } - def ingestRowsIntoKusto( + private def ingestRowsIntoKusto( rows: Iterator[InternalRow], - ingestClient: IngestClient, partitionsResults: CollectionAccumulator[PartitionResult], batchIdForTracing: String, parameters: KustoWriteResource): Unit = { @@ -273,23 +253,20 @@ object KustoWriter { val ingestionProperties = getIngestionProperties( parameters.writeOptions, parameters.coordinates.database, - if (parameters.writeOptions.isTransactionalMode) parameters.tmpTableName - else parameters.coordinates.table.get) + if (parameters.writeOptions.isTransactionalMode) { + parameters.tmpTableName + } + else { + parameters.coordinates.table.get + }) if (parameters.writeOptions.isTransactionalMode) { ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.TABLE) - ingestionProperties.setReportLevel( - IngestionProperties.IngestionReportLevel.FAILURES_AND_SUCCESSES) + ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FAILURES_AND_SUCCESSES) } ingestionProperties.setDataFormat(DataFormat.CSV.name) /* A try block may be redundant here. 
An exception thrown has to be propagated depending on the exception */ - ingestRows( - rows, - parameters, - ingestClient, - ingestionProperties, - partitionsResults, - batchIdForTracing) + ingestRows(rows, parameters, ingestionProperties, partitionsResults, batchIdForTracing) KDSU.logInfo( className, s"Ingesting from blob(s) partition: ${TaskContext.getPartitionId()} requestId: " + @@ -310,8 +287,7 @@ object KustoWriter { } } - private def getSparkIngestionProperties( - writeOptions: WriteOptions): SparkIngestionProperties = { + private def getSparkIngestionProperties(writeOptions: WriteOptions): SparkIngestionProperties = { val ingestionProperties = if (writeOptions.ingestionProperties.isDefined) SparkIngestionProperties.fromString(writeOptions.ingestionProperties.get) @@ -339,7 +315,6 @@ object KustoWriter { parameters.coordinates.clusterAlias) val ingestClient = clientCache.ingestClient CloudInfo.manuallyAddToCache(clientCache.ingestKcsb.getClusterUrl, parameters.cloudInfo); - val reqRetryOpts = new RequestRetryOptions( RetryPolicyType.FIXED, KCONST.QueueRetryAttempts, @@ -350,7 +325,7 @@ object KustoWriter { ingestClient.setQueueRequestOptions(reqRetryOpts) // We force blocking here, since the driver can only complete the ingestion process // once all partitions are ingested into the temporary table - ingestRowsIntoKusto(rows, ingestClient, partitionsResults, batchIdForTracing, parameters) + ingestRowsIntoKusto(rows, partitionsResults, batchIdForTracing, parameters) } private def createBlobWriter( @@ -362,18 +337,16 @@ object KustoWriter { blobUUID: String): BlobWriteResource = { val now = Instant.now() - val blobName = s"${KustoQueryUtils.simplifyName( - tableCoordinates.database)}_${tmpTableName}_${blobUUID}_${partitionId}_${blobNumber}_${formatter - .format(now)}_spark.csv.gz" + val blobName = + s"${KustoQueryUtils.simplifyName(tableCoordinates.database)}_${tmpTableName}_${blobUUID}_${partitionId}_${blobNumber}_${formatter + .format(now)}_spark.csv.gz" val containerAndSas = client.getTempBlobForIngestion - val currentBlob = new CloudBlockBlob( - new URI(s"${containerAndSas.containerUrl}/$blobName${containerAndSas.sas}")) + val currentBlob = new CloudBlockBlob(new URI(s"${containerAndSas.containerUrl}/$blobName${containerAndSas.sas}")) val currentSas = containerAndSas.sas val options = new BlobRequestOptions() options.setConcurrentRequestCount(4) // Should be configured from outside - val gzip: GZIPOutputStream = new GZIPOutputStream( - currentBlob.openOutputStream(null, options, null)) + val gzip: GZIPOutputStream = new GZIPOutputStream(currentBlob.openOutputStream(null, options, null)) val writer = new OutputStreamWriter(gzip, StandardCharsets.UTF_8) @@ -386,143 +359,146 @@ object KustoWriter { private[kusto] def ingestRows( rows: Iterator[InternalRow], parameters: KustoWriteResource, - ingestClient: IngestClient, ingestionProperties: IngestionProperties, partitionsResults: CollectionAccumulator[PartitionResult], batchIdForTracing: String): Unit = { - val partitionId = TaskContext.getPartitionId val partitionIdString = TaskContext.getPartitionId.toString - def ingest( - blobResource: BlobWriteResource, - size: Long, - sas: String, - flushImmediately: Boolean = false, - blobUUID: String, - kustoClient: ExtendedKustoClient): Unit = { - var props = ingestionProperties - val blobUri = blobResource.blob.getStorageUri.getPrimaryUri.toString - if (parameters.writeOptions.ensureNoDupBlobs || (!props.getFlushImmediately && flushImmediately)) { - // Need to copy the ingestionProperties so 
that only this blob ingestion will be effected - props = SparkIngestionProperties.cloneIngestionProperties(ingestionProperties) - } - - if (parameters.writeOptions.ensureNoDupBlobs) { - val pref = KDSU.getDedupTagsPrefix(parameters.writeOptions.requestId, batchIdForTracing) - val tag = pref + blobUUID - val ingestIfNotExist = new util.ArrayList[String] - ingestIfNotExist.addAll(props.getIngestIfNotExists) - val ingestBy: util.List[String] = new util.ArrayList[String] - ingestBy.addAll(props.getIngestByTags) - - ingestBy.add(tag) - ingestIfNotExist.add(tag) - props.setIngestByTags(ingestBy) - props.setIngestIfNotExists(ingestIfNotExist) - } - - if (!props.getFlushImmediately && flushImmediately) { - props.setFlushImmediately(true) - } - // write the data here - val partitionsResult = KDSU.retryApplyFunction( - () => { - Try( - ingestClient.ingestFromBlob( - new BlobSourceInfo(blobUri + sas, size, UUID.randomUUID()), - props)) match { - case Success(x) => - - val blobUrlWithSas = - s"${blobResource.blob.getStorageUri.getPrimaryUri.toString}${blobResource.sas}" - val containerWithSas = new ContainerWithSas(blobUrlWithSas, null) - kustoClient.reportIngestionResult(containerWithSas, success = true) - x - case Failure(e: Throwable) => - KDSU.reportExceptionAndThrow( - className, - e, - "Queueing blob for ingestion in partition " + - s"$partitionIdString for requestId: '${parameters.writeOptions.requestId}") - val blobUrlWithSas = - s"${blobResource.blob.getStorageUri.getPrimaryUri.toString}${blobResource.sas}" - val containerWithSas = new ContainerWithSas(blobUrlWithSas, null) - kustoClient.reportIngestionResult(containerWithSas, success = false) - null - } - }, - this.retryConfig, - "Ingest into Kusto") - if (parameters.writeOptions.isTransactionalMode) { - partitionsResults.add(PartitionResult(partitionsResult, partitionId)) - } - KDSU.logInfo( - className, - s"Queued blob for ingestion in partition $partitionIdString " + - s"for requestId: '${parameters.writeOptions.requestId}") - } val kustoClient = KustoClientCache.getClient( parameters.coordinates.clusterUrl, parameters.authentication, parameters.coordinates.ingestionUrl, parameters.coordinates.clusterAlias) + val blobIdMap = new ConcurrentHashMap[String,Int]() + blobIdMap.put(parameters.writeOptions.requestId,0) val maxBlobSize = parameters.writeOptions.batchLimit * KCONST.OneMegaByte var curBlobUUID = UUID.randomUUID().toString // This blobWriter will be used later to write the rows to blob storage from which it will be ingested to Kusto - val initialBlobWriter: BlobWriteResource = createBlobWriter( - parameters.coordinates, - parameters.tmpTableName, - kustoClient, - partitionIdString, - 0, - curBlobUUID) + val initialBlobWriter: BlobWriteResource = + createBlobWriter(parameters.coordinates, parameters.tmpTableName, kustoClient, partitionIdString, 0, curBlobUUID) val timeZone = TimeZone.getTimeZone(parameters.writeOptions.timeZone).toZoneId // Serialize rows to ingest and send to blob storage. 
- val lastBlobWriter = rows.zipWithIndex.foldLeft[BlobWriteResource](initialBlobWriter) { - case (blobWriter, row) => - RowCSVWriterUtils.writeRowAsCSV(row._1, parameters.schema, timeZone, blobWriter.csvWriter) - - val count = blobWriter.csvWriter.getCounter - val shouldNotCommitBlockBlob = count < maxBlobSize - if (shouldNotCommitBlockBlob) { - blobWriter - } else { - KDSU.logInfo( - className, - s"Sealing blob in partition $partitionIdString for requestId: '${parameters.writeOptions.requestId}', " + - s"blob number ${row._2}, with size $count") - finalizeBlobWrite(blobWriter) - ingest( - blobWriter, - blobWriter.csvWriter.getCounter, - blobWriter.sas, - flushImmediately = !parameters.writeOptions.disableFlushImmediately, - curBlobUUID, - kustoClient) - curBlobUUID = UUID.randomUUID().toString - createBlobWriter( - parameters.coordinates, - parameters.tmpTableName, - kustoClient, - partitionIdString, - row._2, - curBlobUUID) - } + val lastBlobWriter = rows.zipWithIndex.foldLeft[BlobWriteResource](initialBlobWriter) { case (blobWriter, row) => + RowCSVWriterUtils.writeRowAsCSV(row._1, parameters.schema, timeZone, blobWriter.csvWriter) + val count = blobWriter.csvWriter.getCounter + val shouldNotCommitBlockBlob = count < maxBlobSize + if (shouldNotCommitBlockBlob) { + blobWriter + } else { + val blobIndexInBatch = blobIdMap.getOrDefault(parameters.writeOptions.requestId,0) + blobIdMap.put(parameters.writeOptions.requestId,blobIndexInBatch+1) + KDSU.logInfo( + className, + s"Sealing blob in partition $partitionIdString for requestId: '${parameters.writeOptions.requestId}', " + + s"blob number ${row._2}, with size $count") + finalizeBlobWrite(blobWriter) + ingest( + blobWriter, + parameters, + ingestionProperties, + flushImmediately = !parameters.writeOptions.disableFlushImmediately, + curBlobUUID, + kustoClient, + partitionsResults, + batchIdForTracing,blobIndexInBatch) + curBlobUUID = UUID.randomUUID().toString + createBlobWriter( + parameters.coordinates, + parameters.tmpTableName, + kustoClient, + partitionIdString, + row._2, + curBlobUUID) + } } - KDSU.logInfo( className, - s"Finished serializing rows in partition $partitionIdString for " + - s"requestId: '${parameters.writeOptions.requestId}' ") + s"Finished serializing rows in partition $partitionIdString for requestId:'${parameters.writeOptions.requestId}'") finalizeBlobWrite(lastBlobWriter) if (lastBlobWriter.csvWriter.getCounter > 0) { + val blobIndexInBatch = blobIdMap.getOrDefault(parameters.writeOptions.requestId,0) + blobIdMap.put(parameters.writeOptions.requestId,blobIndexInBatch+1) ingest( lastBlobWriter, - lastBlobWriter.csvWriter.getCounter, - lastBlobWriter.sas, + parameters, + ingestionProperties, flushImmediately = false, curBlobUUID, - kustoClient) + kustoClient, + partitionsResults, + batchIdForTracing,blobIndexInBatch) + } + } + + def ingest( + blobResource: BlobWriteResource, + parameters: KustoWriteResource, + ingestionProperties: IngestionProperties, + flushImmediately: Boolean = false, + blobUUID: String, + kustoClient: ExtendedKustoClient, + partitionsResults: CollectionAccumulator[PartitionResult], + batchIdForTracing: String,blobIndexInBatch:Int): Unit = { + val size = blobResource.csvWriter.getCounter + val sas = blobResource.sas + val partitionId = TaskContext.getPartitionId + var props = ingestionProperties + val blobUri = blobResource.blob.getStorageUri.getPrimaryUri.toString + if (parameters.writeOptions.ensureNoDupBlobs || (!props.getFlushImmediately && flushImmediately)) { + // Need to copy the 
ingestionProperties so that only this blob ingestion will be effected + props = SparkIngestionProperties.cloneIngestionProperties(ingestionProperties) + } + if (parameters.writeOptions.ensureNoDupBlobs) { + // The Key change is here + val pref = KDSU.getDedupTagsPrefix(parameters.writeOptions.requestId, blobIndexInBatch.toString) + val tag = pref + blobUUID + val ingestIfNotExist = new util.ArrayList[String] + ingestIfNotExist.addAll(props.getIngestIfNotExists) + val ingestBy: util.List[String] = new util.ArrayList[String] + ingestBy.addAll(props.getIngestByTags) + ingestBy.add(tag) + ingestIfNotExist.add(tag) + props.setIngestByTags(ingestBy) + props.setIngestIfNotExists(ingestIfNotExist) + } + if (!props.getFlushImmediately && flushImmediately) { + props.setFlushImmediately(true) + } + val ingestClient = KustoClientCache + .getClient( + parameters.coordinates.clusterUrl, + parameters.authentication, + parameters.coordinates.ingestionUrl, + parameters.coordinates.clusterAlias) + .ingestClient + // write the data here + val blobUrlWithSas = s"${blobResource.blob.getStorageUri.getPrimaryUri.toString}${blobResource.sas}" + val containerWithSas = new ContainerWithSas(blobUrlWithSas, null) + val partitionsResult = KDSU.retryApplyFunction( + () => { + Try(ingestClient.ingestFromBlob(new BlobSourceInfo(blobUri + sas, size, UUID.randomUUID()), props)) match { + case Success(x) => + + kustoClient.reportIngestionResult(containerWithSas, success = true) + x + case Failure(e: Throwable) => + KDSU.reportExceptionAndThrow( + className, + e, + s"Queueing blob for ingestion in partition ${TaskContext.getPartitionId.toString} for " + + s"requestId: '${parameters.writeOptions.requestId}") + kustoClient.reportIngestionResult(containerWithSas, success = false) + null + } + }, + this.retryConfig, + "Ingest into Kusto") + if (parameters.writeOptions.isTransactionalMode) { + partitionsResults.add(PartitionResult(partitionsResult, partitionId)) } + KDSU.logInfo( + className, + s"Queued blob for ingestion in partition ${TaskContext.getPartitionId.toString} " + + s"for requestId: '${parameters.writeOptions.requestId}") } def finalizeBlobWrite(blobWriteResource: BlobWriteResource): Unit = { From 734bd0c441528848301baec7cbfe02d9818596ce Mon Sep 17 00:00:00 2001 From: Ramachandran G Date: Thu, 19 Sep 2024 19:11:16 +0530 Subject: [PATCH 2/7] * Add change for unique tag names --- .../kusto/spark/datasink/KustoWriter.scala | 40 ++++++++++++------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala index f4851e5a..0f03a31f 100644 --- a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala +++ b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala @@ -19,7 +19,14 @@ import com.microsoft.kusto.spark.common.KustoCoordinates import com.microsoft.kusto.spark.datasink.FinalizeHelper.finalizeIngestionWhenWorkersSucceeded import com.microsoft.kusto.spark.utils.CslCommandsGenerator.generateTableGetSchemaAsRowsCommand import com.microsoft.kusto.spark.utils.KustoConstants.{IngestSkippedTrace, MaxIngestRetryAttempts} -import com.microsoft.kusto.spark.utils.{ExtendedKustoClient, KustoClientCache, KustoIngestionUtils, KustoQueryUtils, KustoConstants => KCONST, KustoDataSourceUtils => KDSU} +import com.microsoft.kusto.spark.utils.{ + ExtendedKustoClient, + KustoClientCache, + KustoIngestionUtils, + KustoQueryUtils, + 
KustoConstants => KCONST, + KustoDataSourceUtils => KDSU +} import io.github.resilience4j.retry.RetryConfig import org.apache.spark.TaskContext import org.apache.spark.sql.DataFrame @@ -255,8 +262,7 @@ object KustoWriter { parameters.coordinates.database, if (parameters.writeOptions.isTransactionalMode) { parameters.tmpTableName - } - else { + } else { parameters.coordinates.table.get }) @@ -368,8 +374,8 @@ object KustoWriter { parameters.authentication, parameters.coordinates.ingestionUrl, parameters.coordinates.clusterAlias) - val blobIdMap = new ConcurrentHashMap[String,Int]() - blobIdMap.put(parameters.writeOptions.requestId,0) + val blobIdMap = new ConcurrentHashMap[String, Int]() + blobIdMap.put(parameters.writeOptions.requestId, 0) val maxBlobSize = parameters.writeOptions.batchLimit * KCONST.OneMegaByte var curBlobUUID = UUID.randomUUID().toString // This blobWriter will be used later to write the rows to blob storage from which it will be ingested to Kusto @@ -384,8 +390,8 @@ object KustoWriter { if (shouldNotCommitBlockBlob) { blobWriter } else { - val blobIndexInBatch = blobIdMap.getOrDefault(parameters.writeOptions.requestId,0) - blobIdMap.put(parameters.writeOptions.requestId,blobIndexInBatch+1) + val blobIndexInBatch = blobIdMap.getOrDefault(parameters.writeOptions.requestId, 0) + blobIdMap.put(parameters.writeOptions.requestId, blobIndexInBatch + 1) KDSU.logInfo( className, s"Sealing blob in partition $partitionIdString for requestId: '${parameters.writeOptions.requestId}', " + @@ -399,7 +405,8 @@ object KustoWriter { curBlobUUID, kustoClient, partitionsResults, - batchIdForTracing,blobIndexInBatch) + batchIdForTracing, + blobIndexInBatch) curBlobUUID = UUID.randomUUID().toString createBlobWriter( parameters.coordinates, @@ -415,8 +422,8 @@ object KustoWriter { s"Finished serializing rows in partition $partitionIdString for requestId:'${parameters.writeOptions.requestId}'") finalizeBlobWrite(lastBlobWriter) if (lastBlobWriter.csvWriter.getCounter > 0) { - val blobIndexInBatch = blobIdMap.getOrDefault(parameters.writeOptions.requestId,0) - blobIdMap.put(parameters.writeOptions.requestId,blobIndexInBatch+1) + val blobIndexInBatch = blobIdMap.getOrDefault(parameters.writeOptions.requestId, 0) + blobIdMap.put(parameters.writeOptions.requestId, blobIndexInBatch + 1) ingest( lastBlobWriter, parameters, @@ -425,7 +432,8 @@ object KustoWriter { curBlobUUID, kustoClient, partitionsResults, - batchIdForTracing,blobIndexInBatch) + batchIdForTracing, + blobIndexInBatch) } } @@ -437,7 +445,8 @@ object KustoWriter { blobUUID: String, kustoClient: ExtendedKustoClient, partitionsResults: CollectionAccumulator[PartitionResult], - batchIdForTracing: String,blobIndexInBatch:Int): Unit = { + batchIdForTracing: String, + blobIndexInBatch: Int): Unit = { val size = blobResource.csvWriter.getCounter val sas = blobResource.sas val partitionId = TaskContext.getPartitionId @@ -449,8 +458,11 @@ object KustoWriter { } if (parameters.writeOptions.ensureNoDupBlobs) { // The Key change is here - val pref = KDSU.getDedupTagsPrefix(parameters.writeOptions.requestId, blobIndexInBatch.toString) - val tag = pref + blobUUID + val tag = KDSU.getDedupTagsPrefix(parameters.writeOptions.requestId, s"${blobIndexInBatch.toString}_$partitionId") + KDSU.logInfo( + className, + s"With ensureNoDupBlobs in ${TaskContext.getPartitionId.toString} " + + s"for requestId: '${parameters.writeOptions.requestId}, the tag is $tag") val ingestIfNotExist = new util.ArrayList[String] 
ingestIfNotExist.addAll(props.getIngestIfNotExists) val ingestBy: util.List[String] = new util.ArrayList[String] From 5d7219483fb0d47ae1e2e6fac56ef5963489d5e6 Mon Sep 17 00:00:00 2001 From: Ramachandran A G <106139410+ag-ramachandran@users.noreply.github.com> Date: Thu, 19 Sep 2024 21:35:53 +0530 Subject: [PATCH 3/7] Update connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala Co-authored-by: ohad bitton <32278684+ohadbitt@users.noreply.github.com> --- .../com/microsoft/kusto/spark/datasink/KustoWriter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala index 0f03a31f..f387e0e7 100644 --- a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala +++ b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala @@ -461,8 +461,8 @@ object KustoWriter { val tag = KDSU.getDedupTagsPrefix(parameters.writeOptions.requestId, s"${blobIndexInBatch.toString}_$partitionId") KDSU.logInfo( className, - s"With ensureNoDupBlobs in ${TaskContext.getPartitionId.toString} " + - s"for requestId: '${parameters.writeOptions.requestId}, the tag is $tag") + s"With ensureNoDupBlobs in partition: ${TaskContext.getPartitionId}, " + + s"for requestId: '${parameters.writeOptions.requestId}, tag: $tag") val ingestIfNotExist = new util.ArrayList[String] ingestIfNotExist.addAll(props.getIngestIfNotExists) val ingestBy: util.List[String] = new util.ArrayList[String] From 3071ed252920e37879e6a57e50bbacc8e1309d09 Mon Sep 17 00:00:00 2001 From: Ramachandran G Date: Fri, 20 Sep 2024 12:09:41 +0530 Subject: [PATCH 4/7] * Make tag names unique * Update POM --- .../kusto/spark/datasink/KustoWriter.scala | 24 +++++++++---------- pom.xml | 2 +- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala index f387e0e7..dd7848d5 100644 --- a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala +++ b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala @@ -28,6 +28,7 @@ import com.microsoft.kusto.spark.utils.{ KustoDataSourceUtils => KDSU } import io.github.resilience4j.retry.RetryConfig +import org.apache.commons.lang3.StringUtils import org.apache.spark.TaskContext import org.apache.spark.sql.DataFrame import org.apache.spark.sql.catalyst.InternalRow @@ -48,6 +49,7 @@ import scala.util.{Failure, Success, Try} import java.time.ZoneId import java.time.format.DateTimeFormatter import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger object KustoWriter { private val className = this.getClass.getSimpleName @@ -374,8 +376,7 @@ object KustoWriter { parameters.authentication, parameters.coordinates.ingestionUrl, parameters.coordinates.clusterAlias) - val blobIdMap = new ConcurrentHashMap[String, Int]() - blobIdMap.put(parameters.writeOptions.requestId, 0) + val currentBlobIndex = new AtomicInteger(0) val maxBlobSize = parameters.writeOptions.batchLimit * KCONST.OneMegaByte var curBlobUUID = UUID.randomUUID().toString // This blobWriter will be used later to write the rows to blob storage from which it will be ingested to Kusto @@ -390,8 +391,6 @@ object KustoWriter { if (shouldNotCommitBlockBlob) { blobWriter } else { - val blobIndexInBatch = 
blobIdMap.getOrDefault(parameters.writeOptions.requestId, 0) - blobIdMap.put(parameters.writeOptions.requestId, blobIndexInBatch + 1) KDSU.logInfo( className, s"Sealing blob in partition $partitionIdString for requestId: '${parameters.writeOptions.requestId}', " + @@ -402,11 +401,10 @@ object KustoWriter { parameters, ingestionProperties, flushImmediately = !parameters.writeOptions.disableFlushImmediately, - curBlobUUID, kustoClient, partitionsResults, batchIdForTracing, - blobIndexInBatch) + currentBlobIndex.incrementAndGet()) curBlobUUID = UUID.randomUUID().toString createBlobWriter( parameters.coordinates, @@ -422,18 +420,15 @@ object KustoWriter { s"Finished serializing rows in partition $partitionIdString for requestId:'${parameters.writeOptions.requestId}'") finalizeBlobWrite(lastBlobWriter) if (lastBlobWriter.csvWriter.getCounter > 0) { - val blobIndexInBatch = blobIdMap.getOrDefault(parameters.writeOptions.requestId, 0) - blobIdMap.put(parameters.writeOptions.requestId, blobIndexInBatch + 1) ingest( lastBlobWriter, parameters, ingestionProperties, flushImmediately = false, - curBlobUUID, kustoClient, partitionsResults, batchIdForTracing, - blobIndexInBatch) + currentBlobIndex.incrementAndGet()) } } @@ -442,7 +437,6 @@ object KustoWriter { parameters: KustoWriteResource, ingestionProperties: IngestionProperties, flushImmediately: Boolean = false, - blobUUID: String, kustoClient: ExtendedKustoClient, partitionsResults: CollectionAccumulator[PartitionResult], batchIdForTracing: String, @@ -450,6 +444,7 @@ object KustoWriter { val size = blobResource.csvWriter.getCounter val sas = blobResource.sas val partitionId = TaskContext.getPartitionId + val taskAttempt = TaskContext.get().taskAttemptId() var props = ingestionProperties val blobUri = blobResource.blob.getStorageUri.getPrimaryUri.toString if (parameters.writeOptions.ensureNoDupBlobs || (!props.getFlushImmediately && flushImmediately)) { @@ -458,11 +453,14 @@ object KustoWriter { } if (parameters.writeOptions.ensureNoDupBlobs) { // The Key change is here - val tag = KDSU.getDedupTagsPrefix(parameters.writeOptions.requestId, s"${blobIndexInBatch.toString}_$partitionId") + val tag = KDSU.getDedupTagsPrefix( + parameters.writeOptions.requestId, + s"${blobIndexInBatch.toString}_${partitionId}_${StringUtils.defaultIfBlank(batchIdForTracing, "0")}") KDSU.logInfo( className, s"With ensureNoDupBlobs in partition: ${TaskContext.getPartitionId}, " + - s"for requestId: '${parameters.writeOptions.requestId}, tag: $tag") + s"for requestId: '${parameters.writeOptions.requestId}, tag: $tag , blobIndexInBatch: $blobIndexInBatch " + + s"and batchIdForTracing: ${StringUtils.defaultIfBlank(batchIdForTracing, "0")}") val ingestIfNotExist = new util.ArrayList[String] ingestIfNotExist.addAll(props.getIngestIfNotExists) val ingestBy: util.List[String] = new util.ArrayList[String] diff --git a/pom.xml b/pom.xml index f4c3d4cd..de33b646 100644 --- a/pom.xml +++ b/pom.xml @@ -8,7 +8,7 @@ pom ${revision} - 5.2.2 + 5.2.2-PREVIEW 2.12 1.1.1640084764.9f463a9 From 37e5feb8dd598f291262922ba13b2d97eeeab430 Mon Sep 17 00:00:00 2001 From: Ramachandran G Date: Fri, 20 Sep 2024 12:54:23 +0530 Subject: [PATCH 5/7] * Make tag names unique * Update POM --- .../scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala index dd7848d5..6b50c2c0 100644 --- 
a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala +++ b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala @@ -444,7 +444,6 @@ object KustoWriter { val size = blobResource.csvWriter.getCounter val sas = blobResource.sas val partitionId = TaskContext.getPartitionId - val taskAttempt = TaskContext.get().taskAttemptId() var props = ingestionProperties val blobUri = blobResource.blob.getStorageUri.getPrimaryUri.toString if (parameters.writeOptions.ensureNoDupBlobs || (!props.getFlushImmediately && flushImmediately)) { From 6eaa5eea03bbdd656ea58a6fdfe6daa97552d76b Mon Sep 17 00:00:00 2001 From: Ramachandran G Date: Wed, 25 Sep 2024 13:33:03 +0530 Subject: [PATCH 6/7] * Minor fixes, add the rows count and cache the RDD to avoid re-evaluation --- .../kusto/spark/datasink/KustoWriter.scala | 80 ++++++++++--------- .../kusto/spark/datasink/Writers.scala | 3 + 2 files changed, 46 insertions(+), 37 deletions(-) diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala index 6b50c2c0..5554952d 100644 --- a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala +++ b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala @@ -12,7 +12,7 @@ import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException import com.microsoft.azure.kusto.ingest.resources.ContainerWithSas import com.microsoft.azure.kusto.ingest.result.IngestionResult import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo -import com.microsoft.azure.kusto.ingest.{IngestClient, IngestionProperties} +import com.microsoft.azure.kusto.ingest.IngestionProperties import com.microsoft.azure.storage.blob.{BlobRequestOptions, CloudBlockBlob} import com.microsoft.kusto.spark.authentication.KustoAuthentication import com.microsoft.kusto.spark.common.KustoCoordinates @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types.StructType import org.apache.spark.util.CollectionAccumulator -import java.io._ +import java.io.{BufferedWriter, IOException, OutputStreamWriter} import java.net.URI import java.nio.charset.StandardCharsets import java.security.InvalidParameterException @@ -43,17 +43,15 @@ import java.time.{Clock, Duration, Instant} import java.util import java.util.zip.GZIPOutputStream import java.util.{TimeZone, UUID} -import scala.collection.JavaConverters._ import scala.concurrent.ExecutionContext.Implicits.global import scala.util.{Failure, Success, Try} import java.time.ZoneId import java.time.format.DateTimeFormatter -import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicInteger +import scala.collection.JavaConverters.asScalaBufferConverter object KustoWriter { private val className = this.getClass.getSimpleName - val LegacyTempIngestionTablePrefix = "_tmpTable" val TempIngestionTablePrefix = "sparkTempTable_" val DelayPeriodBetweenCalls: Int = KCONST.DefaultPeriodicSamplePeriod.toMillis.toInt private val GzipBufferSize: Int = 1000 * KCONST.DefaultBufferSize @@ -161,46 +159,47 @@ object KustoWriter { tmpTableName = tmpTableName, cloudInfo = cloudInfo) val sinkStartTime = getCreationTime(stagingTableIngestionProperties) + // Cache this RDD created so that it is not evaluated multiple times from source + val cachedRdd = rdd.cache() if (writeOptions.isAsync) { - val asyncWork = rdd.foreachPartitionAsync { rows => + val asyncWork = 
cachedRdd.foreachPartitionAsync { rows => ingestRowsIntoTempTbl(rows, batchIdIfExists, partitionsResults, parameters) } KDSU.logInfo(className, s"asynchronous write to Kusto table '$table' in progress") // This part runs back on the driver - if (writeOptions.isTransactionalMode) { - asyncWork.onSuccess { case _ => - finalizeIngestionWhenWorkersSucceeded( - tableCoordinates, - batchIdIfExists, - tmpTableName, - partitionsResults, - writeOptions, - crp, - tableExists, - rdd.sparkContext, - authentication, - kustoClient, - sinkStartTime) - } - asyncWork.onFailure { case exception: Exception => - if (writeOptions.userTempTableName.isEmpty) { - kustoClient.cleanupIngestionByProducts(tableCoordinates.database, tmpTableName, crp) - } - KDSU.reportExceptionAndThrow( - className, - exception, - "writing data", - tableCoordinates.clusterUrl, - tableCoordinates.database, - table, - shouldNotThrow = true) - KDSU.logError(className, "The exception is not visible in the driver since we're in async mode") + asyncWork.onComplete { + case Success(_) => + finalizeIngestionWhenWorkersSucceeded( + tableCoordinates, + batchIdIfExists, + tmpTableName, + partitionsResults, + writeOptions, + crp, + tableExists, + rdd.sparkContext, + authentication, + kustoClient, + sinkStartTime) + case Failure(exception) => + if (writeOptions.userTempTableName.isEmpty) { + kustoClient.cleanupIngestionByProducts(tableCoordinates.database, tmpTableName, crp) + } + KDSU.reportExceptionAndThrow( + className, + exception, + "writing data", + tableCoordinates.clusterUrl, + tableCoordinates.database, + table, + shouldNotThrow = true) + KDSU.logError(className, "The exception is not visible in the driver since we're in async mode") } } } else { try - rdd.foreachPartition { rows => + cachedRdd.foreachPartition { rows => ingestRowsIntoTempTbl(rows, batchIdIfExists, partitionsResults, parameters) } catch { @@ -322,7 +321,7 @@ object KustoWriter { parameters.coordinates.ingestionUrl, parameters.coordinates.clusterAlias) val ingestClient = clientCache.ingestClient - CloudInfo.manuallyAddToCache(clientCache.ingestKcsb.getClusterUrl, parameters.cloudInfo); + CloudInfo.manuallyAddToCache(clientCache.ingestKcsb.getClusterUrl, parameters.cloudInfo) val reqRetryOpts = new RequestRetryOptions( RetryPolicyType.FIXED, KCONST.QueueRetryAttempts, @@ -387,6 +386,7 @@ object KustoWriter { val lastBlobWriter = rows.zipWithIndex.foldLeft[BlobWriteResource](initialBlobWriter) { case (blobWriter, row) => RowCSVWriterUtils.writeRowAsCSV(row._1, parameters.schema, timeZone, blobWriter.csvWriter) val count = blobWriter.csvWriter.getCounter + val rowsWritten = blobWriter.csvWriter.getRowsWritten val shouldNotCommitBlockBlob = count < maxBlobSize if (shouldNotCommitBlockBlob) { blobWriter @@ -394,7 +394,7 @@ object KustoWriter { KDSU.logInfo( className, s"Sealing blob in partition $partitionIdString for requestId: '${parameters.writeOptions.requestId}', " + - s"blob number ${row._2}, with size $count") + s"blob number ${row._2}, blobname ${blobWriter.blob.getName} with size $count.Rows written: $rowsWritten") finalizeBlobWrite(blobWriter) ingest( blobWriter, @@ -420,6 +420,12 @@ object KustoWriter { s"Finished serializing rows in partition $partitionIdString for requestId:'${parameters.writeOptions.requestId}'") finalizeBlobWrite(lastBlobWriter) if (lastBlobWriter.csvWriter.getCounter > 0) { + val count = lastBlobWriter.csvWriter.getCounter + val rowsWritten = lastBlobWriter.csvWriter.getRowsWritten + KDSU.logInfo( + className, + s"Flushing final blob in 
partition $partitionIdString for requestId: '${parameters.writeOptions.requestId}', " + + s"blob name ${lastBlobWriter.blob.getName}, with size $count.Rows written: $rowsWritten") ingest( lastBlobWriter, parameters, diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/Writers.scala b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/Writers.scala index 4d4de2fb..30cc9e50 100644 --- a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/Writers.scala +++ b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/Writers.scala @@ -19,10 +19,12 @@ case class CountingWriter(out: java.io.Writer) extends Writer { // new sun.security.action.GetPropertyAction("line.separator")) private val newLineSepLength: Int = newLineSep.length private var bytesCounter: Long = 0L + private var rowsCounter: Long = 0L def newLine(): Unit = { out.write(newLineSep) bytesCounter += newLineSepLength + rowsCounter += 1 } def write(c: Char): Unit = { @@ -53,6 +55,7 @@ case class CountingWriter(out: java.io.Writer) extends Writer { def getCounter: Long = bytesCounter + def getRowsWritten: Long = rowsCounter def resetCounter(): Unit = { bytesCounter = 0 } From dfaba24449d12c1b56a3059c59c486016acd9c6f Mon Sep 17 00:00:00 2001 From: Ramachandran G Date: Wed, 25 Sep 2024 17:19:27 +0530 Subject: [PATCH 7/7] * Minor fixes, add the rows count and cache the RDD to avoid re-evaluation --- .../kusto/spark/datasink/KustoWriter.scala | 30 +++++++++++++------ 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala index 5554952d..039de7d9 100644 --- a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala +++ b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala @@ -150,7 +150,13 @@ object KustoWriter { } val cloudInfo = CloudInfo.retrieveCloudInfoForCluster(kustoClient.ingestKcsb.getClusterUrl) val rdd = data.queryExecution.toRdd - val partitionsResults = rdd.sparkContext.collectionAccumulator[PartitionResult] + val isRddAlreadyCached = rdd.getStorageLevel.useMemory + val maybeCachedRdd = if (writeOptions.ensureNoDupBlobs && !isRddAlreadyCached) { + rdd.cache() + } else { + rdd + } + val partitionsResults = maybeCachedRdd.sparkContext.collectionAccumulator[PartitionResult] val parameters = KustoWriteResource( authentication = authentication, coordinates = tableCoordinates, @@ -160,10 +166,12 @@ object KustoWriter { cloudInfo = cloudInfo) val sinkStartTime = getCreationTime(stagingTableIngestionProperties) // Cache this RDD created so that it is not evaluated multiple times from source - val cachedRdd = rdd.cache() + + val updatedParameters = parameters.copy(isAlreadyCached = isRddAlreadyCached) + if (writeOptions.isAsync) { - val asyncWork = cachedRdd.foreachPartitionAsync { rows => - ingestRowsIntoTempTbl(rows, batchIdIfExists, partitionsResults, parameters) + val asyncWork = maybeCachedRdd.foreachPartitionAsync { rows => + ingestRowsIntoTempTbl(rows, batchIdIfExists, partitionsResults, updatedParameters) } KDSU.logInfo(className, s"asynchronous write to Kusto table '$table' in progress") // This part runs back on the driver @@ -178,7 +186,7 @@ object KustoWriter { writeOptions, crp, tableExists, - rdd.sparkContext, + maybeCachedRdd.sparkContext, authentication, kustoClient, sinkStartTime) @@ -199,8 +207,8 @@ object KustoWriter { } } else { try - cachedRdd.foreachPartition { rows => 
- ingestRowsIntoTempTbl(rows, batchIdIfExists, partitionsResults, parameters) + maybeCachedRdd.foreachPartition { rows => + ingestRowsIntoTempTbl(rows, batchIdIfExists, partitionsResults, updatedParameters) } catch { case exception: Exception => @@ -221,12 +229,15 @@ object KustoWriter { writeOptions, crp, tableExists, - rdd.sparkContext, + maybeCachedRdd.sparkContext, authentication, kustoClient, sinkStartTime) } } + if (parameters.writeOptions.ensureNoDupBlobs && !parameters.isAlreadyCached) { + maybeCachedRdd.unpersist() + } } } @@ -536,6 +547,7 @@ final case class KustoWriteResource( schema: StructType, writeOptions: WriteOptions, tmpTableName: String, - cloudInfo: CloudInfo) + cloudInfo: CloudInfo, + isAlreadyCached: Boolean = false) final case class PartitionResult(ingestionResult: IngestionResult, partitionId: Int)
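
Note on the tagging pattern the patches above converge on: each sealed blob receives a dedup tag built from the write request id, a per-partition blob counter (the AtomicInteger introduced in patch 4), the partition id, and the tracing batch id; the tag is added to both ingestByTags and ingestIfNotExists so Kusto skips a blob that was already ingested under the same tag. The sketch below illustrates that pattern in isolation under stated assumptions: the tag layout and the tagForNextBlob helper are illustrative only and do not reproduce the connector's KDSU.getDedupTagsPrefix output; only the IngestionProperties calls already used in the patches are relied on.

import java.util
import java.util.concurrent.atomic.AtomicInteger
import com.microsoft.azure.kusto.ingest.IngestionProperties

object DedupTagSketch {
  // Hypothetical per-task counter, mirroring the AtomicInteger used in ingestRows.
  private val blobIndex = new AtomicInteger(0)

  // Builds a per-blob dedup tag and attaches it as both an ingest-by tag and an
  // ingest-if-not-exists tag, so re-queuing the same blob under the same tag is skipped.
  // The tag format "<requestId>_<blobIndex>_<partitionId>_<batchId>" is an assumption
  // for illustration; the connector derives its prefix via KDSU.getDedupTagsPrefix.
  def tagForNextBlob(
      props: IngestionProperties,
      requestId: String,
      partitionId: Int,
      batchIdForTracing: String): IngestionProperties = {
    val batchId =
      if (batchIdForTracing == null || batchIdForTracing.isEmpty) "0" else batchIdForTracing
    val tag = s"${requestId}_${blobIndex.incrementAndGet()}_${partitionId}_$batchId"

    // Copy the existing tag lists so only this blob's ingestion properties are affected.
    val ingestIfNotExists = new util.ArrayList[String](props.getIngestIfNotExists)
    val ingestBy = new util.ArrayList[String](props.getIngestByTags)
    ingestBy.add(tag)
    ingestIfNotExists.add(tag)
    props.setIngestByTags(ingestBy)
    props.setIngestIfNotExists(ingestIfNotExists)
    props
  }
}

Cloning the shared IngestionProperties before calling a helper like this (as the patches do with SparkIngestionProperties.cloneIngestionProperties) keeps one blob's tags and flushImmediately setting from leaking into sibling blobs in the same partition.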