Skip to content

Commit

Permalink
Merge pull request #800 from eikek/file-backends
Browse files Browse the repository at this point in the history
File backends
  • Loading branch information
mergify[bot] authored Jun 25, 2022
2 parents e7b32cb + 222ec76 commit 219d0d2
Show file tree
Hide file tree
Showing 42 changed files with 3,160 additions and 204 deletions.
16 changes: 15 additions & 1 deletion modules/backend/src/main/scala/sharry/backend/BackendApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import cats.effect._
import sharry.backend.account._
import sharry.backend.alias.OAlias
import sharry.backend.auth.Login
import sharry.backend.config.Config
import sharry.backend.files.OFiles
import sharry.backend.job.PeriodicCleanup
import sharry.backend.mail.OMail
import sharry.backend.share.OShare
Expand All @@ -28,6 +30,8 @@ trait BackendApp[F[_]] {
def share: OShare[F]

def mail: OMail[F]

def files: OFiles[F]
}

object BackendApp {
Expand All @@ -40,21 +44,31 @@ object BackendApp {
aliasImpl <- OAlias[F](store)
shareImpl <- OShare[F](store, cfg.share)
mailImpl <- OMail[F](store, cfg.mail, JavaMailEmil[F]())
filesImpl <- Resource.pure(OFiles[F](store, cfg.files))
} yield new BackendApp[F] {
val login: Login[F] = loginImpl
val signup: OSignup[F] = signupImpl
val account: OAccount[F] = accountImpl
val alias: OAlias[F] = aliasImpl
val share: OShare[F] = shareImpl
val mail: OMail[F] = mailImpl
val files: OFiles[F] = filesImpl
}

def apply[F[_]: Async](
cfg: Config,
connectEC: ExecutionContext
): Resource[F, BackendApp[F]] =
for {
store <- Store.create(cfg.jdbc, cfg.share.chunkSize, connectEC, true)
store <- Store.create(
cfg.jdbc,
cfg.share.chunkSize,
cfg.computeChecksum,
cfg.files.defaultStoreConfig,
connectEC,
true
)

backend <- create(cfg, store)
_ <-
PeriodicCleanup.resource(cfg.cleanup, cfg.signup, backend.share, backend.signup)
Expand Down
19 changes: 0 additions & 19 deletions modules/backend/src/main/scala/sharry/backend/Config.scala

This file was deleted.

27 changes: 27 additions & 0 deletions modules/backend/src/main/scala/sharry/backend/config/Config.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package sharry.backend.config

import cats.data.ValidatedNec
import cats.syntax.all._

import sharry.backend.auth.AuthConfig
import sharry.backend.job.CleanupConfig
import sharry.backend.mail.MailConfig
import sharry.backend.share.ShareConfig
import sharry.backend.signup.SignupConfig
import sharry.store.{ComputeChecksumConfig, JdbcConfig}

case class Config(
jdbc: JdbcConfig,
signup: SignupConfig,
auth: AuthConfig,
share: ShareConfig,
cleanup: CleanupConfig,
mail: MailConfig,
files: FilesConfig,
computeChecksum: ComputeChecksumConfig
) {

def validate: ValidatedNec[String, Config] =
(files.validate, computeChecksum.validate)
.mapN((fc, cc) => copy(files = fc, computeChecksum = cc))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package sharry.backend.config

import cats.data.{Validated, ValidatedNec}

import sharry.common.Ident

case class CopyFilesConfig(
enable: Boolean,
source: Ident,
target: Ident,
parallel: Int
) {

def validate: ValidatedNec[String, Unit] =
if (source == target) Validated.invalidNec("Source and target must not be the same")
else Validated.validNec(())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package sharry.backend.config

import cats.data.{Validated, ValidatedNec}
import cats.syntax.all._

import sharry.common.Ident
import sharry.store.FileStoreConfig

case class FilesConfig(
defaultStore: Ident,
stores: Map[Ident, FileStoreConfig],
copyFiles: CopyFilesConfig
) {

val enabledStores: Map[Ident, FileStoreConfig] =
stores.view.filter(_._2.enabled).toMap

def defaultStoreConfig: FileStoreConfig =
enabledStores.getOrElse(
defaultStore,
sys.error(s"Store '${defaultStore.id}' not found. Is it enabled?")
)

def validate: ValidatedNec[String, FilesConfig] = {
val storesEmpty =
if (enabledStores.isEmpty)
Validated.invalidNec(
"No file stores defined! Make sure at least one enabled store is present."
)
else Validated.validNec(())

val defaultStorePresent =
enabledStores.get(defaultStore) match {
case Some(_) => Validated.validNec(())
case None =>
Validated.invalidNec(s"Default file store not present: ${defaultStore}")
}

val validCopyStores =
if (!copyFiles.enable) Validated.validNec(())
else {
val exist = enabledStores.contains(copyFiles.source) &&
enabledStores.contains(copyFiles.target)
if (exist) Validated.validNec(())
else
Validated.invalidNec(
s"The source or target name for the copy-files section doesn't exist in the list of enabled file stores."
)
}

(storesEmpty |+| defaultStorePresent |+| validCopyStores |+| copyFiles.validate)
.map(_ => this)
}
}
77 changes: 77 additions & 0 deletions modules/backend/src/main/scala/sharry/backend/files/OFiles.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package sharry.backend.files

import cats.data.OptionT
import cats.effect._
import cats.syntax.all._

import sharry.backend.config.FilesConfig
import sharry.common.Ident
import sharry.store.{FileStoreConfig, Store}

import binny.{AttributeName, CopyTool}

trait OFiles[F[_]] {

def computeBackgroundChecksum: Resource[F, F[Outcome[F, Throwable, Unit]]]

def copyFiles(source: FileStoreConfig, target: FileStoreConfig): F[Int]

def copyFiles(source: Ident, target: Ident): F[Int]
}

object OFiles {

def apply[F[_]: Async](
store: Store[F],
fileConfig: FilesConfig
): OFiles[F] =
new OFiles[F] {
private[this] val logger = sharry.logging.getLogger[F]

def computeBackgroundChecksum: Resource[F, F[Outcome[F, Throwable, Unit]]] =
Async[F].background(
store.fileStore.computeAttributes
.consumeAll(AttributeName.all)
.evalMap(store.fileStore.updateChecksum)
.compile
.drain
)

def copyFiles(source: Ident, target: Ident): F[Int] =
(for {
src <- OptionT.fromOption[F](fileConfig.enabledStores.get(source))
trg <- OptionT.fromOption[F](fileConfig.enabledStores.get(target))
r <- OptionT.liftF(copyFiles(src, trg))
} yield r).getOrElseF(
Sync[F].raiseError(
new IllegalArgumentException(
s"Source or target store not found for keys: ${source.id} and ${target.id}"
)
)
)

def copyFiles(source: FileStoreConfig, target: FileStoreConfig): F[Int] = {
val src = store.fileStore.createBinaryStore(source)
val trg = store.fileStore.createBinaryStore(target)
val binnyLogger = FileStoreConfig.SharryLogger(logger)

logger.info(s"Starting to copy $source -> $target") *>
CopyTool
.copyAll(
binnyLogger,
src,
trg,
store.fileStore.chunkSize,
fileConfig.copyFiles.parallel
)
.flatTap { r =>
logger.info(
s"Copied ${r.success} files, ${r.exist} existed already and ${r.notFound} were not found."
) *> (if (r.failed.nonEmpty)
logger.warn(s"Failed to copy these files: ${r.failed}")
else ().pure[F])
}
.map(_.success)
}
}
}
29 changes: 3 additions & 26 deletions modules/backend/src/main/scala/sharry/backend/share/Queries.scala
Original file line number Diff line number Diff line change
Expand Up @@ -346,39 +346,16 @@ object Queries {
}

def deleteFile[F[_]: Async](store: Store[F])(fileMetaId: Ident) = {
def deleteChunk(fid: Ident, chunk: Int): F[Int] =
store
.transact(
Sql
.deleteFrom(
FileChunkCols.table,
Sql.and(FileChunkCols.fileId.is(fid), FileChunkCols.chunkNr.is(chunk))
)
.update
.run
)

// When deleting large files, doing it in one transaction may blow
// memory. It is not important to be all-or-nothing, so here each
// chunk is deleted in one tx. This is slow, of course, but can be
// moved to a background thread. The cleanup job also detects
// orphaned files and removes them.
def deleteFileData(fid: Ident): F[Unit] =
Stream
.iterate(0)(_ + 1)
.covary[F]
.evalMap(n => deleteChunk(fid, n))
.takeWhile(_ > 0)
.compile
.drain
val deleteFileData =
store.fileStore.delete(fileMetaId)

def deleteFileMeta(fid: Ident): F[Int] =
store.transact(for {
a <- RShareFile.deleteByFileId(fid)
c <- Sql.deleteFrom(RFileMeta.table, RFileMeta.Columns.id.is(fid)).update.run
} yield a + c)

deleteFileData(fileMetaId) *> deleteFileMeta(fileMetaId)
deleteFileData *> deleteFileMeta(fileMetaId)
}

def deleteShare[F[_]: Async](share: Ident, background: Boolean)(
Expand Down
4 changes: 3 additions & 1 deletion modules/common/src/main/scala/sharry/common/Banner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ case class Banner(
gitHash: Option[String],
jdbcUrl: LenientUri,
configFile: Option[String],
baseUrl: LenientUri
baseUrl: LenientUri,
fileStoreConfig: String
) {

private val banner =
Expand All @@ -23,6 +24,7 @@ case class Banner(
s"Base-Url: ${baseUrl.asString}",
s"Database: ${jdbcUrl.asString}",
s"Config: ${configFile.getOrElse("")}",
s"FileRepo: $fileStoreConfig",
""
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
package sharry.common.pureconfig
package sharry.common.config

import java.nio.file.{Path => JPath}

import scala.reflect.ClassTag

import fs2.io.file.Path

import sharry.common._

import _root_.pureconfig._
import _root_.pureconfig.error.{CannotConvert, FailureReason}
import pureconfig._
import pureconfig.configurable.genericMapReader
import pureconfig.error.{CannotConvert, FailureReason}
import scodec.bits.ByteVector

object Implicits {
trait Implicits {
implicit val pathReader: ConfigReader[Path] =
ConfigReader[JPath].map(Path.fromNioPath)

implicit val lenientUriReader: ConfigReader[LenientUri] =
ConfigReader[String].emap(reason(LenientUri.parse))

Expand All @@ -21,6 +29,9 @@ object Implicits {
implicit val identReader: ConfigReader[Ident] =
ConfigReader[String].emap(reason(Ident.fromString))

implicit def identMapReader[B: ConfigReader]: ConfigReader[Map[Ident, B]] =
genericMapReader[Ident, B](reason(Ident.fromString))

implicit val byteVectorReader: ConfigReader[ByteVector] =
ConfigReader[String].emap(reason { str =>
if (str.startsWith("hex:"))
Expand All @@ -33,6 +44,9 @@ object Implicits {
implicit val byteSizeReader: ConfigReader[ByteSize] =
ConfigReader[String].emap(reason(ByteSize.parse))

implicit val signupModeReader: ConfigReader[SignupMode] =
ConfigReader[String].emap(reason(SignupMode.fromString))

def reason[A: ClassTag](
f: String => Either[String, A]
): String => Either[FailureReason, A] =
Expand All @@ -41,3 +55,5 @@ object Implicits {
CannotConvert(in, implicitly[ClassTag[A]].runtimeClass.toString, str)
)
}

object Implicits extends Implicits
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ object ScribeConfigure {
unsafeConfigure(sharryLogger, cfg)
unsafeConfigure(scribe.Logger("org.flywaydb"), cfg)
unsafeConfigure(scribe.Logger("binny"), cfg)
unsafeConfigure(scribe.Logger("org.http4s"), cfg)
// unsafeConfigure(scribe.Logger("org.http4s"), cfg)
}

def getRootMinimumLevel: Level =
Expand Down
Loading

0 comments on commit 219d0d2

Please sign in to comment.