-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
ArtifactStore implementation for CosmosDB (#3562)
This commit provides a CosmosDB-based implementation of the ArtifactStore SPI. Given the complexity involved in performing various operations against CosmosDB, this commit uses the Java SDK to simplify and speed up the implementation: compared to CouchDB, performing queries with CosmosDB requires client-side computation, which involves sending queries to each partition and then collecting and merging the result sets. The Async Java SDK takes care of all these interactions and provides a simplified reactive API based on RxJava.
- Loading branch information
Showing
26 changed files
with
2,162 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
495 changes: 495 additions & 0 deletions
495
common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
Large diffs are not rendered by default.
Oops, something went wrong.
114 changes: 114 additions & 0 deletions
114
common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package whisk.core.database.cosmosdb | ||
|
||
import java.io.Closeable | ||
|
||
import akka.actor.ActorSystem | ||
import akka.stream.ActorMaterializer | ||
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient | ||
import spray.json.RootJsonFormat | ||
import whisk.common.Logging | ||
import whisk.core.database._ | ||
import pureconfig._ | ||
import whisk.core.entity.size._ | ||
import whisk.core.ConfigKeys | ||
import whisk.core.database.cosmosdb.CosmosDBUtil.createClient | ||
import whisk.core.entity.{DocumentReader, WhiskActivation, WhiskAuth, WhiskEntity} | ||
|
||
import scala.reflect.ClassTag | ||
|
||
/**
 * Connection settings for the CosmosDB backed store.
 *
 * @param endpoint service endpoint handed to the document client builder
 * @param key      master key handed to the document client builder
 * @param db       id of the database which is looked up (or created) for the collections
 */
case class CosmosDBConfig(
  endpoint: String,
  key: String,
  db: String
)
|
||
/**
 * Adapts an [[AsyncDocumentClient]] to [[java.io.Closeable]] so that it can be managed
 * like any other closeable resource.
 *
 * @param client the wrapped document client; closed when this holder is closed
 */
case class ClientHolder(client: AsyncDocumentClient) extends Closeable {

  /** Closes the wrapped client. */
  override def close(): Unit = {
    client.close()
  }
}
|
||
/**
 * [[ArtifactStoreProvider]] which creates [[ArtifactStore]] instances backed by CosmosDB.
 *
 * Stores obtained via `makeStore` share a single reference counted client, while every call to
 * `makeArtifactStore` creates a client of its own for the supplied configuration.
 */
object CosmosDBArtifactStoreProvider extends ArtifactStoreProvider {
  type DocumentClientRef = ReferenceCounted[ClientHolder]#CountedReference

  // Loaded lazily, i.e. the configuration is only read on first use by makeStore
  private lazy val config = loadConfigOrThrow[CosmosDBConfig](ConfigKeys.cosmosdb)

  // Client shared by all stores created through makeStore. Empty until the first store is requested.
  // Only read and written from within getOrCreateReference, which synchronizes on this object.
  private var sharedClient: Option[ReferenceCounted[ClientHolder]] = None

  /**
   * Creates a store for entity type `D` using the configuration found under `ConfigKeys.cosmosdb`
   * and the shared client.
   *
   * @param useBatching not used by this implementation
   */
  override def makeStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean)(
    implicit jsonFormat: RootJsonFormat[D],
    docReader: DocumentReader,
    actorSystem: ActorSystem,
    logging: Logging,
    materializer: ActorMaterializer): ArtifactStore[D] =
    makeStoreForClient(config, getOrCreateReference(config), getAttachmentStore())

  /**
   * Creates a store for entity type `D` with an explicitly supplied configuration and attachment store.
   * In contrast to `makeStore` a new client is created for each invocation.
   *
   * @param config          connection settings used to create the client and passed on to the store
   * @param attachmentStore optional attachment store passed on to the store
   */
  def makeArtifactStore[D <: DocumentSerializer: ClassTag](config: CosmosDBConfig,
                                                           attachmentStore: Option[AttachmentStore])(
    implicit jsonFormat: RootJsonFormat[D],
    docReader: DocumentReader,
    actorSystem: ActorSystem,
    logging: Logging,
    materializer: ActorMaterializer): ArtifactStore[D] =
    makeStoreForClient(config, createReference(config).reference(), attachmentStore)

  /**
   * Assembles the store for entity type `D` on top of the given client reference.
   * The collection name, document handler and view mapper are derived from the entity type.
   */
  private def makeStoreForClient[D <: DocumentSerializer: ClassTag](config: CosmosDBConfig,
                                                                    clientRef: DocumentClientRef,
                                                                    attachmentStore: Option[AttachmentStore])(
    implicit jsonFormat: RootJsonFormat[D],
    docReader: DocumentReader,
    actorSystem: ActorSystem,
    logging: Logging,
    materializer: ActorMaterializer): ArtifactStore[D] = {

    val (dbName, handler, viewMapper) = handlerAndMapper(implicitly[ClassTag[D]])
    val inliningConfig = loadConfigOrThrow[InliningConfig](ConfigKeys.db)

    new CosmosDBArtifactStore(dbName, config, clientRef, handler, viewMapper, inliningConfig, attachmentStore)
  }

  /**
   * Resolves the name, document handler and view mapper for the given entity type.
   *
   * @throws IllegalArgumentException if the entity type is none of WhiskEntity, WhiskActivation or WhiskAuth
   */
  private def handlerAndMapper[D](entityType: ClassTag[D])(
    implicit actorSystem: ActorSystem,
    logging: Logging,
    materializer: ActorMaterializer): (String, DocumentHandler, CosmosDBViewMapper) =
    entityType.runtimeClass match {
      case c if c == classOf[WhiskEntity]     => ("whisks", WhisksHandler, WhisksViewMapper)
      case c if c == classOf[WhiskActivation] => ("activations", ActivationHandler, ActivationViewMapper)
      case c if c == classOf[WhiskAuth]       => ("subjects", SubjectHandler, SubjectViewMapper)
      case _                                  => throw new IllegalArgumentException(s"Unsupported entity type $entityType")
    }

  /*
   * Hands out a reference to the shared client so that all store instances use the same client instance
   * and thus the same underlying connection pool. A new client is created when none exists yet or when
   * the current one is already closed.
   * Synchronization ensures that concurrently initialized store instances end up with the same ref instance.
   */
  private def getOrCreateReference(config: CosmosDBConfig): DocumentClientRef = synchronized {
    val live = sharedClient.filterNot(_.isClosed).getOrElse {
      val fresh = createReference(config)
      sharedClient = Some(fresh)
      fresh
    }
    live.reference()
  }

  /** Creates a new client for the given configuration wrapped in a reference counter. */
  private def createReference(config: CosmosDBConfig): ReferenceCounted[ClientHolder] = {
    val holder = ClientHolder(createClient(config))
    new ReferenceCounted[ClientHolder](holder)
  }

}
88 changes: 88 additions & 0 deletions
88
common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBSupport.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package whisk.core.database.cosmosdb | ||
|
||
import com.microsoft.azure.cosmosdb._ | ||
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient | ||
|
||
import scala.collection.JavaConverters._ | ||
import scala.collection.immutable | ||
|
||
/**
 * Helpers to look up, and create on demand, the CosmosDB database and collection a store works against.
 * Implementors supply the configuration, the collection name, the client and the view mapper.
 */
private[cosmosdb] trait CosmosDBSupport extends RxObservableImplicits with CosmosDBUtil {
  protected def config: CosmosDBConfig
  protected def collName: String
  protected def client: AsyncDocumentClient
  protected def viewMapper: CosmosDBViewMapper

  /**
   * Resolves the database and the collection, creating whichever of them does not exist yet.
   * The calls against the client are performed in a blocking manner.
   *
   * @return the database together with the collection contained in it
   */
  def initialize(): (Database, DocumentCollection) = {
    val database = getOrCreateDatabase()
    val collection = getOrCreateCollection(database)
    (database, collection)
  }

  /** Looks up the database having id `config.db`; creates it if the lookup yields nothing. */
  private def getOrCreateDatabase(): Database = {
    val existing = client.queryDatabases(querySpec(config.db), null).blockingOnlyResult()
    existing.getOrElse(client.createDatabase(newDatabase, null).blockingResult())
  }

  /**
   * Looks up the collection having id `collName` within the given database. An existing collection whose
   * indexing policy differs from the one of the view mapper is replaced with an updated definition.
   * If no collection is found a new one gets created.
   */
  private def getOrCreateCollection(database: Database): DocumentCollection = {
    def withExpectedIndexingPolicy(found: DocumentCollection): DocumentCollection =
      if (matchingIndexingPolicy(found)) found
      else {
        //Modify the found collection with latest policy as its selfLink is set
        found.setIndexingPolicy(viewMapper.indexingPolicy.asJava())
        client.replaceCollection(found, null).blockingResult()
      }

    val existing =
      client.queryCollections(database.getSelfLink, querySpec(collName), null).blockingOnlyResult()

    existing
      .map(withExpectedIndexingPolicy)
      .getOrElse(client.createCollection(database.getSelfLink, newDatabaseCollection, null).blockingResult())
  }

  /** Checks whether the collection's indexing policy is the same as the one defined by the view mapper. */
  private def matchingIndexingPolicy(coll: DocumentCollection): Boolean = {
    val expected = viewMapper.indexingPolicy
    val actual = IndexingPolicy(coll.getIndexingPolicy)
    IndexingPolicy.isSame(expected, actual)
  }

  /** Definition of a new collection: id, indexing policy and partition key as given by the view mapper. */
  private def newDatabaseCollection: DocumentCollection = {
    val collection = new DocumentCollection
    collection.setId(collName)
    collection.setIndexingPolicy(viewMapper.indexingPolicy.asJava())
    collection.setPartitionKey(viewMapper.partitionKeyDefn)
    collection
  }

  /** Definition of a new database carrying the configured id. */
  private def newDatabase: Database = {
    val database = new Database
    database.setId(config.db)
    database
  }

  /**
   * Prepares a query for fetching any resource by id. The id is passed as a query parameter.
   */
  protected def querySpec(id: String): SqlQuerySpec = {
    val params = new SqlParameterCollection(new SqlParameter("@id", id))
    new SqlQuerySpec("SELECT * FROM root r WHERE r.id=@id", params)
  }

  /** Converts the results of a feed response to an immutable Scala sequence. */
  protected def asSeq[T <: Resource](r: FeedResponse[T]): immutable.Seq[T] = {
    val results = r.getResults.asScala
    results.to[immutable.Seq]
  }
}
108 changes: 108 additions & 0 deletions
108
common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBUtil.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package whisk.core.database.cosmosdb | ||
|
||
import com.microsoft.azure.cosmosdb._ | ||
import com.microsoft.azure.cosmosdb.internal.Constants.Properties.{AGGREGATE, E_TAG, ID, SELF_LINK} | ||
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient | ||
import whisk.core.database.cosmosdb.CosmosDBConstants._ | ||
|
||
import scala.collection.immutable.Iterable | ||
|
||
/**
 * Field and alias names shared by the CosmosDB store implementation.
 */
private[cosmosdb] object CosmosDBConstants {

  // NOTE(review): presumably the name of the field holding computed values - confirm against its usages
  val computed: String = "_c"

  // Alias given to the projection generated by CosmosDBUtil.prepareFieldClause ("... AS view")
  val alias: String = "view"

  // The names below are taken as-is from the SDK's Constants.Properties

  val cid: String = ID

  val etag: String = E_TAG

  val aggregate: String = AGGREGATE

  val selfLink: String = SELF_LINK
}
|
||
private[cosmosdb] trait CosmosDBUtil {

  /**
   * Prepares the json like select clause
   * {{{
   *   Seq("a", "b", "c.d.e") =>
   *   { "a" : r['a'], "b" : r['b'], "c" : { "d" : { "e" : r['c']['d']['e']}}, "id" : r['id']} AS view
   * }}}
   * Here it uses {{{r['keyName']}}} notation to avoid issues around using reserved words as field name.
   *
   * The `id` field is always added to the projection. If a field is requested as a whole and
   * additionally via one of its nested paths (e.g. "a" and "a.b") then the whole field is
   * projected, irrespective of the order in which the two are listed.
   *
   * @param fields field names to project; '.' separates the segments of a nested path
   * @return the select clause including the trailing alias
   * @throws IllegalStateException if a name consists of '.' characters only
   */
  def prepareFieldClause(fields: Iterable[String]): String = {
    val m = fields.foldLeft(Map.empty[String, Any]) { (map, name) =>
      addToMap(name, map)
    }
    val withId = addToMap(cid, m)
    val json = asJsonLikeString(withId)
    s"$json AS $alias"
  }

  /** Splits the name on '.' and records the resulting path in the projection map. */
  private def addToMap(name: String, map: Map[String, _]): Map[String, Any] = name.split('.').toList match {
    case Nil     => throw new IllegalStateException(s"'$name' split on '.' should not result in empty list")
    case x :: xs => addToMap(x, xs, Nil, map)
  }

  /**
   * Records `key` followed by `children` in the projection map. Leaf values are the `r[..]` accessor
   * strings, intermediate values are nested maps.
   *
   * @param key      current path segment
   * @param children remaining path segments below `key`
   * @param keyPath  already visited segments, innermost first
   * @param map      projection map of the current nesting level
   */
  private def addToMap(key: String,
                       children: List[String],
                       keyPath: List[String],
                       map: Map[String, Any]): Map[String, Any] = children match {
    case Nil => map + (key -> s"r${makeKeyPath(key :: keyPath)}")
    case x :: xs =>
      map.get(key) match {
        // The whole field is already projected and covers any nested path. Previously this case
        // failed with a ClassCastException as the String leaf was blindly cast to a Map.
        case Some(_: String) => map
        // Nested maps are only ever created by this method, so keys are known to be Strings
        case Some(nested: Map[_, _]) =>
          map + (key -> addToMap(x, xs, key :: keyPath, nested.asInstanceOf[Map[String, Any]]))
        case _ => map + (key -> addToMap(x, xs, key :: keyPath, Map.empty[String, Any]))
      }
  }

  /** Renders the path (given innermost first) as a chain of bracket accessors, e.g. `['c']['d']`. */
  private def makeKeyPath(keyPath: List[String]) = keyPath.reverse.map(f => s"['$f']").mkString

  /** Renders the map as a json like object. Values are emitted unquoted as they are accessor expressions. */
  private def asJsonLikeString(m: Map[_, _]) =
    m.map { case (k, v) => s""" "$k" : ${asString(v)}""" }.mkString("{", ",", "}")

  private def asString(v: Any): String = v match {
    case m: Map[_, _] => asJsonLikeString(m)
    case x            => x.toString
  }

  /**
   * Creates a client for the configured endpoint and master key. It uses the default connection
   * policy and session consistency level.
   */
  def createClient(config: CosmosDBConfig): AsyncDocumentClient =
    new AsyncDocumentClient.Builder()
      .withServiceEndpoint(config.endpoint)
      .withMasterKey(config.key)
      .withConnectionPolicy(ConnectionPolicy.GetDefault)
      .withConsistencyLevel(ConsistencyLevel.Session)
      .build

  /**
   * CosmosDB id considers '/', '\' , '?' and '#' as invalid. EntityNames can include '/' so
   * that need to be escaped. For that we use '|' as the replacement char
   *
   * @throws IllegalArgumentException if the id already contains '|'
   */
  def escapeId(id: String): String = {
    require(!id.contains("|"), s"Id [$id] should not contain '|'")
    id.replace("/", "|")
  }

  /**
   * Reverts [[escapeId]] by turning each '|' back into '/'.
   *
   * @throws IllegalArgumentException if the escaped id contains '/'
   */
  def unescapeId(id: String): String = {
    require(!id.contains("/"), s"Escaped Id [$id] should not contain '/'")
    id.replace("|", "/")
  }

}

/** Makes the utility methods available without having to mix in the trait. */
private[cosmosdb] object CosmosDBUtil extends CosmosDBUtil
Oops, something went wrong.