Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/update tag name for dups v2 #403

Closed
wants to merge 12 commits into from

Conversation

ag-ramachandran
Copy link
Contributor

Pull Request Description

[Add a description of your pull request here]


Future Release Comment

[Add description of your change, to include in the next release]
[Delete any or all irrelevant sections, e.g. if your change does not warrant a release comment at all]

Breaking Changes:

  • None

Features:

  • None

Fixes:

  • None

Copy link

github-actions bot commented Sep 26, 2024

Test Results

76 tests  ±0   76 ✅ ±0   2m 52s ⏱️ +33s
22 suites ±0    0 💤 ±0 
22 files   ±0    0 ❌ ±0 

Results for commit 7312a20. ± Comparison against base commit 175ac60.

♻️ This comment has been updated with latest results.

props = SparkIngestionProperties.cloneIngestionProperties(ingestionProperties)
}
if (parameters.writeOptions.ensureNoDupBlobs) {
// if (blobIndexInBatch == 2 && TaskContext.get().taskAttemptId() % 2 == 0 && exceptionCount
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert afterwards

.scalafmt.conf Outdated
@@ -24,7 +24,7 @@ optIn = {
}
danglingParentheses.preset = false
docstrings.style = Asterisk
maxColumn = 98
maxColumn = 120
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

funny that now its the other way

import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters.asScalaBufferConverter

object KustoWriter {
private val className = this.getClass.getSimpleName
private val exceptionCount = new AtomicInteger(0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

@@ -166,43 +170,47 @@ object KustoWriter {
tmpTableName = tmpTableName,
cloudInfo = cloudInfo)
val sinkStartTime = getCreationTime(stagingTableIngestionProperties)
// Cache this RDD created so that it is not evaluated multiple times from source
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove

if (!props.getFlushImmediately && flushImmediately) {
props.setFlushImmediately(true)
}
val ingestClient = KustoClientCache
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to get the client from cache everytime instead of passing it in params?

pom.xml Outdated
@@ -8,7 +8,7 @@
<packaging>pom</packaging>
<version>${revision}</version>
<properties>
<revision>5.2.2</revision>
<revision>5.2.2-PREVIEW</revision>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why preview

}
}
if (parameters.writeOptions.ensureNoDupBlobs && taskMap.size() > 0) {
taskMap.forEach((blobUUID, blobWriter) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you dont use the blobUUID - revert to old usage

@ag-ramachandran
Copy link
Contributor Author

Supersceded with #404

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants