Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow linking with copy/delete in storage service #4728

Merged
merged 2 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ val akkaCorsVersion = "1.2.0"
val akkaVersion = "2.6.21"
val alpakkaVersion = "3.0.4"
val apacheCompressVersion = "1.25.0"
val apacheIOVersion = "2.15.1"
val awsSdkVersion = "2.17.184"
val byteBuddyAgentVersion = "1.10.17"
val betterMonadicForVersion = "0.3.1"
Expand Down Expand Up @@ -74,6 +75,7 @@ lazy val alpakkaFile = "com.lightbend.akka" %% "akka-stream-alp
lazy val alpakkaSse = "com.lightbend.akka" %% "akka-stream-alpakka-sse" % alpakkaVersion
lazy val alpakkaS3 = "com.lightbend.akka" %% "akka-stream-alpakka-s3" % alpakkaVersion
lazy val apacheCompress = "org.apache.commons" % "commons-compress" % apacheCompressVersion
lazy val apacheIO = "commons-io" % "commons-io" % apacheIOVersion
lazy val awsSdk = "software.amazon.awssdk" % "s3" % awsSdkVersion
lazy val betterMonadicFor = "com.olegpy" %% "better-monadic-for" % betterMonadicForVersion
lazy val byteBuddyAgent = "net.bytebuddy" % "byte-buddy-agent" % byteBuddyAgentVersion
Expand Down Expand Up @@ -756,6 +758,7 @@ lazy val storage = project
Docker / packageName := "nexus-storage",
libraryDependencies ++= Seq(
apacheCompress,
apacheIO,
akkaHttp,
akkaHttpCirce,
akkaStream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@ import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.error.Rejection
import fs2.io.file.{CopyFlag, CopyFlags, Files, Path}
import java.nio.file.{Path => JPath}

/**
  * Copies a batch of files, each described by a [[CopyBetween]] source/destination pair.
  *
  * NOTE(review): the name suggests the batch is all-or-nothing; the rollback behaviour is defined by the
  * implementation, which is not visible here - confirm before relying on it.
  */
trait TransactionalFileCopier {
/** Copies every entry of `files`; the list cannot be empty by construction of its type. */
def copyAll(files: NonEmptyList[CopyBetween]): IO[Unit]
}

/** A single copy instruction: the file at `source` is to be copied to `destination`. */
final case class CopyBetween(source: Path, destination: Path)
object CopyBetween {

  /**
    * Builds a [[CopyBetween]] from `java.nio.file` paths, converting each one to an fs2 `Path`.
    *
    * @param source the path to copy from
    * @param dest   the path to copy to
    */
  def mk(source: JPath, dest: JPath): CopyBetween =
    CopyBetween(Path.fromNioPath(source), Path.fromNioPath(dest))
}

final case class CopyOperationFailed(failingCopy: CopyBetween, e: Throwable) extends Rejection {
override def reason: String =
Expand Down
2 changes: 2 additions & 0 deletions storage/src/main/resources/app.conf
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ app {
# permissions fixer
fixer-enabled = false
fixer-command = []
# whether to link with an atomic move (e.g. mv or rename); set to false to link using a copy and delete when atomic move isn't supported
link-with-atomic-move = false
}

# Allows to define default media types for the given file extensions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ object Main extends IOApp {
logger.warning("The application has been configured with anonymous, the caller will not be verified !")
}

logger.info(s"==== Full configuration is $appConfig ====")

val routes: Route = Routes(storages)

val httpBinding: Future[Http.ServerBinding] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import akka.stream.alpakka.file.scaladsl.Directory
import akka.stream.scaladsl.{FileIO, Keep}
import cats.data.{EitherT, NonEmptyList}
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.utils.{CopyBetween, TransactionalFileCopier}
import ch.epfl.bluebrain.nexus.storage.File._
import ch.epfl.bluebrain.nexus.storage.Rejection.PathNotFound
Expand All @@ -18,10 +19,10 @@ import ch.epfl.bluebrain.nexus.storage.attributes.{AttributesCache, ContentTypeD
import ch.epfl.bluebrain.nexus.storage.config.AppConfig.{DigestConfig, StorageConfig}
import ch.epfl.bluebrain.nexus.storage.files.{CopyFileOutput, ValidateFile}
import ch.epfl.bluebrain.nexus.storage.routes.CopyFile
import org.apache.commons.io.FileUtils

import java.nio.file.StandardCopyOption._
import java.nio.file.{Files, Path}
import fs2.io.file.{Path => Fs2Path}
import java.security.MessageDigest
import scala.concurrent.{ExecutionContext, Future}
import scala.sys.process._
Expand Down Expand Up @@ -159,9 +160,6 @@ object Storages {
type PathDoesNotExist = PathDoesNotExist.type
}

/**
* A disk implementation of the Storages interface.
*/
final class DiskStorage(
config: StorageConfig,
contentTypeDetector: ContentTypeDetector,
Expand All @@ -174,18 +172,35 @@ object Storages {
mt: Materializer
) extends Storages[AkkaSource] {

private val log = Logger[DiskStorage]
private val linkWithAtomicMove = config.linkWithAtomicMove.getOrElse(true)

/**
  * Logs `msg` at info level from code that is not running inside `IO`, by executing the logging
  * effect synchronously on the global cats-effect runtime.
  */
private def logUnsafe(msg: String): Unit =
  log.info(msg).unsafeRunSync()(cats.effect.unsafe.implicits.global)

/**
  * Checks whether the bucket `name` exists on disk.
  *
  * The bucket is reported as existing only when the configured root volume is exactly two directories
  * above the bucket's base path and that path is a readable directory. The reason for each negative
  * answer is logged.
  *
  * @param name the bucket name
  * @return `BucketExists` when both conditions hold, `BucketDoesNotExist` otherwise
  */
def exists(name: String): BucketExistence = {
  val path = basePath(config, name)
  logUnsafe(s"Checking bucket existence at path $path")
  if (path.getParent.getParent != config.rootVolume) {
    logUnsafe(s"Invalid bucket because the root volume is not two directories above $path")
    BucketDoesNotExist
  } else if (Files.isDirectory(path) && Files.isReadable(path)) BucketExists
  else {
    logUnsafe(s"Invalid bucket because $path is not a readable directory")
    BucketDoesNotExist
  }
}

/**
  * Checks whether `path` exists inside the bucket `name`.
  *
  * The path is accepted only when the resolved absolute path exists, is readable and is a descendant
  * of the bucket's base path. A rejected path is logged before returning.
  *
  * @param name the bucket name
  * @param path the path relative to the bucket
  * @return `PathExists` when all checks pass, `PathDoesNotExist` otherwise
  */
def pathExists(name: String, path: Uri.Path): PathExistence = {
  val absPath = filePath(config, name, path)
  if (Files.exists(absPath) && Files.isReadable(absPath) && descendantOf(absPath, basePath(config, name)))
    PathExists
  else {
    logUnsafe(s"Invalid absolute path $absPath for bucket $name and relative path $path")
    PathDoesNotExist
  }
}

def createFile(
Expand All @@ -195,6 +210,7 @@ object Storages {
)(implicit bucketEv: BucketExists, pathEv: PathDoesNotExist): IO[FileAttributes] =
for {
validated <- validateFile.forCreate(name, path)
_ <- log.info(s"Creating file in bucket $name at path $path")
_ <- IO.blocking(Files.createDirectories(validated.absDestPath.getParent))
msgDigest <- IO.delay(MessageDigest.getInstance(digestConfig.algorithm))
attributes <- streamFileContents(source, path, validated.absDestPath, msgDigest)
Expand Down Expand Up @@ -244,10 +260,11 @@ object Storages {
val process = Process(config.fixerCommand :+ absPath)

for {
_ <- log.info(s"Fixing permissions for file at $absPath")
exitCode <- IO.blocking(process ! logger)
_ <- IO.raiseUnless(exitCode == 0)(PermissionsFixingFailed(absPath, logger.toString))
} yield ()
} else IO.unit
} else log.info(s"Not changing permissions for file at $path")

private def computeSizeAndMoveFile(
absSourcePath: Path,
Expand All @@ -256,12 +273,26 @@ object Storages {
): IO[FileAttributes] =
for {
computedSize <- size(absSourcePath)
_ <- IO.blocking(Files.createDirectories(absDestPath.getParent))
_ <- IO.blocking(Files.move(absSourcePath, absDestPath, ATOMIC_MOVE))
msg = if (linkWithAtomicMove) "atomic move" else "copy and delete"
_ <- log.info(s"Performing link with $msg from $absSourcePath to $absDestPath")
_ <- if (linkWithAtomicMove) doMove(absSourcePath, absDestPath)
else doCopyAndDelete(absSourcePath, absDestPath, isDir)
_ <- IO.delay(cache.asyncComputePut(absDestPath, digestConfig.algorithm))
mediaType <- IO.blocking(contentTypeDetector(absDestPath, isDir))
} yield FileAttributes(absDestPath.toAkkaUri, computedSize, Digest.empty, mediaType)

/**
  * Creates the destination's parent directories if needed, then moves the source onto the
  * destination using an atomic move. Both steps run as blocking effects.
  */
private def doMove(absSourcePath: Path, absDestPath: Path): IO[Unit] =
  for {
    _ <- IO.blocking(Files.createDirectories(absDestPath.getParent))
    _ <- IO.blocking(Files.move(absSourcePath, absDestPath, ATOMIC_MOVE))
  } yield ()

private def doCopyAndDelete(absSourcePath: Path, absDestPath: Path, isDir: Boolean): IO[Unit] =
if (isDir)
IO.blocking(FileUtils.copyDirectory(absSourcePath.toFile, absDestPath.toFile)) >>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't any library we already depend on allow copying directories?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not that I could see: java nio doesn't, and fs2 basically just wraps that. This avoids having to implement it by hand, e.g. using a FileVisitor or akka streams. Docs here - https://commons.apache.org/proper/commons-io/javadocs/api-release/org/apache/commons/io/FileUtils.html#copyDirectory-java.io.File-java.io.File-

IO.blocking(FileUtils.deleteDirectory(absSourcePath.toFile))
else
copyFiles.copyAll(NonEmptyList.of(CopyBetween.mk(absSourcePath, absDestPath))) >>
IO.blocking(Files.delete(absSourcePath))

private def size(absPath: Path): IO[Long] =
if (Files.isDirectory(absPath)) {
IO.fromFuture(IO.delay(Directory.walk(absPath).filter(Files.isRegularFile(_)).runFold(0L)(_ + Files.size(_))))
Expand All @@ -279,8 +310,7 @@ object Storages {
files.traverse(f =>
EitherT(validateFile.forCopyWithinProtectedDir(f.sourceBucket, destBucket, f.source, f.destination))
)
copyBetween =
validated.map(v => CopyBetween(Fs2Path.fromNioPath(v.absSourcePath), Fs2Path.fromNioPath(v.absDestPath)))
copyBetween = validated.map(v => CopyBetween.mk(v.absSourcePath, v.absDestPath))
_ <- EitherT.right[Rejection](copyFiles.copyAll(copyBetween))
} yield files.zip(validated).map { case (raw, valid) =>
CopyFileOutput(raw.source, raw.destination, valid.absSourcePath, valid.absDestPath)
Expand All @@ -293,7 +323,10 @@ object Storages {
val absPath = filePath(config, name, path)
if (Files.isRegularFile(absPath)) Right(fileSource(absPath) -> Some(absPath.getFileName.toString))
else if (Files.isDirectory(absPath)) Right(folderSource(absPath) -> None)
else Left(PathNotFound(name, path))
else {
logUnsafe(s"Invalid absolute path $absPath for bucket $name and relative path $path")
Left(PathNotFound(name, path))
}
}

def getAttributes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ object AppConfig {
extraPrefixes: List[Path],
protectedDirectory: Path,
fixerEnabled: Boolean,
fixerCommand: Vector[String]
fixerCommand: Vector[String],
linkWithAtomicMove: Option[Boolean]
)

/**
Expand Down
Loading