-
Notifications
You must be signed in to change notification settings - Fork 35
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/update tag name for dups v2 #403
Conversation
props = SparkIngestionProperties.cloneIngestionProperties(ingestionProperties) | ||
} | ||
if (parameters.writeOptions.ensureNoDupBlobs) { | ||
// if (blobIndexInBatch == 2 && TaskContext.get().taskAttemptId() % 2 == 0 && exceptionCount |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
revert afterwards
.scalafmt.conf
Outdated
@@ -24,7 +24,7 @@ optIn = { | |||
} | |||
danglingParentheses.preset = false | |||
docstrings.style = Asterisk | |||
maxColumn = 98 | |||
maxColumn = 120 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
funny that now its the other way
import java.util.concurrent.atomic.AtomicInteger | ||
import scala.collection.JavaConverters.asScalaBufferConverter | ||
|
||
object KustoWriter { | ||
private val className = this.getClass.getSimpleName | ||
private val exceptionCount = new AtomicInteger(0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
@@ -166,43 +170,47 @@ object KustoWriter { | |||
tmpTableName = tmpTableName, | |||
cloudInfo = cloudInfo) | |||
val sinkStartTime = getCreationTime(stagingTableIngestionProperties) | |||
// Cache this RDD created so that it is not evaluated multiple times from source |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove
if (!props.getFlushImmediately && flushImmediately) { | ||
props.setFlushImmediately(true) | ||
} | ||
val ingestClient = KustoClientCache |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to get the client from cache everytime instead of passing it in params?
pom.xml
Outdated
@@ -8,7 +8,7 @@ | |||
<packaging>pom</packaging> | |||
<version>${revision}</version> | |||
<properties> | |||
<revision>5.2.2</revision> | |||
<revision>5.2.2-PREVIEW</revision> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why preview
} | ||
} | ||
if (parameters.writeOptions.ensureNoDupBlobs && taskMap.size() > 0) { | ||
taskMap.forEach((blobUUID, blobWriter) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you dont use the blobUUID - revert to old usage
Supersceded with #404 |
Pull Request Description
[Add a description of your pull request here]
Future Release Comment
[Add description of your change, to include in the next release]
[Delete any or all irrelevant sections, e.g. if your change does not warrant a release comment at all]
Breaking Changes:
Features:
Fixes: