-
Notifications
You must be signed in to change notification settings - Fork 359
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
BT-711 Refresh SAS token for filesystem on expiry #6831
Changes from 6 commits
90be7d3
77dcf19
00011bc
1a9e556
45b9b45
8f00cc1
832e546
c907917
009ac6a
cd02eb2
e113bb1
313b9fc
6ac6c32
e687360
ca965da
930449c
193e246
20c5db7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,13 @@ | ||
package cromwell.filesystems.blob | ||
|
||
import com.azure.core.credential.AzureSasCredential | ||
import com.azure.storage.blob.nio.AzureFileSystem | ||
import com.google.common.net.UrlEscapers | ||
import cromwell.core.path.{NioPath, Path, PathBuilder} | ||
import cromwell.filesystems.blob.BlobPathBuilder._ | ||
|
||
import java.net.{MalformedURLException, URI} | ||
import java.nio.file.{FileSystem, FileSystemNotFoundException, FileSystems} | ||
import scala.jdk.CollectionConverters._ | ||
import java.time.Instant | ||
import java.time.temporal.TemporalAmount | ||
import scala.language.postfixOps | ||
import scala.util.{Failure, Try} | ||
|
||
|
@@ -19,8 +18,8 @@ object BlobPathBuilder { | |
case class UnparsableBlobPath(errorMessage: Throwable) extends BlobPathValidation | ||
|
||
def invalidBlobPathMessage(container: String, endpoint: String) = s"Malformed Blob URL for this builder. Expecting a URL for a container $container and endpoint $endpoint" | ||
def parseURI(string: String) = URI.create(UrlEscapers.urlFragmentEscaper().escape(string)) | ||
def parseStorageAccount(uri: URI) = uri.getHost().split("\\.").filter(!_.isEmpty()).headOption | ||
def parseURI(string: String): URI = URI.create(UrlEscapers.urlFragmentEscaper().escape(string)) | ||
def parseStorageAccount(uri: URI): Option[String] = uri.getHost.split("\\.").find(_.nonEmpty) | ||
|
||
/** | ||
* Validates that a path from a string is a valid BlobPath of the format: | ||
|
@@ -44,9 +43,9 @@ object BlobPathBuilder { | |
Try { | ||
val uri = parseURI(string) | ||
val storageAccount = parseStorageAccount(parseURI(endpoint)) | ||
val hasContainer = uri.getPath().split("/").filter(!_.isEmpty()).headOption.contains(container) | ||
def hasEndpoint = parseStorageAccount(uri).contains(storageAccount.get) | ||
if (hasContainer && !storageAccount.isEmpty && hasEndpoint) { | ||
val hasContainer = uri.getPath.split("/").find(_.nonEmpty).contains(container) | ||
val hasEndpoint = storageAccount.exists(parseStorageAccount(uri).contains(_)) | ||
if (hasContainer && storageAccount.isDefined && hasEndpoint) { | ||
ValidBlobPath(uri.getPath.replaceFirst("/" + container, "")) | ||
} else { | ||
UnparsableBlobPath(new MalformedURLException(invalidBlobPathMessage(container, endpoint))) | ||
|
@@ -55,39 +54,39 @@ object BlobPathBuilder { | |
} | ||
} | ||
|
||
class BlobPathBuilder(blobTokenGenerator: BlobTokenGenerator, container: String, endpoint: String) extends PathBuilder { | ||
|
||
val credential: AzureSasCredential = new AzureSasCredential(blobTokenGenerator.getAccessToken) | ||
val fileSystemConfig: Map[String, Object] = Map((AzureFileSystem.AZURE_STORAGE_SAS_TOKEN_CREDENTIAL, credential), | ||
(AzureFileSystem.AZURE_STORAGE_FILE_STORES, container), | ||
(AzureFileSystem.AZURE_STORAGE_SKIP_INITIAL_CONTAINER_CHECK, java.lang.Boolean.TRUE)) | ||
|
||
def retrieveFilesystem(uri: URI): Try[FileSystem] = { | ||
Try(FileSystems.getFileSystem(uri)) recover { | ||
// If no filesystem already exists, this will create a new connection, with the provided configs | ||
case _: FileSystemNotFoundException => FileSystems.newFileSystem(uri, fileSystemConfig.asJava) | ||
} | ||
} | ||
class BlobPathBuilder(fsm: FileSystemManager, container: String, endpoint: String) extends PathBuilder { | ||
|
||
def build(string: String): Try[BlobPath] = { | ||
validateBlobPath(string, container, endpoint) match { | ||
case ValidBlobPath(path) => for { | ||
fileSystem <- retrieveFilesystem(new URI("azb://?endpoint=" + endpoint)) | ||
nioPath <- Try(fileSystem.getPath(path)) | ||
blobPath = BlobPath(nioPath, endpoint, container) | ||
} yield blobPath | ||
case ValidBlobPath(path) => Try(BlobPath(path, endpoint, container, fsm)) | ||
case UnparsableBlobPath(errorMessage: Throwable) => Failure(errorMessage) | ||
} | ||
} | ||
|
||
override def name: String = "Azure Blob Storage" | ||
} | ||
|
||
// Add args for container, storage account name | ||
case class BlobPath private[blob](nioPath: NioPath, endpoint: String, container: String) extends Path { | ||
override protected def newPath(nioPath: NioPath): Path = BlobPath(nioPath, endpoint, container) | ||
case class BlobPath private[blob](pathString: String, endpoint: String, container: String, fsm: FileSystemManager) extends Path { | ||
//var token = blobTokenGenerator.getAccessToken | ||
//var expiry = token.getSignature.split("&").filter(_.startsWith("se")).headOption.map(_.replaceFirst("se=","")) | ||
override def nioPath: NioPath = findNioPath(path = pathString, endpoint, container) | ||
|
||
override protected def newPath(nioPath: NioPath): Path = BlobPath(nioPath.toString, endpoint, container, fsm) | ||
|
||
override def pathAsString: String = List(endpoint, container, nioPath.toString).mkString("/") | ||
|
||
override def pathWithoutScheme: String = parseURI(endpoint).getHost + "/" + container + "/" + nioPath.toString | ||
|
||
def findNioPath(path: String, endpoint: String, container: String): NioPath = (for { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this method be private? |
||
fileSystem <- fsm.retrieveFilesystem() | ||
nioPath = fileSystem.getPath(path) | ||
} yield nioPath).get | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We've discussed begrudgingly accepting that this method will throw. We should ensure it throws something useful, though. I think we should throw different informative error messages depending on whether we failed to get the filesystem or failed to create the NIO path. |
||
} | ||
|
||
override def pathAsString: String = List(endpoint, container, nioPath.toString()).mkString("/") | ||
case class TokenExpiration(token: AzureSasCredential, buffer: TemporalAmount) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Curious why this class ended up in here and not in |
||
val expiry = for { | ||
expiryString <- token.getSignature.split("&").find(_.startsWith("se")).map(_.replaceFirst("se=","")).map(_.replace("%3A", ":")) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This parsing ( |
||
instant = Instant.parse(expiryString) | ||
} yield instant | ||
|
||
override def pathWithoutScheme: String = parseURI(endpoint).getHost + "/" + container + "/" + nioPath.toString() | ||
/**
  * Reports whether the SAS token should be treated as expired.
  *
  * The token counts as expired once the current time plus `buffer` is later than the
  * parsed expiry instant, so callers refresh the token shortly before it stops working.
  * The previous check used `expiry.isAfter(now + buffer)`, which is the opposite condition:
  * it reported fresh tokens as expired and nearly-expired tokens as valid.
  *
  * @return true if `expiry` is defined and falls before `Instant.now` plus `buffer`;
  *         false if it falls at or after that instant, or if `expiry` is empty
  */
def hasTokenExpired: Boolean = expiry.exists(expiryInstant => Instant.now.plus(buffer).isAfter(expiryInstant))
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,43 +1,89 @@ | ||
package cromwell.filesystems.blob | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now is a good time to take another look at this file and see if there's anything we want to move out into other files in this package. There are a lot of top-level classes in here now, the whole group would probably be easier to understand with a little division. |
||
|
||
import akka.actor.ActorSystem | ||
import com.azure.core.credential.AzureSasCredential | ||
import com.azure.core.management.AzureEnvironment | ||
import com.azure.core.management.profile.AzureProfile | ||
import com.azure.identity.DefaultAzureCredentialBuilder | ||
import com.azure.resourcemanager.AzureResourceManager | ||
import com.azure.storage.blob.BlobContainerClientBuilder | ||
import com.azure.storage.blob.nio.AzureFileSystem | ||
import com.azure.storage.blob.sas.{BlobContainerSasPermission, BlobServiceSasSignatureValues} | ||
import com.azure.storage.common.StorageSharedKeyCredential | ||
import com.typesafe.config.Config | ||
import cromwell.core.WorkflowOptions | ||
import cromwell.core.path.PathBuilderFactory | ||
import net.ceedubs.ficus.Ficus._ | ||
|
||
import java.time.OffsetDateTime | ||
import scala.concurrent.ExecutionContext | ||
import scala.concurrent.Future | ||
import java.net.URI | ||
import java.nio.file.{FileSystem, FileSystemNotFoundException, FileSystems} | ||
import java.time.temporal.ChronoUnit | ||
import java.time.{Duration, OffsetDateTime} | ||
import scala.concurrent.{ExecutionContext, Future} | ||
import scala.jdk.CollectionConverters._ | ||
import scala.util.{Failure, Try} | ||
|
||
final case class BlobFileSystemConfig(config: Config) | ||
final case class BlobPathBuilderFactory(globalConfig: Config, instanceConfig: Config, singletonConfig: BlobFileSystemConfig) extends PathBuilderFactory { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have a LOT of important string identifiers floating around. Not necessarily in scope for this PR, but we should consider replacing them with simple case classes so that the compiler can check that we're passing them around correctly. Otherwise it's too easy to swap their ordering in a method call and introduce a very hard-to-find bug.
...and so on. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I will give this a try, It will likely help me get the tests right without accidentally swapping strings |
||
val sasToken: String = instanceConfig.as[String]("sas-token") | ||
val container: String = instanceConfig.as[String]("store") | ||
val endpoint: String = instanceConfig.as[String]("endpoint") | ||
val workspaceId: String = instanceConfig.as[String]("workspace-id") | ||
val workspaceManagerURL: String = singletonConfig.config.as[String]("workspace-manager-url") | ||
val workspaceId: Option[String] = instanceConfig.as[Option[String]]("workspace-id") | ||
val workspaceManagerURL: Option[String] = singletonConfig.config.as[Option[String]]("workspace-manager-url") | ||
|
||
val blobTokenGenerator: BlobTokenGenerator = BlobTokenGenerator.createBlobTokenGenerator( | ||
container, endpoint, Option(workspaceId), Option(workspaceManagerURL)) | ||
val fsm = FileSystemManager(container, endpoint, 10, workspaceId, workspaceManagerURL) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Eventually we'll want this buffer time to be configurable. |
||
|
||
override def withOptions(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[BlobPathBuilder] = { | ||
Future { | ||
new BlobPathBuilder(blobTokenGenerator, container, endpoint) | ||
new BlobPathBuilder(fsm, container, endpoint) | ||
} | ||
} | ||
} | ||
|
||
case class FileSystemManager(container: String, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would name this something more specific, like |
||
endpoint: String, | ||
preemptionMinutes: Long, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We use the term "preemption" for lower-cost cloud computing getting preempted, I'd rather not overload it. Maybe call this |
||
workspaceId: Option[String] = None, | ||
workspaceManagerURL: Option[String] = None) { | ||
|
||
var expiry: Option[TokenExpiration] = None | ||
val blobTokenGenerator: BlobTokenGenerator = BlobTokenGenerator.createBlobTokenGenerator( | ||
container, endpoint, workspaceId, workspaceManagerURL) | ||
|
||
def buildConfigMap(credential: AzureSasCredential, container: String): Map[String, Object] = { | ||
Map((AzureFileSystem.AZURE_STORAGE_SAS_TOKEN_CREDENTIAL, credential), | ||
(AzureFileSystem.AZURE_STORAGE_FILE_STORES, container), | ||
(AzureFileSystem.AZURE_STORAGE_SKIP_INITIAL_CONTAINER_CHECK, java.lang.Boolean.TRUE)) | ||
} | ||
|
||
def uri = new URI("azb://?endpoint=" + endpoint) | ||
|
||
def retrieveFilesystem(): Try[FileSystem] = { | ||
synchronized { | ||
expiry.map(_.hasTokenExpired) match { | ||
case Some(false) => Try(FileSystems.getFileSystem(uri)) recoverWith { | ||
// If no filesystem already exists, this will create a new connection, with the provided configs | ||
case _: FileSystemNotFoundException => blobTokenGenerator.generateAccessToken.flatMap(generateFilesystem(uri, container, _)) | ||
} | ||
// If the token has expired, OR there is no token record, try to close the FS and regenerate | ||
case _ => { | ||
closeFileSystem(uri) | ||
blobTokenGenerator.generateAccessToken.flatMap(generateFilesystem(uri, container, _)) | ||
} | ||
} | ||
} | ||
} | ||
|
||
def generateFilesystem(uri: URI, container: String, token: AzureSasCredential): Try[FileSystem] = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this method be private? |
||
expiry = Some(TokenExpiration(token, Duration.of(preemptionMinutes, ChronoUnit.MINUTES))) | ||
Try(FileSystems.newFileSystem(uri, buildConfigMap(token, container).asJava)) | ||
} | ||
|
||
def closeFileSystem(uri: URI): Try[Unit] = Try(FileSystems.getFileSystem(uri)).map(_.close) | ||
} | ||
|
||
sealed trait BlobTokenGenerator { | ||
def getAccessToken: String | ||
def generateAccessToken: Try[AzureSasCredential] | ||
} | ||
|
||
object BlobTokenGenerator { | ||
|
@@ -57,13 +103,11 @@ object BlobTokenGenerator { | |
} | ||
|
||
case class WSMBlobTokenGenerator(container: String, endpoint: String, workspaceId: String, workspaceManagerURL: String) extends BlobTokenGenerator { | ||
def getAccessToken: String = { | ||
throw new NotImplementedError | ||
} | ||
def generateAccessToken: Try[AzureSasCredential] = Failure(new NotImplementedError) | ||
} | ||
|
||
case class NativeBlobTokenGenerator(container: String, endpoint: String) extends BlobTokenGenerator { | ||
def getAccessToken: String = { | ||
def generateAccessToken: Try[AzureSasCredential] = { | ||
val storageAccountName = BlobPathBuilder.parseStorageAccount(BlobPathBuilder.parseURI(endpoint)) match { | ||
case Some(storageAccountName) => storageAccountName | ||
case _ => throw new Exception("Storage account could not be parsed from endpoint") | ||
|
@@ -73,7 +117,7 @@ case class NativeBlobTokenGenerator(container: String, endpoint: String) extends | |
val azureCredential = new DefaultAzureCredentialBuilder() | ||
.authorityHost(profile.getEnvironment.getActiveDirectoryEndpoint) | ||
.build | ||
val azure = AzureResourceManager.authenticate(azureCredential, profile).withDefaultSubscription | ||
val azure = AzureResourceManager.authenticate(azureCredential, profile).withDefaultSubscription() | ||
|
||
val storageAccounts = azure.storageAccounts() | ||
val storageAccount = storageAccounts | ||
|
@@ -110,6 +154,6 @@ case class NativeBlobTokenGenerator(container: String, endpoint: String) extends | |
blobContainerSasPermission | ||
) | ||
|
||
blobContainerClient.generateSas(blobServiceSasSignatureValues) | ||
Try(new AzureSasCredential(blobContainerClient.generateSas(blobServiceSasSignatureValues))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm guessing this is just a draft to get it to compile with the right signature rather than the intended final form of this method. I suggest instead wrapping the whole body of the method in a |
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,6 @@ | ||
package cromwell.filesystems.blob | ||
import org.scalatest.flatspec.AnyFlatSpec | ||
import org.scalatest.matchers.should.Matchers | ||
import java.nio.file.Files | ||
|
||
object BlobPathBuilderSpec { | ||
def buildEndpoint(storageAccount: String) = s"https://$storageAccount.blob.core.windows.net" | ||
|
@@ -27,7 +26,7 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{ | |
val testString = BlobPathBuilderSpec.buildEndpoint("badStorageAccount") + container + evalPath | ||
BlobPathBuilder.validateBlobPath(testString, container, endpoint) match { | ||
case BlobPathBuilder.ValidBlobPath(path) => fail(s"Valid path: $path found when verifying mismatched storage account") | ||
case BlobPathBuilder.UnparsableBlobPath(errorMessage) => errorMessage.getMessage() should equal(BlobPathBuilder.invalidBlobPathMessage(container, endpoint)) | ||
case BlobPathBuilder.UnparsableBlobPath(errorMessage) => errorMessage.getMessage should equal(BlobPathBuilder.invalidBlobPathMessage(container, endpoint)) | ||
} | ||
} | ||
|
||
|
@@ -38,7 +37,7 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{ | |
val testString = endpoint + "badContainer" + evalPath | ||
BlobPathBuilder.validateBlobPath(testString, container, endpoint) match { | ||
case BlobPathBuilder.ValidBlobPath(path) => fail(s"Valid path: $path found when verifying mismatched container") | ||
case BlobPathBuilder.UnparsableBlobPath(errorMessage) => errorMessage.getMessage() should equal(BlobPathBuilder.invalidBlobPathMessage(container, endpoint)) | ||
case BlobPathBuilder.UnparsableBlobPath(errorMessage) => errorMessage.getMessage should equal(BlobPathBuilder.invalidBlobPathMessage(container, endpoint)) | ||
} | ||
} | ||
|
||
|
@@ -47,16 +46,15 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{ | |
val endpointHost = BlobPathBuilder.parseURI(endpoint).getHost | ||
val store = "inputs" | ||
val evalPath = "/test/inputFile.txt" | ||
val blobTokenGenerator: BlobTokenGenerator = BlobTokenGenerator.createBlobTokenGenerator(store, endpoint) | ||
val fsm: FileSystemManager = FileSystemManager(store, endpoint, 10) | ||
val testString = endpoint + "/" + store + evalPath | ||
val blobPath: BlobPath = new BlobPathBuilder(blobTokenGenerator, store, endpoint) build testString getOrElse fail() | ||
val blobPath: BlobPath = new BlobPathBuilder(fsm, store, endpoint) build testString getOrElse fail() | ||
|
||
blobPath.container should equal(store) | ||
blobPath.endpoint should equal(endpoint) | ||
blobPath.pathAsString should equal(testString) | ||
blobPath.pathWithoutScheme should equal(endpointHost + "/" + store + evalPath) | ||
|
||
val is = Files.newInputStream(blobPath.nioPath) | ||
val is = blobPath.newInputStream() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this new way of getting the input stream use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indeed, I think both ways work now, but I left it using this new way just since it was neater |
||
val fileText = (is.readAllBytes.map(_.toChar)).mkString | ||
fileText should include ("This is my test file!!!! Did it work?") | ||
} | ||
|
@@ -65,10 +63,14 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{ | |
val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") | ||
val store = "inputs" | ||
val evalPath = "/test/inputFile.txt" | ||
val blobTokenGenerator: BlobTokenGenerator = BlobTokenGenerator.createBlobTokenGenerator(store, endpoint) | ||
val fsm: FileSystemManager = FileSystemManager(store, endpoint, 10) | ||
val testString = endpoint + "/" + store + evalPath | ||
val blobPath1: BlobPath = new BlobPathBuilder(blobTokenGenerator, store, endpoint) build testString getOrElse fail() | ||
val blobPath2: BlobPath = new BlobPathBuilder(blobTokenGenerator, store, endpoint) build testString getOrElse fail() | ||
val blobPath1: BlobPath = new BlobPathBuilder(fsm, store, endpoint) build testString getOrElse fail() | ||
blobPath1.nioPath.getFileSystem.close() | ||
val blobPath2: BlobPath = new BlobPathBuilder(fsm, store, endpoint) build testString getOrElse fail() | ||
blobPath1 should equal(blobPath2) | ||
val is = blobPath1.newInputStream() | ||
val fileText = (is.readAllBytes.map(_.toChar)).mkString | ||
fileText should include ("This is my test file!!!! Did it work?") | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've been trying to think of a way to make this work without including the `FileSystemManager` in the signature of this class. It's not something we want code outside the class referencing, and a `BlobPath` should be fully specified by the `pathString`, `endpoint`, and `container`. I don't think there's a way to insert private state when constructing a case class, though. Hmmmm, I'll keep thinking but this is fine for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would using an implicit argument be a workable strategy? This is something that should exist as a part of the Builder, so I think it could just be implicitly supplied.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
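It's an interesting idea to put it in a separate set of args, though I'm not sure it gains much by being implicit. I think this is a little-used but valid pattern:
`case class BlobPath private[blob](pathString: String, endpoint: String, container: String)(private val fsm: FileSystemManager) extends Path`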
This might be something that's a little easier to think about once the tests are in place.