Skip to content

Commit

Permalink
Merge pull request #54 from nrkno/use_interruptable-where_needed
Browse files Browse the repository at this point in the history
Use F.interruptable where operations may be cancelled
  • Loading branch information
hamnis authored Apr 19, 2023
2 parents 2890af3 + 0df2ae8 commit 801e83d
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 12 deletions.
16 changes: 8 additions & 8 deletions core/src/main/scala/no/nrk/bigquery/BigQueryClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,11 @@ class BigQueryClient[F[_]](
else identity
)
.prefetch
.evalMap(chunk => F.blocking(writer.write(chunk.toByteBuffer)))
.evalMap(chunk => F.interruptible(writer.write(chunk.toByteBuffer)))
.compile
.drain
}
.flatMap(_ => F.blocking(Option(bigQuery.getJob(jobId))))
.flatMap(_ => F.interruptible(Option(bigQuery.getJob(jobId))))

}.map(jobOpt => jobOpt.map(_.getStatistics[LoadStatistics]))

Expand All @@ -318,7 +318,7 @@ class BigQueryClient[F[_]](
))
}
.flatMap(tmp =>
F.blocking {
F.interruptible {
val tempTableBqDef = UpdateOperation.createNew(tmp).table
val expirationTime =
Instant.now.plusMillis(expirationDuration.getOrElse(1.hour).toMillis)
Expand Down Expand Up @@ -404,7 +404,7 @@ class BigQueryClient[F[_]](
b.build()
}

F.blocking(
F.interruptible(
Option(
bigQuery.create(JobInfo.of(jobId, jobConfiguration), jobOptions: _*)
)
Expand Down Expand Up @@ -433,7 +433,7 @@ class BigQueryClient[F[_]](
maxDuration = 20.minutes,
maxErrorsTolerated = 10
)(
retry = F.blocking(bigQuery.getJob(runningJob.getJobId))
retry = F.interruptible(bigQuery.getJob(runningJob.getJobId))
)
.flatMap {
case BQPoll.Failed(error) => F.raiseError[Job](error)
Expand Down Expand Up @@ -463,7 +463,7 @@ class BigQueryClient[F[_]](
tableId: BQTableId,
tableOptions: TableOption*
): F[Option[Table]] =
F.blocking(
F.interruptible(
Option(bigQuery.getTable(tableId.underlying, tableOptions: _*))
.filter(_.exists())
)
Expand All @@ -488,7 +488,7 @@ class BigQueryClient[F[_]](
.setDryRun(true)
.build()
)
F.blocking(bigQuery.create(jobInfo))
F.interruptible(bigQuery.create(jobInfo))
}

def create(table: TableInfo): F[Table] =
Expand All @@ -504,7 +504,7 @@ class BigQueryClient[F[_]](
dataset: BQDataset,
datasetOptions: BigQuery.TableListOption*
): F[Vector[BQTableRef[Any]]] =
F.blocking(bigQuery.listTables(dataset.underlying, datasetOptions: _*)).flatMap { tables =>
F.interruptible(bigQuery.listTables(dataset.underlying, datasetOptions: _*)).flatMap { tables =>
tables
.iterateAll()
.asScala
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class BigQueryTransferClient(transferClient: DataTransferServiceClient) {
def getTransferConfig(
transferConfigName: TransferConfigName
): IO[Option[TransferConfig]] =
IO.blocking(
IO.interruptible(
transferClient.listTransferConfigs(
s"projects/${transferConfigName.getProject}/locations/${transferConfigName.getLocation}"
)
Expand Down Expand Up @@ -64,7 +64,7 @@ class BigQueryTransferClient(transferClient: DataTransferServiceClient) {
.setParent(projectName.toString)
.setTransferConfig(transferConfig)
.build
IO.blocking(transferClient.createTransferConfig(transferRequest))
IO.interruptible(transferClient.createTransferConfig(transferRequest))
}

def startDatasetTransfer(transferConfig: TransferConfig): IO[TransferStatus] =
Expand All @@ -76,13 +76,13 @@ class BigQueryTransferClient(transferClient: DataTransferServiceClient) {
.setParent(transferConfig.getName)
.setRequestedRunTime(transferConfig.getUpdateTime)
.build()
runResponse <- IO.blocking(
runResponse <- IO.interruptible(
transferClient.startManualTransferRuns(startRequest)
)
activeRun = runResponse.getRunsList.asScala.headOption
transferStatus <- activeRun.traverse(activeRun =>
pollDatasetTransfer(activeRun, 15.seconds)(retry =
IO.blocking(transferClient.getTransferRun(activeRun.getName))).attempt.map {
IO.interruptible(transferClient.getTransferRun(activeRun.getName))).attempt.map {
case Left(err) => TransferFailed(activeRun, err)
case Right(transfer) => transfer
})
Expand Down

0 comments on commit 801e83d

Please sign in to comment.