Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BT-711 Refresh SAS token for filesystem on expiry #6831

Merged
merged 18 commits into from
Sep 12, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package cromwell.filesystems.blob

import com.azure.core.credential.AzureSasCredential
import com.azure.storage.blob.nio.AzureFileSystem
import com.google.common.net.UrlEscapers
import cromwell.core.path.{NioPath, Path, PathBuilder}
import cromwell.filesystems.blob.BlobPathBuilder._

import java.net.{MalformedURLException, URI}
import java.nio.file.{FileSystem, FileSystemNotFoundException, FileSystems}
import scala.jdk.CollectionConverters._
import java.time.Instant
import java.time.temporal.TemporalAmount
import scala.language.postfixOps
import scala.util.{Failure, Try}

Expand All @@ -19,8 +18,8 @@ object BlobPathBuilder {
case class UnparsableBlobPath(errorMessage: Throwable) extends BlobPathValidation

def invalidBlobPathMessage(container: String, endpoint: String) = s"Malformed Blob URL for this builder. Expecting a URL for a container $container and endpoint $endpoint"
def parseURI(string: String) = URI.create(UrlEscapers.urlFragmentEscaper().escape(string))
def parseStorageAccount(uri: URI) = uri.getHost().split("\\.").filter(!_.isEmpty()).headOption
def parseURI(string: String): URI = URI.create(UrlEscapers.urlFragmentEscaper().escape(string))
def parseStorageAccount(uri: URI): Option[String] = uri.getHost.split("\\.").find(_.nonEmpty)

/**
* Validates a that a path from a string is a valid BlobPath of the format:
Expand All @@ -44,9 +43,9 @@ object BlobPathBuilder {
Try {
val uri = parseURI(string)
val storageAccount = parseStorageAccount(parseURI(endpoint))
val hasContainer = uri.getPath().split("/").filter(!_.isEmpty()).headOption.contains(container)
def hasEndpoint = parseStorageAccount(uri).contains(storageAccount.get)
if (hasContainer && !storageAccount.isEmpty && hasEndpoint) {
val hasContainer = uri.getPath.split("/").find(_.nonEmpty).contains(container)
val hasEndpoint = storageAccount.exists(parseStorageAccount(uri).contains(_))
if (hasContainer && storageAccount.isDefined && hasEndpoint) {
ValidBlobPath(uri.getPath.replaceFirst("/" + container, ""))
} else {
UnparsableBlobPath(new MalformedURLException(invalidBlobPathMessage(container, endpoint)))
Expand All @@ -55,39 +54,39 @@ object BlobPathBuilder {
}
}

class BlobPathBuilder(blobTokenGenerator: BlobTokenGenerator, container: String, endpoint: String) extends PathBuilder {

val credential: AzureSasCredential = new AzureSasCredential(blobTokenGenerator.getAccessToken)
val fileSystemConfig: Map[String, Object] = Map((AzureFileSystem.AZURE_STORAGE_SAS_TOKEN_CREDENTIAL, credential),
(AzureFileSystem.AZURE_STORAGE_FILE_STORES, container),
(AzureFileSystem.AZURE_STORAGE_SKIP_INITIAL_CONTAINER_CHECK, java.lang.Boolean.TRUE))

def retrieveFilesystem(uri: URI): Try[FileSystem] = {
Try(FileSystems.getFileSystem(uri)) recover {
// If no filesystem already exists, this will create a new connection, with the provided configs
case _: FileSystemNotFoundException => FileSystems.newFileSystem(uri, fileSystemConfig.asJava)
}
}
class BlobPathBuilder(fsm: FileSystemManager, container: String, endpoint: String) extends PathBuilder {

def build(string: String): Try[BlobPath] = {
validateBlobPath(string, container, endpoint) match {
case ValidBlobPath(path) => for {
fileSystem <- retrieveFilesystem(new URI("azb://?endpoint=" + endpoint))
nioPath <- Try(fileSystem.getPath(path))
blobPath = BlobPath(nioPath, endpoint, container)
} yield blobPath
case ValidBlobPath(path) => Try(BlobPath(path, endpoint, container, fsm))
case UnparsableBlobPath(errorMessage: Throwable) => Failure(errorMessage)
}
}

override def name: String = "Azure Blob Storage"
}

// Add args for container, storage account name
case class BlobPath private[blob](nioPath: NioPath, endpoint: String, container: String) extends Path {
override protected def newPath(nioPath: NioPath): Path = BlobPath(nioPath, endpoint, container)
case class BlobPath private[blob](pathString: String, endpoint: String, container: String, fsm: FileSystemManager) extends Path {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been trying to think of a way to make this work without including the FileSystemManager in the signature of this class. It's not something we want code outside the class referencing, and a BlobPath should be fully specified by the pathString, endpoint, and container. I don't think there's a way to insert private state when constructing a case class, though. Hmmmm, I'll keep thinking but this is fine for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would using an implicit argument be a workable strategy? This is something that should exist as a part of the Builder, so could I think just be implicitly supplied?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an interesting idea to put it in a separate set of args, not sure it gains much by being implicit. I think this is a little-used but valid pattern:

case class BlobPath(pathString: String, endpoint: String, container: String)(private val fsm: FileSystemManager)

This might be something that's a little easier to think about once the tests are in place.

//var token = blobTokenGenerator.getAccessToken
//var expiry = token.getSignature.split("&").filter(_.startsWith("se")).headOption.map(_.replaceFirst("se=",""))
override def nioPath: NioPath = findNioPath(path = pathString, endpoint, container)

override protected def newPath(nioPath: NioPath): Path = BlobPath(nioPath.toString, endpoint, container, fsm)

override def pathAsString: String = List(endpoint, container, nioPath.toString).mkString("/")

override def pathWithoutScheme: String = parseURI(endpoint).getHost + "/" + container + "/" + nioPath.toString

def findNioPath(path: String, endpoint: String, container: String): NioPath = (for {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this method be private?

fileSystem <- fsm.retrieveFilesystem()
nioPath = fileSystem.getPath(path)
} yield nioPath).get
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've discussed begrudgingly accepting that this method will throw. We should ensure it throws something useful, though. I think we should throw different informative error messages depending on whether we failed to get the filesystem or failed to create the NIO path.

}

override def pathAsString: String = List(endpoint, container, nioPath.toString()).mkString("/")
case class TokenExpiration(token: AzureSasCredential, buffer: TemporalAmount) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious why this class ended up in here and not in BlobPathBuilderFactory.

val expiry = for {
expiryString <- token.getSignature.split("&").find(_.startsWith("se")).map(_.replaceFirst("se=","")).map(_.replace("%3A", ":"))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This parsing (String => Instant) is a good candidate for unit testing, maybe break it out into its own method?

instant = Instant.parse(expiryString)
} yield instant

override def pathWithoutScheme: String = parseURI(endpoint).getHost + "/" + container + "/" + nioPath.toString()
def hasTokenExpired: Boolean = expiry.exists(_.isAfter(Instant.now.plus(buffer)))
}
Original file line number Diff line number Diff line change
@@ -1,43 +1,89 @@
package cromwell.filesystems.blob
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now is a good time to take another look at this file and see if there's anything we want to move out into other files in this package. There are a lot of top-level classes in here now, the whole group would probably be easier to understand with a little division.


import akka.actor.ActorSystem
import com.azure.core.credential.AzureSasCredential
import com.azure.core.management.AzureEnvironment
import com.azure.core.management.profile.AzureProfile
import com.azure.identity.DefaultAzureCredentialBuilder
import com.azure.resourcemanager.AzureResourceManager
import com.azure.storage.blob.BlobContainerClientBuilder
import com.azure.storage.blob.nio.AzureFileSystem
import com.azure.storage.blob.sas.{BlobContainerSasPermission, BlobServiceSasSignatureValues}
import com.azure.storage.common.StorageSharedKeyCredential
import com.typesafe.config.Config
import cromwell.core.WorkflowOptions
import cromwell.core.path.PathBuilderFactory
import net.ceedubs.ficus.Ficus._

import java.time.OffsetDateTime
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import java.net.URI
import java.nio.file.{FileSystem, FileSystemNotFoundException, FileSystems}
import java.time.temporal.ChronoUnit
import java.time.{Duration, OffsetDateTime}
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Try}

final case class BlobFileSystemConfig(config: Config)
final case class BlobPathBuilderFactory(globalConfig: Config, instanceConfig: Config, singletonConfig: BlobFileSystemConfig) extends PathBuilderFactory {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a LOT of important string identifiers floating around. Not necessarily in scope for this PR, but we should consider replacing them with simple case classes so that the compiler can check that we're passing them around correctly. Otherwise it's too easy to swap their ordering in a method call and introduce a very hard-to-find bug.

case class BlobContainerName(value: String)
case class StorageAccountName(value: String)
case class WorkspaceId(value: String)

...and so on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I will give this a try, It will likely help me get the tests right without accidentally swapping strings

val sasToken: String = instanceConfig.as[String]("sas-token")
val container: String = instanceConfig.as[String]("store")
val endpoint: String = instanceConfig.as[String]("endpoint")
val workspaceId: String = instanceConfig.as[String]("workspace-id")
val workspaceManagerURL: String = singletonConfig.config.as[String]("workspace-manager-url")
val workspaceId: Option[String] = instanceConfig.as[Option[String]]("workspace-id")
val workspaceManagerURL: Option[String] = singletonConfig.config.as[Option[String]]("workspace-manager-url")

val blobTokenGenerator: BlobTokenGenerator = BlobTokenGenerator.createBlobTokenGenerator(
container, endpoint, Option(workspaceId), Option(workspaceManagerURL))
val fsm = FileSystemManager(container, endpoint, 10, workspaceId, workspaceManagerURL)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually we'll want this buffer time to be configurable.


override def withOptions(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[BlobPathBuilder] = {
Future {
new BlobPathBuilder(blobTokenGenerator, container, endpoint)
new BlobPathBuilder(fsm, container, endpoint)
}
}
}

case class FileSystemManager(container: String,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would name this something more specific, like BlobFileSystemManager.

endpoint: String,
preemptionMinutes: Long,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use the term "preemption" for lower-cost cloud computing getting preempted, I'd rather not overload it. Maybe call this expiryBuffer or something?

workspaceId: Option[String] = None,
workspaceManagerURL: Option[String] = None) {

var expiry: Option[TokenExpiration] = None
val blobTokenGenerator: BlobTokenGenerator = BlobTokenGenerator.createBlobTokenGenerator(
container, endpoint, workspaceId, workspaceManagerURL)

def buildConfigMap(credential: AzureSasCredential, container: String): Map[String, Object] = {
Map((AzureFileSystem.AZURE_STORAGE_SAS_TOKEN_CREDENTIAL, credential),
(AzureFileSystem.AZURE_STORAGE_FILE_STORES, container),
(AzureFileSystem.AZURE_STORAGE_SKIP_INITIAL_CONTAINER_CHECK, java.lang.Boolean.TRUE))
}

def uri = new URI("azb://?endpoint=" + endpoint)

def retrieveFilesystem(): Try[FileSystem] = {
synchronized {
expiry.map(_.hasTokenExpired) match {
case Some(false) => Try(FileSystems.getFileSystem(uri)) recoverWith {
// If no filesystem already exists, this will create a new connection, with the provided configs
case _: FileSystemNotFoundException => blobTokenGenerator.generateAccessToken.flatMap(generateFilesystem(uri, container, _))
}
// If the token has expired, OR there is no token record, try to close the FS and regenerate
case _ => {
closeFileSystem(uri)
blobTokenGenerator.generateAccessToken.flatMap(generateFilesystem(uri, container, _))
}
}
}
}

def generateFilesystem(uri: URI, container: String, token: AzureSasCredential): Try[FileSystem] = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this method be private?

expiry = Some(TokenExpiration(token, Duration.of(preemptionMinutes, ChronoUnit.MINUTES)))
Try(FileSystems.newFileSystem(uri, buildConfigMap(token, container).asJava))
}

def closeFileSystem(uri: URI): Try[Unit] = Try(FileSystems.getFileSystem(uri)).map(_.close)
}

sealed trait BlobTokenGenerator {
def getAccessToken: String
def generateAccessToken: Try[AzureSasCredential]
}

object BlobTokenGenerator {
Expand All @@ -57,13 +103,11 @@ object BlobTokenGenerator {
}

case class WSMBlobTokenGenerator(container: String, endpoint: String, workspaceId: String, workspaceManagerURL: String) extends BlobTokenGenerator {
def getAccessToken: String = {
throw new NotImplementedError
}
def generateAccessToken: Try[AzureSasCredential] = Failure(new NotImplementedError)
}

case class NativeBlobTokenGenerator(container: String, endpoint: String) extends BlobTokenGenerator {
def getAccessToken: String = {
def generateAccessToken: Try[AzureSasCredential] = {
val storageAccountName = BlobPathBuilder.parseStorageAccount(BlobPathBuilder.parseURI(endpoint)) match {
case Some(storageAccountName) => storageAccountName
case _ => throw new Exception("Storage account could not be parsed from endpoint")
Expand All @@ -73,7 +117,7 @@ case class NativeBlobTokenGenerator(container: String, endpoint: String) extends
val azureCredential = new DefaultAzureCredentialBuilder()
.authorityHost(profile.getEnvironment.getActiveDirectoryEndpoint)
.build
val azure = AzureResourceManager.authenticate(azureCredential, profile).withDefaultSubscription
val azure = AzureResourceManager.authenticate(azureCredential, profile).withDefaultSubscription()

val storageAccounts = azure.storageAccounts()
val storageAccount = storageAccounts
Expand Down Expand Up @@ -110,6 +154,6 @@ case class NativeBlobTokenGenerator(container: String, endpoint: String) extends
blobContainerSasPermission
)

blobContainerClient.generateSas(blobServiceSasSignatureValues)
Try(new AzureSasCredential(blobContainerClient.generateSas(blobServiceSasSignatureValues)))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing this is just a draft to get it to compile with the right signature rather than the intended final form of this method. I suggest instead wrapping the whole body of the method in a Try, since that will prevent it from throwing.

}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cromwell.filesystems.blob
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import java.nio.file.Files

object BlobPathBuilderSpec {
def buildEndpoint(storageAccount: String) = s"https://$storageAccount.blob.core.windows.net"
Expand All @@ -27,7 +26,7 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{
val testString = BlobPathBuilderSpec.buildEndpoint("badStorageAccount") + container + evalPath
BlobPathBuilder.validateBlobPath(testString, container, endpoint) match {
case BlobPathBuilder.ValidBlobPath(path) => fail(s"Valid path: $path found when verifying mismatched storage account")
case BlobPathBuilder.UnparsableBlobPath(errorMessage) => errorMessage.getMessage() should equal(BlobPathBuilder.invalidBlobPathMessage(container, endpoint))
case BlobPathBuilder.UnparsableBlobPath(errorMessage) => errorMessage.getMessage should equal(BlobPathBuilder.invalidBlobPathMessage(container, endpoint))
}
}

Expand All @@ -38,7 +37,7 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{
val testString = endpoint + "badContainer" + evalPath
BlobPathBuilder.validateBlobPath(testString, container, endpoint) match {
case BlobPathBuilder.ValidBlobPath(path) => fail(s"Valid path: $path found when verifying mismatched container")
case BlobPathBuilder.UnparsableBlobPath(errorMessage) => errorMessage.getMessage() should equal(BlobPathBuilder.invalidBlobPathMessage(container, endpoint))
case BlobPathBuilder.UnparsableBlobPath(errorMessage) => errorMessage.getMessage should equal(BlobPathBuilder.invalidBlobPathMessage(container, endpoint))
}
}

Expand All @@ -47,16 +46,15 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{
val endpointHost = BlobPathBuilder.parseURI(endpoint).getHost
val store = "inputs"
val evalPath = "/test/inputFile.txt"
val blobTokenGenerator: BlobTokenGenerator = BlobTokenGenerator.createBlobTokenGenerator(store, endpoint)
val fsm: FileSystemManager = FileSystemManager(store, endpoint, 10)
val testString = endpoint + "/" + store + evalPath
val blobPath: BlobPath = new BlobPathBuilder(blobTokenGenerator, store, endpoint) build testString getOrElse fail()
val blobPath: BlobPath = new BlobPathBuilder(fsm, store, endpoint) build testString getOrElse fail()

blobPath.container should equal(store)
blobPath.endpoint should equal(endpoint)
blobPath.pathAsString should equal(testString)
blobPath.pathWithoutScheme should equal(endpointHost + "/" + store + evalPath)

val is = Files.newInputStream(blobPath.nioPath)
val is = blobPath.newInputStream()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this new way of getting the input stream use blobPath.nioPath under the covers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed, I think both ways work now, but I left it using this new way just since it was neater

val fileText = (is.readAllBytes.map(_.toChar)).mkString
fileText should include ("This is my test file!!!! Did it work?")
}
Expand All @@ -65,10 +63,14 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{
val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage")
val store = "inputs"
val evalPath = "/test/inputFile.txt"
val blobTokenGenerator: BlobTokenGenerator = BlobTokenGenerator.createBlobTokenGenerator(store, endpoint)
val fsm: FileSystemManager = FileSystemManager(store, endpoint, 10)
val testString = endpoint + "/" + store + evalPath
val blobPath1: BlobPath = new BlobPathBuilder(blobTokenGenerator, store, endpoint) build testString getOrElse fail()
val blobPath2: BlobPath = new BlobPathBuilder(blobTokenGenerator, store, endpoint) build testString getOrElse fail()
val blobPath1: BlobPath = new BlobPathBuilder(fsm, store, endpoint) build testString getOrElse fail()
blobPath1.nioPath.getFileSystem.close()
val blobPath2: BlobPath = new BlobPathBuilder(fsm, store, endpoint) build testString getOrElse fail()
blobPath1 should equal(blobPath2)
val is = blobPath1.newInputStream()
val fileText = (is.readAllBytes.map(_.toChar)).mkString
fileText should include ("This is my test file!!!! Did it work?")
}
}