Use Cats backpressure to throttle writes (and conversions to HTTP requests) #1000

Open · wants to merge 4 commits into base: master
22 changes: 7 additions & 15 deletions src/main/scala/cognite/spark/v1/RawTableRelation.scala
@@ -1,6 +1,7 @@
 package cognite.spark.v1

 import cats.effect.IO
+import cats.effect.std.Backpressure
 import cats.implicits._
 import cognite.spark.v1.PushdownUtilities.getTimestampLimit
 import com.codahale.metrics.Counter
@@ -287,22 +288,13 @@ class RawTableRelation(
       dfWithUnRenamedKeyColumns.foreachPartition((rows: Iterator[Row]) => {
         config.maxOutstandingRawInsertRequests match {
           case Some(maxOutstandingRawInsertRequests) =>
-            // We first group by batch size of a write, and then group that by the number of allowed parallel
-            // outstanding requests to avoid queueing up too many requests towards the RAW API (and this potentially
-            // leading to an OutOfMemory)
-            // Note: This is a suboptimal fix, as if one of the requests in a batch is slow, we will not
-            // start on the next batch (this limitation used to be per partition). Instead, we should
-            // have a cats.effect.std.Semaphore permit with X number of outstanding requests
-            // or cats.effect.concurrent.Backpressure.
-            rows
-              .grouped(batchSize)
-              .toSeq
-              .grouped(maxOutstandingRawInsertRequests)
-              .foreach { batch =>
-                batch.toVector
-                  .parTraverse_(postRows(columnNames, _))
-                  .unsafeRunSync()
+            Backpressure[IO](Backpressure.Strategy.Lossless, maxOutstandingRawInsertRequests)
+              .flatMap { backpressure =>
+                rows.grouped(batchSize).toVector.parTraverse_ { batch: Seq[Row] =>
Review comment (Contributor):

Compared to the old _.grouped.toSeq.grouped.foreach shape, this now creates a vector of all batches, which holds a reference to every Row from the iterator. That can be a non-trivial amount of memory: less than the serialized request bodies, but more than constant memory.

But compared to a plain .grouped.toVector it is about the same, apart from now having the semaphore.
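The laziness difference the comment above describes can be checked with a stdlib-only sketch (the element count and batch size are illustrative, not taken from the PR):

```scala
object GroupedLaziness extends App {
  // Iterator#grouped is lazy: each batch is built only when pulled, so the old
  // .grouped(batchSize)...foreach shape held roughly one group of batches at a time.
  val lazyBatches: Iterator[Seq[Int]] = Iterator.range(0, 10).grouped(3)

  // .toVector forces every batch (and so a reference to every element) into
  // memory up front, which is the trade-off this comment points out.
  val allBatches: Vector[Seq[Int]] = Iterator.range(0, 10).grouped(3).toVector

  assert(allBatches.length == 4)
  assert(allBatches.head.toList == List(0, 1, 2))
  println(s"materialised ${allBatches.length} batches at once")
}
```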

Review comment (Contributor):

There doesn't seem to be an out-of-the-box Seq.parTraverse_ (or, better, one that would stop pulling more items while the semaphore is full), so for now we can try the .toVector.parTraverse_.

+                  backpressure.metered(postRows(columnNames, batch))
+                }
               }
+              .unsafeRunSync()
           case None =>
             // Same behavior as before, which is prone to OutOfMemory if the RAW API calls are too slow
             // to finish
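For readers without the cats-effect context: Backpressure with Strategy.Lossless and a bound of N behaves like a semaphore with N permits — each metered effect waits for a permit, runs, then releases it, so at most N RAW insert requests are ever outstanding. The following is a stdlib-only sketch of that bounding behaviour, with java.util.concurrent.Semaphore standing in for Backpressure (the pool size, sleep, and all names here are illustrative, not the PR's code):

```scala
import java.util.concurrent.{Executors, Semaphore, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object BackpressureSketch extends App {
  val maxOutstanding = 2                  // plays the role of maxOutstandingRawInsertRequests
  val permits = new Semaphore(maxOutstanding)
  val inFlight = new AtomicInteger(0)
  val peak = new AtomicInteger(0)         // highest concurrency ever observed

  val pool = Executors.newFixedThreadPool(8)
  val batches = (1 to 10).grouped(2).toVector

  batches.foreach { batch =>
    pool.submit(new Runnable {
      def run(): Unit = {
        permits.acquire()                 // like backpressure.metered: wait for a free slot
        try {
          val now = inFlight.incrementAndGet()
          peak.updateAndGet(p => math.max(p, now))
          Thread.sleep(20)                // stands in for the postRows(columnNames, batch) call
          inFlight.decrementAndGet()
        } finally permits.release()       // slot freed, next batch may start
      }
    })
  }
  pool.shutdown()
  pool.awaitTermination(5, TimeUnit.SECONDS)
  assert(peak.get() <= maxOutstanding)    // never more than N requests in flight
  println(s"peak concurrency: ${peak.get()}")
}
```

The same bound is what prevents the OutOfMemory scenario: requests queue on the semaphore instead of piling up as in-flight bodies.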