S3AttachmentStore (apache#3779)
This PR introduces an S3AttachmentStore, an AttachmentStore implementation for storing attachments in S3 API-compatible object stores.
chetanmeh authored and rabbah committed Jul 27, 2018
1 parent e4a4208 commit 55b21ca
Showing 16 changed files with 646 additions and 33 deletions.
7 changes: 7 additions & 0 deletions common/scala/build.gradle
@@ -76,6 +76,13 @@ dependencies {
compile 'io.reactivex:rxscala_2.11:0.26.5'
compile 'io.reactivex:rxjava-reactive-streams:1.2.1'
compile 'com.microsoft.azure:azure-cosmosdb:2.0.0'

compile ('com.lightbend.akka:akka-stream-alpakka-s3_2.11:0.19') {
exclude group: 'commons-logging'
exclude group: 'org.apache.httpcomponents' //Not used as alpakka uses akka-http
exclude group: 'com.fasterxml.jackson.core'
exclude group: 'com.fasterxml.jackson.dataformat'
}
scoverage gradle.scoverage.deps
}

2 changes: 2 additions & 0 deletions common/scala/src/main/resources/reference.conf
@@ -1,6 +1,8 @@
# Licensed to the Apache Software Foundation (ASF) under one or more contributor
# license agreements; and to You under the Apache License, Version 2.0.

include "s3-reference.conf"

whisk.spi {
ArtifactStoreProvider = whisk.core.database.CouchDbStoreProvider
ActivationStoreProvider = whisk.core.database.ArtifactActivationStoreProvider
78 changes: 78 additions & 0 deletions common/scala/src/main/resources/s3-reference.conf
@@ -0,0 +1,78 @@
# Licensed to the Apache Software Foundation (ASF) under one or more contributor
# license agreements; and to You under the Apache License, Version 2.0.

whisk {
s3 {
# See https://developer.lightbend.com/docs/alpakka/current/s3.html#usage
alpakka {
# whether to buffer request chunks (up to 5MB each) to "memory" or "disk"
buffer = "memory"

# location for temporary files, if buffer is set to "disk". If empty, uses the standard java temp path.
disk-buffer-path = ""

proxy {
# hostname of the proxy. If undefined ("") proxy is not enabled.
host = ""
port = 8000

# if "secure" is set to "true" then HTTPS will be used for all requests to S3, otherwise HTTP will be used
secure = true
}

# default values for AWS configuration. If credentials and/or region are not specified when creating S3Client,
# these values will be used.
aws {
# If this section is absent, the fallback behavior is to use the
# com.amazonaws.auth.DefaultAWSCredentialsProviderChain instance to resolve credentials
credentials {
# supported providers:
# anon - anonymous requests ("no auth")
# static - static credentials,
# required params:
# access-key-id
# secret-access-key
# optional:
# token
# default: as described in com.amazonaws.auth.DefaultAWSCredentialsProviderChain docs,
# attempts to get the credentials from either:
# - environment variables
# - system properties
# - credentials file
# - EC2 credentials service
# - IAM / metadata
provider = default
}

# If this section is absent, the fallback behavior is to use the
# com.amazonaws.regions.AwsRegionProvider.DefaultAwsRegionProviderChain instance to resolve region
region {
# supported providers:
# static - static region configuration,
# required params:
# default-region
# default: as described in com.amazonaws.regions.AwsRegionProvider.DefaultAwsRegionProviderChain docs,
# attempts to get the region from either:
# - environment variables
# - system properties
# - profile file
# - EC2 metadata
provider = default
}
}

# Enable path style access to s3, e.g. "https://s3-eu-west-1.amazonaws.com/my.bucket/myobject"
# Default is virtual-hosted style.
# When using virtual hosted-style buckets with SSL, the S3 wildcard certificate only matches buckets that do not contain periods.
# Buckets containing periods will lead to certificate errors. In those cases it's useful to enable path-style access.
path-style-access = true

# Custom endpoint url, used for alternate s3 implementations
# endpoint-url = null

# Which version of the list bucket api to use. Set to 1 to use the old style version 1 API.
# By default the newer version 2 api is used.
list-bucket-api-version = 2
}
}
}
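As an illustration, an S3 API-compatible service (Minio, for example) could be targeted by overriding these defaults in an application.conf. This is only a sketch: the bucket name, endpoint and credentials below are placeholders, not values from this commit; whisk.s3.bucket is the key consumed by the S3AttachmentStoreProvider further down.

whisk {
  s3 {
    bucket = "openwhisk-attachments"   # placeholder; read into S3Config by the provider
    alpakka {
      # custom endpoint for a non-AWS implementation
      endpoint-url = "http://minio.example.com:9000"
      # custom endpoints generally require path-style access
      path-style-access = true
      aws {
        credentials {
          provider = static
          access-key-id = "PLACEHOLDER_KEY"
          secret-access-key = "PLACEHOLDER_SECRET"
        }
        region {
          provider = static
          default-region = "us-east-1"
        }
      }
    }
  }
}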
2 changes: 2 additions & 0 deletions common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -243,4 +243,6 @@ object ConfigKeys {
val containerProxy = "whisk.container-proxy"
val containerProxyTimeouts = s"$containerProxy.timeouts"

val s3 = "whisk.s3"

}
common/scala/src/main/scala/whisk/core/database/AttachmentSupport.scala
@@ -102,10 +102,13 @@ trait AttachmentSupport[DocumentAbstraction <: DocumentSerializer] extends Defau
protected[database] def uriOf(bytesOrSource: Either[ByteString, Source[ByteString, _]], path: => String): Uri = {
bytesOrSource match {
case Left(bytes) => Uri.from(scheme = MemScheme, path = encode(bytes))
-case Right(_) => Uri.from(scheme = attachmentScheme, path = path)
+case Right(_) => uriFrom(scheme = attachmentScheme, path = path)
}
}

//Not using Uri.from due to https://github.com/akka/akka-http/issues/2080
protected[database] def uriFrom(scheme: String, path: String): Uri = Uri(s"$scheme:$path")

/**
* Constructs a source from inlined attachment contents
*/
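A minimal sketch of what the helper yields (the values here are illustrative only). Attachment references are stored as plain scheme:path URIs; the scheme distinguishes inlined attachments (mem) from ones stored externally, such as in S3:

import akka.http.scaladsl.model.Uri

// equivalent to uriFrom(scheme = "s3", path = "5a8bd6e4-...") for an S3-backed attachment
val ref: Uri = Uri("s3:5a8bd6e4-1b86-42c4-9f3c-8a7b6c5d4e3f")
ref.scheme   // "s3" -- identifies which store can resolve the attachment
ref.path     // the attachment name within that store
ref.toString // "s3:5a8bd6e4-1b86-42c4-9f3c-8a7b6c5d4e3f"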
common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
@@ -375,7 +375,7 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
docStream: Source[ByteString, _])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = {

if (maxInlineSize.toBytes == 0) {
-val uri = Uri.from(scheme = attachmentScheme, path = UUID().asString)
+val uri = uriFrom(scheme = attachmentScheme, path = UUID().asString)
for {
attached <- Future.successful(Attached(uri.toString, contentType))
i1 <- put(update(doc, attached))
@@ -550,7 +550,7 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
*/
private def getAttachmentName(name: String): String = {
Try(java.util.UUID.fromString(name))
-.map(_ => Uri.from(scheme = attachmentScheme, path = name).toString)
+.map(_ => uriFrom(scheme = attachmentScheme, path = name).toString)
.getOrElse(name)
}

common/scala/src/main/scala/whisk/core/database/s3/S3AttachmentStore.scala
@@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package whisk.core.database.s3

import akka.actor.ActorSystem
import akka.http.scaladsl.model.ContentType
import akka.stream.ActorMaterializer
import akka.stream.alpakka.s3.scaladsl.S3Client
import akka.stream.alpakka.s3.{S3Exception, S3Settings}
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import com.typesafe.config.Config
import pureconfig.loadConfigOrThrow
import whisk.common.LoggingMarkers.{DATABASE_ATTS_DELETE, DATABASE_ATT_DELETE, DATABASE_ATT_GET, DATABASE_ATT_SAVE}
import whisk.common.{Logging, TransactionId}
import whisk.core.ConfigKeys
import whisk.core.database.StoreUtils._
import whisk.core.database._
import whisk.core.entity.DocId

import scala.concurrent.{ExecutionContext, Future}
import scala.reflect.ClassTag

object S3AttachmentStoreProvider extends AttachmentStoreProvider {
val alpakkaConfigKey = s"${ConfigKeys.s3}.alpakka"
case class S3Config(bucket: String) {
def prefixFor[D](implicit tag: ClassTag[D]): String = {
tag.runtimeClass.getSimpleName.toLowerCase
}
}

override def makeStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): AttachmentStore = {
val client = new S3Client(S3Settings(alpakkaConfigKey))
val config = loadConfigOrThrow[S3Config](ConfigKeys.s3)
new S3AttachmentStore(client, config.bucket, config.prefixFor[D])
}

def makeStore[D <: DocumentSerializer: ClassTag](config: Config)(implicit actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): AttachmentStore = {
val client = new S3Client(S3Settings(config, alpakkaConfigKey))
val s3config = loadConfigOrThrow[S3Config](config, ConfigKeys.s3)
new S3AttachmentStore(client, s3config.bucket, s3config.prefixFor[D])
}

}
class S3AttachmentStore(client: S3Client, bucket: String, prefix: String)(implicit system: ActorSystem,
logging: Logging,
materializer: ActorMaterializer)
extends AttachmentStore {
override val scheme = "s3"

override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher

override protected[core] def attach(
docId: DocId,
name: String,
contentType: ContentType,
docStream: Source[ByteString, _])(implicit transid: TransactionId): Future[AttachResult] = {
require(name != null, "name undefined")
val start =
transid.started(this, DATABASE_ATT_SAVE, s"[ATT_PUT] uploading attachment '$name' of document 'id: $docId'")

//A possible optimization for small attachments (< 5MB) is to use putObject instead of multipartUpload,
//reducing 3 remote calls to 1
val f = docStream
.runWith(combinedSink(client.multipartUpload(bucket, objectKey(docId, name), contentType)))
.map(r => AttachResult(r.digest, r.length))

f.onSuccess({
case _ =>
transid
.finished(this, start, s"[ATT_PUT] '$prefix' completed uploading attachment '$name' of document 'id: $docId'")
})

reportFailure(
f,
start,
failure => s"[ATT_PUT] '$prefix' internal error, name: '$name', doc: '$docId', failure: '${failure.getMessage}'")
}

override protected[core] def readAttachment[T](docId: DocId, name: String, sink: Sink[ByteString, Future[T]])(
implicit transid: TransactionId): Future[T] = {
require(name != null, "name undefined")
val start =
transid.started(
this,
DATABASE_ATT_GET,
s"[ATT_GET] '$prefix' finding attachment '$name' of document 'id: $docId'")
val (source, _) = client.download(bucket, objectKey(docId, name))

val f = source.runWith(sink)

val g = f.transform(
{ s =>
transid
.finished(this, start, s"[ATT_GET] '$prefix' completed: found attachment '$name' of document 'id: $docId'")
s
}, {
case s: Throwable if isMissingKeyException(s) =>
transid
.finished(
this,
start,
s"[ATT_GET] '$prefix', retrieving attachment '$name' of document 'id: $docId'; not found.")
NoDocumentException("Not found on 'readAttachment'.")
case e => e
})

reportFailure(
g,
start,
failure =>
s"[ATT_GET] '$prefix' internal error, name: '$name', doc: 'id: $docId', failure: '${failure.getMessage}'")
}

override protected[core] def deleteAttachments(docId: DocId)(implicit transid: TransactionId): Future[Boolean] = {
val start =
transid.started(this, DATABASE_ATTS_DELETE, s"[ATTS_DELETE] deleting attachments of document 'id: $docId'")

//S3 provides an API to delete multiple objects in a single call, but the alpakka client
//does not currently support it. In current usage a doc has at most 1 attachment,
//so this approach also needs just 2 remote calls (list + delete)
val f = client
.listBucket(bucket, Some(objectKeyPrefix(docId)))
.mapAsync(1)(bc => client.deleteObject(bc.bucketName, bc.key))
.runWith(Sink.seq)
.map(_ => true)

f.onSuccess {
case _ =>
transid.finished(this, start, s"[ATTS_DELETE] completed: deleting attachments of document 'id: $docId'")
}

reportFailure(
f,
start,
failure => s"[ATTS_DELETE] '$prefix' internal error, doc: '$docId', failure: '${failure.getMessage}'")
}

override protected[core] def deleteAttachment(docId: DocId, name: String)(
implicit transid: TransactionId): Future[Boolean] = {
val start =
transid.started(this, DATABASE_ATT_DELETE, s"[ATT_DELETE] deleting attachment '$name' of document 'id: $docId'")

val f = client
.deleteObject(bucket, objectKey(docId, name))
.map(_ => true)

f.onSuccess {
case _ =>
transid.finished(this, start, s"[ATT_DELETE] completed: deleting attachment '$name' of document 'id: $docId'")
}

reportFailure(
f,
start,
failure => s"[ATT_DELETE] '$prefix' internal error, doc: '$docId', failure: '${failure.getMessage}'")
}

override def shutdown(): Unit = {}

private def objectKey(id: DocId, name: String): String = s"$prefix/${id.id}/$name"

private def objectKeyPrefix(id: DocId): String = s"$prefix/${id.id}"

private def isMissingKeyException(e: Throwable): Boolean = {
//In some cases the S3Exception is a nested cause, so recurse through the cause chain
e match {
case s: S3Exception if s.code == "NoSuchKey" => true
case t if t != null && isMissingKeyException(t.getCause) => true
case _ => false
}
}
}
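For illustration only, a sketch of driving the store directly. The entity type, logging setup and transaction id below are assumptions about the surrounding OpenWhisk scaffolding, not part of this commit; in practice the store is obtained through the AttachmentStoreProvider SPI.

import akka.actor.ActorSystem
import akka.http.scaladsl.model.ContentTypes
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import whisk.common.{AkkaLogging, TransactionId}
import whisk.core.entity.{DocId, WhiskAction}

implicit val system = ActorSystem("s3-store-sketch")
implicit val materializer = ActorMaterializer()
implicit val logging = new AkkaLogging(akka.event.Logging.getLogger(system, this)) // assumed logger wiring
implicit val transid = TransactionId.testing // assumed test transaction id
import system.dispatcher

// bucket and client settings come from whisk.s3 / whisk.s3.alpakka in the config above;
// the prefix for WhiskAction resolves to "whiskaction" via prefixFor
val store = S3AttachmentStoreProvider.makeStore[WhiskAction]()

val docId = DocId("mydoc") // hypothetical document id
val bytes = Source.single(ByteString("attachment-bytes"))

for {
  // multipart upload to <bucket>/whiskaction/mydoc/code
  attached <- store.attach(docId, "code", ContentTypes.`application/octet-stream`, bytes)
  // stream the object back, folding chunks into one ByteString
  content <- store.readAttachment(docId, "code", Sink.fold(ByteString.empty)(_ ++ _))
  // delete everything under whiskaction/mydoc/
  _ <- store.deleteAttachments(docId)
} yield (attached.digest, content.utf8String)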
2 changes: 2 additions & 0 deletions tests/build.gradle
@@ -148,6 +148,8 @@ dependencies {
compile 'io.opentracing:opentracing-mock:0.31.0'
compile "org.apache.curator:curator-test:${gradle.curator.version}"

compile "com.amazonaws:aws-java-sdk-s3:1.11.295"

compile project(':common:scala')
compile project(':core:controller')
compile project(':core:invoker')
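The aws-java-sdk-s3 test dependency suggests the tests talk to a real or emulated S3 endpoint. A hedged sketch of constructing such a client against an S3 API-compatible service follows; the endpoint, region and credentials are placeholders, not taken from this commit.

import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.s3.AmazonS3ClientBuilder

// client pointed at a non-AWS, S3 API-compatible endpoint
val s3 = AmazonS3ClientBuilder
  .standard()
  .withEndpointConfiguration(new EndpointConfiguration("http://localhost:9000", "us-east-1"))
  .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("PLACEHOLDER_KEY", "PLACEHOLDER_SECRET")))
  .withPathStyleAccessEnabled(true) // most custom endpoints require path-style access
  .build()

// e.g. provision the bucket the attachment store is configured with
s3.createBucket("openwhisk-attachments")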
tests/src/test/scala/actionContainers/ActionContainer.scala
@@ -91,7 +91,7 @@ object ActionContainer {
}.get // This fails if the docker binary couldn't be located.
}

-private lazy val dockerCmd: String = {
+lazy val dockerCmd: String = {
/*
* The docker host is set to a provided property 'docker.host' if it's
* available; otherwise we check with WhiskProperties to see whether we are