From a4372ef4cf48f59a193a984b192868bef6309edc Mon Sep 17 00:00:00 2001 From: Grigory Pomadchin Date: Sun, 10 Jul 2022 11:41:20 -0400 Subject: [PATCH] Cassandra closeAsync --- .../spark/store/cassandra/CassandraRDDReader.scala | 2 +- .../spark/store/cassandra/CassandraRDDWriter.scala | 2 +- cassandra-spark/src/test/resources/application.conf | 2 +- .../store/cassandra/CassandraInstance.scala | 12 +++++++----- .../scala/geotrellis/store/cassandra/package.scala | 7 ++++++- 5 files changed, 16 insertions(+), 9 deletions(-) diff --git a/cassandra-spark/src/main/scala/geotrellis/spark/store/cassandra/CassandraRDDReader.scala b/cassandra-spark/src/main/scala/geotrellis/spark/store/cassandra/CassandraRDDReader.scala index f162ec4129..70071d9956 100644 --- a/cassandra-spark/src/main/scala/geotrellis/spark/store/cassandra/CassandraRDDReader.scala +++ b/cassandra-spark/src/main/scala/geotrellis/spark/store/cassandra/CassandraRDDReader.scala @@ -89,7 +89,7 @@ object CassandraRDDReader { } /** Close partition session */ - (result ++ Iterator({ session.close(); Seq.empty[(K, V)] })).flatten + (result ++ Iterator({ session.closeAsync(); Seq.empty[(K, V)] })).flatten } } } diff --git a/cassandra-spark/src/main/scala/geotrellis/spark/store/cassandra/CassandraRDDWriter.scala b/cassandra-spark/src/main/scala/geotrellis/spark/store/cassandra/CassandraRDDWriter.scala index bc8a328a9d..17813e33f2 100644 --- a/cassandra-spark/src/main/scala/geotrellis/spark/store/cassandra/CassandraRDDWriter.scala +++ b/cassandra-spark/src/main/scala/geotrellis/spark/store/cassandra/CassandraRDDWriter.scala @@ -151,7 +151,7 @@ object CassandraRDDWriter { .flatMap(rowToBytes) .map(retire) .parJoinUnbounded - .onComplete { fs2.Stream eval session.closeF[IO] } + .onComplete { fs2.Stream eval IO(session.closeAsync) } results .compile diff --git a/cassandra-spark/src/test/resources/application.conf b/cassandra-spark/src/test/resources/application.conf index cf8ea91f37..cb5c470212 100644 --- a/cassandra-spark/src/test/resources/application.conf +++ b/cassandra-spark/src/test/resources/application.conf @@ -4,4 +4,4 @@ datastax-java-driver { local-datacenter = datacenter1 } } -} \ No newline at end of file +} diff --git a/cassandra/src/main/scala/geotrellis/store/cassandra/CassandraInstance.scala b/cassandra/src/main/scala/geotrellis/store/cassandra/CassandraInstance.scala index 20dcef2eef..cce9dd1dc7 100644 --- a/cassandra/src/main/scala/geotrellis/store/cassandra/CassandraInstance.scala +++ b/cassandra/src/main/scala/geotrellis/store/cassandra/CassandraInstance.scala @@ -77,12 +77,14 @@ trait CassandraInstance extends Serializable { /** With session close */ def withSessionDo[T](block: CqlSession => T): T = { val session = getSession() - try block(session) finally session.close() + try block(session) finally session.closeAsync() } - def closeAsync[F[_]: Async]: F[Unit] = session.closeF + // def closeAsync[F[_]: Async]: F[Unit] = session.closeF - def close(): Unit = session.close() + // def close(): Unit = session.close() + + def closeAsync = session.closeAsync() } case class BaseCassandraInstance( @@ -134,7 +136,7 @@ object Cassandra { implicit def instanceToSession[T <: CassandraInstance](instance: T): CqlSession = instance.session def withCassandraInstance[T <: CassandraInstance, K](instance: T)(block: T => K): K = block(instance) - def withCassandraInstanceDo[T <: CassandraInstance, K](instance: T)(block: T => K): K = try block(instance) finally instance.close() + def withCassandraInstanceDo[T <: CassandraInstance, K](instance: T)(block: T => K): K = try block(instance) finally instance.closeAsync() def withBaseCassandraInstance[K](hosts: Seq[String], username: String, @@ -155,7 +157,7 @@ object Cassandra { password: String, cassandraConfig: CassandraConfig)(block: CassandraInstance => K): K = { val instance = BaseCassandraInstance(hosts, username, password, cassandraConfig) - try block(instance) finally instance.close() + try block(instance) finally instance.closeAsync() } def withBaseCassandraInstanceDo[K](hosts: Seq[String], username: String, diff --git a/cassandra/src/main/scala/geotrellis/store/cassandra/package.scala b/cassandra/src/main/scala/geotrellis/store/cassandra/package.scala index 7004c8474a..5a2509a26a 100644 --- a/cassandra/src/main/scala/geotrellis/store/cassandra/package.scala +++ b/cassandra/src/main/scala/geotrellis/store/cassandra/package.scala @@ -24,7 +24,7 @@ import com.datastax.oss.driver.api.core.cql.{AsyncResultSet, Statement} import java.math.BigInteger import java.util.concurrent.CompletableFuture -package object cassandra { +package object cassandra extends Serializable { implicit def bigToBig(i: BigInt): BigInteger = new BigInteger(i.toByteArray) implicit class CompletableFutureOps[T](val self: CompletableFuture[T]) extends AnyVal { @@ -35,4 +35,9 @@ package object cassandra { def closeF[F[_]: Async]: F[Unit] = self.closeAsync().toCompletableFuture.liftTo.void def executeF[F[_]: Async](statement: Statement[_]): F[AsyncResultSet] = self.executeAsync(statement).toCompletableFuture.liftTo } + + implicit class AsyncResultSetOps(val self: AsyncResultSet) extends AnyVal { + def nonEmpty: Boolean = self.remaining() > 0 || self.hasMorePages + def isEmpty: Boolean = !nonEmpty + } }