diff --git a/MIGRATIONS.unreleased.md b/MIGRATIONS.unreleased.md index 2982f22ad23..6f0aa5e7235 100644 --- a/MIGRATIONS.unreleased.md +++ b/MIGRATIONS.unreleased.md @@ -18,3 +18,4 @@ User-facing changes are documented in the [changelog](CHANGELOG.released.md). - Example command for the migration: `PG_PASSWORD=myPassword python main.py --src localhost:7500 --dst localhost:7155 --num_threads 20 --postgres webknossos@localhost:5430/webknossos` ### Postgres Evolutions: +- [126-credit-transactions.sql](conf/evolutions/126-credit-transactions.sql) diff --git a/app/controllers/CreditTransactionController.scala b/app/controllers/CreditTransactionController.scala new file mode 100755 index 00000000000..53aff2859fe --- /dev/null +++ b/app/controllers/CreditTransactionController.scala @@ -0,0 +1,77 @@ +package controllers + +import com.scalableminds.util.objectid.ObjectId +import com.scalableminds.util.time.Instant +import com.scalableminds.util.tools.{Fox, FoxImplicits} +import models.organization.{ + CreditTransaction, + CreditTransactionDAO, + CreditTransactionService, + CreditTransactionState, + OrganizationService +} +import models.user.UserService +import net.liftweb.common.Box.tryo +import play.api.mvc.{Action, AnyContent} +import play.silhouette.api.Silhouette +import security.WkEnv + +import javax.inject.Inject +import scala.concurrent.ExecutionContext + +class CreditTransactionController @Inject()(organizationService: OrganizationService, + creditTransactionService: CreditTransactionService, + creditTransactionDAO: CreditTransactionDAO, + userService: UserService, + sil: Silhouette[WkEnv])(implicit ec: ExecutionContext) + extends Controller + with FoxImplicits { + + def chargeUpCredits(organizationId: String, + creditAmount: Int, + moneySpent: String, + comment: Option[String], + expiresAt: Option[String]): Action[AnyContent] = sil.SecuredAction.async { implicit request => + for { + _ <- userService.assertIsSuperUser(request.identity) ?~> "Only super users can charge up credits" + moneySpentInDecimal <- tryo(BigDecimal(moneySpent)) ?~> s"moneySpent $moneySpent is not a valid decimal" + _ <- bool2Fox(moneySpentInDecimal > 0 || expiresAt.nonEmpty) ?~> "moneySpent must be a positive number" + _ <- bool2Fox(creditAmount > 0) ?~> "creditAmount must be a positive number" + commentNoOptional = comment.getOrElse(s"Charge up for $creditAmount credits for $moneySpent Euro.") + _ <- organizationService.ensureOrganizationHasPaidPlan(organizationId) + expirationDateOpt <- Fox.runOptional(expiresAt)(Instant.fromString) + chargeUpTransaction = CreditTransaction( + ObjectId.generate, + organizationId, + BigDecimal(creditAmount), + BigDecimal(0), // Charge up transactions are not refundable per default and do not need a marker on how much refundable credits are left. + None, + Some(moneySpentInDecimal), + commentNoOptional, + None, + CreditTransactionState.Completed, + expirationDateOpt + ) + _ <- creditTransactionService.doCreditTransaction(chargeUpTransaction) + } yield Ok + } + + def refundCreditTransaction(organizationId: String, transactionId: String): Action[AnyContent] = + sil.SecuredAction.async { implicit request => + for { + _ <- userService.assertIsSuperUser(request.identity) ?~> "Only super users can manually refund credits" + transaction <- creditTransactionDAO.findOne(transactionId) + _ <- bool2Fox(transaction._organization == organizationId) ?~> "Transaction is not for this organization" + _ <- organizationService.ensureOrganizationHasPaidPlan(organizationId) + _ <- creditTransactionDAO.refundTransaction(transaction._id) + } yield Ok + } + + def revokeExpiredCredits(): Action[AnyContent] = sil.SecuredAction.async { implicit request => + for { + _ <- userService.assertIsSuperUser(request.identity) ?~> "Only super users can manually revoke expired credits" + _ <- creditTransactionService.revokeExpiredCredits() + } yield Ok + } + +} diff --git a/app/controllers/JobController.scala b/app/controllers/JobController.scala index 20f1462076e..40e9add5ecf 100644 --- a/app/controllers/JobController.scala +++ b/app/controllers/JobController.scala @@ -2,12 +2,12 @@ package controllers import play.silhouette.api.Silhouette import com.scalableminds.util.geometry.{BoundingBox, Vec3Double, Vec3Int} -import com.scalableminds.util.accesscontext.GlobalAccessContext +import com.scalableminds.util.accesscontext.{DBAccessContext, GlobalAccessContext} import com.scalableminds.util.tools.Fox import models.dataset.{DataStoreDAO, DatasetDAO, DatasetLayerAdditionalAxesDAO, DatasetService} -import models.job._ -import models.organization.OrganizationDAO -import models.user.MultiUserDAO +import models.job.{JobCommand, _} +import models.organization.{CreditTransactionService, OrganizationDAO, OrganizationService} +import models.user.{MultiUserDAO, User} import play.api.i18n.Messages import play.api.libs.json._ import play.api.mvc.{Action, AnyContent, PlayBodyParsers} @@ -24,7 +24,9 @@ import com.scalableminds.webknossos.datastore.models.{LengthUnit, VoxelSize} import com.scalableminds.webknossos.datastore.dataformats.zarr.Zarr3OutputHelper import com.scalableminds.webknossos.datastore.datareaders.{AxisOrder, FullAxisOrder, NDBoundingBox} import com.scalableminds.webknossos.datastore.models.AdditionalCoordinate +import models.job.JobCommand.JobCommand import models.team.PricingPlan +import net.liftweb.common.Full object MovieResolutionSetting extends ExtendedEnumeration { val SD, HD = Value @@ -64,6 +66,8 @@ class JobController @Inject()( wkSilhouetteEnvironment: WkSilhouetteEnvironment, slackNotificationService: SlackNotificationService, organizationDAO: OrganizationDAO, + organizationService: OrganizationService, + creditTransactionService: CreditTransactionService, dataStoreDAO: DataStoreDAO)(implicit ec: ExecutionContext, playBodyParsers: PlayBodyParsers) extends Controller with Zarr3OutputHelper { @@ -85,6 +89,7 @@ class JobController @Inject()( for { _ <- bool2Fox(wkconf.Features.jobsEnabled) ?~> "job.disabled" jobs <- jobDAO.findAll + // TODO: Consider adding paid credits to job public writes. jobsJsonList <- Fox.serialCombined(jobs.sortBy(_.created).reverse)(jobService.publicWrites) } yield Ok(Json.toJson(jobsJsonList)) } @@ -243,9 +248,11 @@ class JobController @Inject()( _ <- datasetService.assertValidDatasetName(newDatasetName) _ <- datasetService.assertValidLayerNameLax(layerName) multiUser <- multiUserDAO.findOne(request.identity._multiUser) - _ <- Fox.runIf(!multiUser.isSuperUser)(jobService.assertBoundingBoxLimits(bbox, None)) annotationIdParsed <- Fox.runIf(doSplitMergerEvaluation)(annotationId.toFox) ?~> "job.inferNeurons.annotationIdEvalParamsMissing" command = JobCommand.infer_neurons + parsedBoundingBox <- jobService.parseBoundingBoxWithMagOpt(bbox, None) + // TODO: Disable this check. Credits should be enough to guard this. + _ <- Fox.runIf(!multiUser.isSuperUser)(jobService.assertBoundingBoxLimits(bbox, None)) commandArgs = Json.obj( "organization_id" -> organization._id, "dataset_name" -> dataset.name, @@ -260,9 +267,14 @@ class JobController @Inject()( "eval_sparse_tube_threshold_nm" -> evalSparseTubeThresholdNm, "eval_min_merger_path_length_nm" -> evalMinMergerPathLengthNm, ) - job <- jobService.submitJob(command, commandArgs, request.identity, dataset._dataStore) ?~> "job.couldNotRunNeuronInferral" - js <- jobService.publicWrites(job) - } yield Ok(js) + creditTransactionComment = s"Run for AI neuron segmentation for dataset ${dataset.name}" + jobAsJs <- runPaidJob(command, + commandArgs, + parsedBoundingBox, + creditTransactionComment, + request.identity, + dataset._dataStore) + } yield Ok(jobAsJs) } } @@ -282,9 +294,11 @@ class JobController @Inject()( _ <- datasetService.assertValidDatasetName(newDatasetName) _ <- datasetService.assertValidLayerNameLax(layerName) multiUser <- multiUserDAO.findOne(request.identity._multiUser) + command = JobCommand.infer_mitochondria + parsedBoundingBox <- jobService.parseBoundingBoxWithMagOpt(bbox, None) + // TODO: Disable this check. Credits should be enough to guard this. _ <- bool2Fox(multiUser.isSuperUser) ?~> "job.inferMitochondria.notAllowed.onlySuperUsers" _ <- Fox.runIf(!multiUser.isSuperUser)(jobService.assertBoundingBoxLimits(bbox, None)) - command = JobCommand.infer_mitochondria commandArgs = Json.obj( "organization_id" -> dataset._organization, "dataset_name" -> dataset.name, @@ -293,9 +307,14 @@ class JobController @Inject()( "layer_name" -> layerName, "bbox" -> bbox, ) - job <- jobService.submitJob(command, commandArgs, request.identity, dataset._dataStore) ?~> "job.couldNotRunInferMitochondria" - js <- jobService.publicWrites(job) - } yield Ok(js) + creditTransactionComment = s"Run for AI mitochondria segmentation for dataset ${dataset.name}" + jobAsJs <- runPaidJob(command, + commandArgs, + parsedBoundingBox, + creditTransactionComment, + request.identity, + dataset._dataStore) + } yield Ok(jobAsJs) } } @@ -314,6 +333,10 @@ class JobController @Inject()( _ <- bool2Fox(request.identity._organization == organization._id) ?~> "job.alignSections.notAllowed.organization" ~> FORBIDDEN _ <- datasetService.assertValidDatasetName(newDatasetName) _ <- datasetService.assertValidLayerNameLax(layerName) + datasetBoundingBox <- datasetService + .dataSourceFor(dataset) + .flatMap(_.toUsable) + .map(_.boundingBox) ?~> "dataset.boundingBox.unset" _ <- Fox.runOptional(annotationId)(ObjectId.fromString) command = JobCommand.align_sections commandArgs = Json.obj( @@ -324,9 +347,14 @@ class JobController @Inject()( "layer_name" -> layerName, "annotation_id" -> annotationId ) - job <- jobService.submitJob(command, commandArgs, request.identity, dataset._dataStore) ?~> "job.couldNotRunAlignSections" - js <- jobService.publicWrites(job) - } yield Ok(js) + creditTransactionComment = s"Run AI neuron segmentation for dataset ${dataset.name}" + jobAsJs <- runPaidJob(command, + commandArgs, + datasetBoundingBox, + creditTransactionComment, + request.identity, + dataset._dataStore) + } yield Ok(jobAsJs) } } @@ -468,10 +496,10 @@ class JobController @Inject()( _ <- bool2Fox(request.identity._organization == organization._id) ?~> "job.renderAnimation.notAllowed.organization" ~> FORBIDDEN userOrganization <- organizationDAO.findOne(request.identity._organization) animationJobOptions = request.body - _ <- Fox.runIf(userOrganization.pricingPlan == PricingPlan.Basic) { + _ <- Fox.runIf(PricingPlan.isPaidPlan(userOrganization.pricingPlan)) { bool2Fox(animationJobOptions.includeWatermark) ?~> "job.renderAnimation.mustIncludeWatermark" } - _ <- Fox.runIf(userOrganization.pricingPlan == PricingPlan.Basic) { + _ <- Fox.runIf(PricingPlan.isPaidPlan(userOrganization.pricingPlan)) { bool2Fox(animationJobOptions.movieResolution == MovieResolutionSetting.SD) ?~> "job.renderAnimation.resolutionMustBeSD" } layerName = animationJobOptions.layerName @@ -511,4 +539,46 @@ class JobController @Inject()( } yield Redirect(uri, Map(("token", Seq(userAuthToken.id)))) } + def getJobCosts(command: String, boundingBoxInMag: String): Action[AnyContent] = + sil.SecuredAction.async { implicit request => + for { + boundingBox <- BoundingBox.fromLiteral(boundingBoxInMag).toFox + jobCommand <- JobCommand.fromString(command).toFox + jobCosts = jobService.calculateJobCosts(boundingBox, jobCommand) + js = Json.obj( + "costsInCredits" -> jobCosts.toString(), + ) + } yield Ok(js) + } + + private def runPaidJob(command: JobCommand, + commandArgs: JsObject, + jobBoundingBox: BoundingBox, + creditTransactionComment: String, + user: User, + datastoreName: String)(implicit ctx: DBAccessContext): Fox[JsObject] = { + val costsInCredits = jobService.calculateJobCosts(jobBoundingBox, command) + for { + _ <- organizationService.ensureOrganizationHasPaidPlan(user._organization) ?~> "job.paidJob.notAllowed.noPaidPlan" + _ <- Fox.assertTrue(creditTransactionService.hasEnoughCredits(user._organization, costsInCredits)) ?~> "job.notEnoughCredits" + creditTransaction <- creditTransactionService.reserveCredits(user._organization, + costsInCredits, + creditTransactionComment, + None) + job <- jobService + .submitJob(command, commandArgs, user, datastoreName) + .futureBox + .flatMap { + case Full(job) => Fox.successful(job) + case _ => + creditTransactionService.refundTransactionWhenStartingJobFailed(creditTransaction) + Fox.failure("job.couldNotRunAlignSections") + + } + .toFox + _ <- creditTransactionService.addJobIdToTransaction(creditTransaction, job._id) + js <- jobService.publicWrites(job) + } yield js + } + } diff --git a/app/controllers/OrganizationController.scala b/app/controllers/OrganizationController.scala index ac987757fd3..203a85d10ed 100755 --- a/app/controllers/OrganizationController.scala +++ b/app/controllers/OrganizationController.scala @@ -88,7 +88,7 @@ class OrganizationController @Inject()( for { allOrgs <- organizationDAO.findAll(GlobalAccessContext) ?~> "organization.list.failed" org <- allOrgs.headOption.toFox ?~> "organization.list.failed" - js <- organizationService.publicWrites(org) + js <- organizationService.publicWrites(org)(GlobalAccessContext) } yield { if (allOrgs.length > 1) // Cannot list organizations publicly if there are multiple ones, due to privacy reasons Ok(JsNull) @@ -252,6 +252,24 @@ class OrganizationController @Inject()( } yield Ok } + def sendOrderCreditsEmail(requestedCredits: Int): Action[AnyContent] = + sil.SecuredAction.async { implicit request => + for { + _ <- bool2Fox(requestedCredits > 0) ?~> Messages("organization.creditOrder.notPositive") + _ <- bool2Fox(request.identity.isOrganizationOwner) ?~> Messages("organization.creditOrder.notAuthorized") + organization <- organizationDAO.findOne(request.identity._organization) ?~> Messages("organization.notFound") ~> NOT_FOUND + userEmail <- userService.emailFor(request.identity) + _ = logger.info( + s"Received credit order for organization ${organization.name} with $requestedCredits credits by user $userEmail") + _ = Mailer ! Send(defaultMails.orderCreditsMail(request.identity, userEmail, requestedCredits)) + _ = Mailer ! Send( + defaultMails.orderCreditsRequestMail(request.identity, + userEmail, + organization.name, + s"Purchase $requestedCredits WEBKNOSSOS credits.")) + } yield Ok + } + def pricingStatus: Action[AnyContent] = sil.SecuredAction.async { implicit request => for { diff --git a/app/controllers/WKRemoteWorkerController.scala b/app/controllers/WKRemoteWorkerController.scala index b9edd6bce67..c96cc3467ba 100644 --- a/app/controllers/WKRemoteWorkerController.scala +++ b/app/controllers/WKRemoteWorkerController.scala @@ -9,6 +9,7 @@ import models.job.JobCommand.JobCommand import javax.inject.Inject import models.job._ +import models.organization.CreditTransactionService import models.voxelytics.VoxelyticsDAO import net.liftweb.common.{Empty, Failure, Full} import play.api.libs.json.Json @@ -20,6 +21,7 @@ import scala.concurrent.ExecutionContext class WKRemoteWorkerController @Inject()(jobDAO: JobDAO, jobService: JobService, workerDAO: WorkerDAO, + creditTransactionService: CreditTransactionService, voxelyticsDAO: VoxelyticsDAO, aiInferenceDAO: AiInferenceDAO, datasetDAO: DatasetDAO, @@ -82,6 +84,12 @@ class WKRemoteWorkerController @Inject()(jobDAO: JobDAO, jobAfterChange <- jobDAO.findOne(jobIdParsed)(GlobalAccessContext) _ = jobService.trackStatusChange(jobBeforeChange, jobAfterChange) _ <- jobService.cleanUpIfFailed(jobAfterChange) + _ <- Fox.runIf(request.body.state == JobState.SUCCESS) { + creditTransactionService.completeTransactionOfJob(jobAfterChange._id)(GlobalAccessContext) + } + _ <- Fox.runIf(request.body.state == JobState.FAILURE || request.body.state == JobState.CANCELLED) { + creditTransactionService.refundTransactionForJob(jobAfterChange._id)(GlobalAccessContext) + } } yield Ok } diff --git a/app/mail/DefaultMails.scala b/app/mail/DefaultMails.scala index 59276c8936f..faf67465153 100755 --- a/app/mail/DefaultMails.scala +++ b/app/mail/DefaultMails.scala @@ -153,6 +153,23 @@ class DefaultMails @Inject()(conf: WkConf) { recipients = List("hello@webknossos.org") ) + def orderCreditsMail(user: User, userEmail: String, requestedCredits: Int): Mail = + Mail( + from = defaultSender, + subject = "Request to order WEBKNOSSOS credits", + bodyHtml = html.mail.orderCredits(user.name, requestedCredits, additionalFooter).body, + recipients = List(userEmail) + ) + + def orderCreditsRequestMail(user: User, userEmail: String, organizationName: String, messageBody: String): Mail = + Mail( + from = defaultSender, + subject = "Request to buy WEBKNOSSOS credits", + bodyHtml = + html.mail.orderCreditsRequest(user.name, userEmail, organizationName, messageBody, additionalFooter).body, + recipients = List("hello@webknossos.org") + ) + def jobSuccessfulGenericMail(user: User, userEmail: String, datasetName: String, diff --git a/app/models/job/JobService.scala b/app/models/job/JobService.scala index 5384ac256f1..b2043af8fc7 100644 --- a/app/models/job/JobService.scala +++ b/app/models/job/JobService.scala @@ -219,11 +219,31 @@ class JobService @Inject()(wkConf: WkConf, def assertBoundingBoxLimits(boundingBox: String, mag: Option[String]): Fox[Unit] = for { - parsedBoundingBox <- BoundingBox.fromLiteral(boundingBox).toFox ?~> "job.invalidBoundingBox" - parsedMag <- Vec3Int.fromMagLiteral(mag.getOrElse("1-1-1"), allowScalar = true) ?~> "job.invalidMag" - boundingBoxInMag = parsedBoundingBox / parsedMag + boundingBoxInMag <- parseBoundingBoxWithMagOpt(boundingBox, mag) _ <- bool2Fox(boundingBoxInMag.volume <= wkConf.Features.exportTiffMaxVolumeMVx * 1024 * 1024) ?~> "job.volumeExceeded" _ <- bool2Fox(boundingBoxInMag.size.maxDim <= wkConf.Features.exportTiffMaxEdgeLengthVx) ?~> "job.edgeLengthExceeded" } yield () + private def getJobCostsPerGVx(jobCommand: JobCommand): BigDecimal = + jobCommand match { + case JobCommand.infer_neurons => wkConf.Features.neuronInferralCostsPerGVx + case JobCommand.infer_mitochondria => wkConf.Features.mitochondriaInferralCostsPerGVx + case JobCommand.align_sections => wkConf.Features.alignmentCostsPerGVx + case _ => throw new IllegalArgumentException(s"Unsupported job command $jobCommand") + } + + def calculateJobCosts(boundingBoxInTargetMag: BoundingBox, jobCommand: JobCommand): BigDecimal = { + val costsPerGVx = getJobCostsPerGVx(jobCommand) + val volumeInGVx = boundingBoxInTargetMag.volume / math.pow(10, 9) + val costs = BigDecimal(volumeInGVx) * costsPerGVx + // TODO: Make the decimal round up after 4 places behind comma. + if (costs < BigDecimal(0.001)) BigDecimal(0.001) else costs + } + + def parseBoundingBoxWithMagOpt(boundingBox: String, mag: Option[String]): Fox[BoundingBox] = + for { + parsedBoundingBox <- BoundingBox.fromLiteral(boundingBox).toFox ?~> "job.invalidBoundingBox" + parsedMag <- Vec3Int.fromMagLiteral(mag.getOrElse("1-1-1"), allowScalar = true) ?~> "job.invalidMag" + } yield parsedBoundingBox / parsedMag + } diff --git a/app/models/organization/CreditTransaction.scala b/app/models/organization/CreditTransaction.scala new file mode 100644 index 00000000000..4c07e2fbbb8 --- /dev/null +++ b/app/models/organization/CreditTransaction.scala @@ -0,0 +1,366 @@ +package models.organization + +import com.scalableminds.util.accesscontext.DBAccessContext +import com.scalableminds.util.objectid.ObjectId +import com.scalableminds.util.time.Instant +import com.scalableminds.util.tools.Fox +import com.scalableminds.webknossos.schema.Tables.OrganizationCreditTransactionsRow +import com.scalableminds.webknossos.schema.Tables.OrganizationCreditTransactions +import models.organization.CreditTransactionState.TransactionState +import slick.dbio.DBIO +import slick.jdbc.GetResult +import slick.jdbc.PostgresProfile.api._ +import slick.lifted.Rep +import telemetry.SlackNotificationService +import utils.WkConf +import utils.sql.{SQLDAO, SqlClient, SqlToken} + +import javax.inject.Inject +import scala.concurrent.ExecutionContext +import scala.util.{Failure, Success} + +case class CreditTransaction(_id: ObjectId = ObjectId.generate, + _organization: String, + creditChange: BigDecimal, + refundableCreditChange: BigDecimal, + refundedTransactionId: Option[ObjectId] = None, + spentMoney: Option[BigDecimal], + comment: String, + _paidJob: Option[ObjectId], + state: TransactionState, + expirationDate: Option[Instant], + createdAt: Instant = Instant.now, + updatedAt: Instant = Instant.now, + isDeleted: Boolean = false) + +class CreditTransactionDAO @Inject()(organizationDAO: OrganizationDAO, + conf: WkConf, + slackNotificationService: SlackNotificationService, + sqlClient: SqlClient)(implicit ec: ExecutionContext) + extends SQLDAO[CreditTransaction, OrganizationCreditTransactionsRow, OrganizationCreditTransactions](sqlClient) { + + protected val collection = OrganizationCreditTransactions + + protected def idColumn(x: OrganizationCreditTransactions): Rep[String] = x._Id + + override protected def isDeletedColumn(x: OrganizationCreditTransactions): Rep[Boolean] = x.isDeleted + + override protected def parse(row: OrganizationCreditTransactionsRow): Fox[CreditTransaction] = + for { + state <- CreditTransactionState.fromString(row.state).toFox + id <- ObjectId.fromString(row._Id) + jobIdOpt <- Fox.runOptional(row._PaidJob)(ObjectId.fromStringSync) + refundedTransactionIdOpt <- Fox.runOptional(row.refundedTransactionId)(ObjectId.fromStringSync) + } yield { + CreditTransaction( + id, + row._Organization, + row.creditChange, + row.refundableCreditChange, + refundedTransactionIdOpt, + row.spentMoney, + row.comment, + jobIdOpt, + state, + row.expirationDate.map(Instant.fromDate), + Instant.fromSql(row.createdAt), + Instant.fromSql(row.updatedAt), + row.isDeleted + ) + } + + implicit val getOrganizationCreditTransactions: GetResult[CreditTransaction] = GetResult { prs => + import prs._ + val transactionId = <<[ObjectId] + val organizationId = <<[String] + val creditChange = <<[BigDecimal] + val refundableCreditChange = <<[BigDecimal] + val refundedTransactionId = <[ObjectId] + val spentMoney = <[BigDecimal] + val comment = <<[String] + val paidJobId = <[ObjectId] + val stateOpt = CreditTransactionState.fromString(<<[String]) + val state = stateOpt.getOrElse(throw new RuntimeException(s"Unknown credit transaction state: $stateOpt")) + val expiresAt = <[Instant] + val createdAt = <<[Instant] + val updatedAt = <<[Instant] + val isDeleted = <<[Boolean] + CreditTransaction( + transactionId, + organizationId, + creditChange, + refundableCreditChange, + refundedTransactionId, + spentMoney, + comment, + paidJobId, + state, + expiresAt, + createdAt, + updatedAt, + isDeleted + ) + } + + override protected def readAccessQ(requestingUserId: ObjectId): SqlToken = + q"""(_id IN (SELECT _organization FROM webknossos.users_ WHERE _multiUser = (SELECT _multiUser FROM webknossos.users_ WHERE _id = $requestingUserId))) + OR TRUE in (SELECT isSuperUser FROM webknossos.multiUsers_ WHERE _id IN (SELECT _multiUser FROM webknossos.users_ WHERE _id = $requestingUserId))""" + + // Any user from an organization can update their credit transactions as for now all users can start paid jobs. + override protected def updateAccessQ(requestingUserId: ObjectId): SqlToken = readAccessQ(requestingUserId) + + override protected def anonymousReadAccessQ(sharingToken: Option[String]): SqlToken = q"FALSE" + + override def findAll(implicit ctx: DBAccessContext): Fox[List[CreditTransaction]] = + for { + accessQuery <- readAccessQuery + r <- run(q"SELECT $columns FROM $existingCollectionName WHERE $accessQuery".as[OrganizationCreditTransactionsRow]) + parsed <- parseAll(r) + } yield parsed + + def findOne(transactionId: String)(implicit ctx: DBAccessContext): Fox[CreditTransaction] = + for { + accessQuery <- readAccessQuery + r <- run( + q"SELECT $columns FROM $existingCollectionName WHERE _id = $transactionId AND $accessQuery" + .as[OrganizationCreditTransactionsRow]) + parsed <- parseFirst(r, transactionId) + } yield parsed + + def getCreditBalance(organizationId: String)(implicit ctx: DBAccessContext): Fox[BigDecimal] = + for { + accessQuery <- readAccessQuery + r <- run( + q"SELECT COALESCE(SUM(credit_change), 0) FROM $existingCollectionName WHERE _organization = $organizationId AND $accessQuery" + .as[BigDecimal]) + firstRow <- r.headOption + } yield firstRow + + def insertNewPendingTransaction(transaction: CreditTransaction)(implicit ctx: DBAccessContext): Fox[Unit] = + for { + _ <- readAccessQuery + _ <- run( + q"""INSERT INTO webknossos.organization_credit_transactions + (_id, _organization, credit_change, refundable_credit_change, spent_money, comment, _paid_job, + state, expiration_date, created_at, updated_at, is_deleted) + VALUES + (${transaction._id}, ${transaction._organization}, ${transaction.creditChange.toString()}::DECIMAL, + ${transaction.refundableCreditChange.toString()}::DECIMAL, ${transaction.spentMoney.map(_.toString)}::DECIMAL, + ${transaction.comment}, ${transaction._paidJob}, ${CreditTransactionState.Pending}, ${transaction.expirationDate}, + ${transaction.createdAt}, ${transaction.updatedAt}, ${transaction.isDeleted}) + """.asUpdate + ) + } yield () + + def addJobIdToTransaction(transaction: CreditTransaction, jobId: ObjectId)(implicit ctx: DBAccessContext): Fox[Unit] = + for { + _ <- assertUpdateAccess(transaction._id) + _ <- run( + q"""UPDATE webknossos.organization_credit_transactions + SET _paid_job = $jobId, updated_at = NOW() + WHERE _id = ${transaction._id} + """.asUpdate + ) + } yield () + + def insertTransaction(transaction: CreditTransaction)(implicit ctx: DBAccessContext): Fox[Unit] = + for { + _ <- readAccessQuery + _ <- run(q"""INSERT INTO webknossos.organization_credit_transactions + (_id, _organization, credit_change, refundable_credit_change, refunded_transaction_id, spent_money, comment, _paid_job, + state, expiration_date, created_at, updated_at, is_deleted) + VALUES + (${transaction._id}, ${transaction._organization}, ${transaction.creditChange.toString()}::DECIMAL, + ${transaction.refundableCreditChange.toString()}::DECIMAL, ${transaction.refundedTransactionId}, + ${transaction.spentMoney.map(_.toString)}::DECIMAL, ${transaction.comment}, ${transaction._paidJob}, + ${transaction.state}, ${transaction.expirationDate}, ${transaction.createdAt}, ${transaction.updatedAt}, + ${transaction.isDeleted}) + """.asUpdate) + + } yield () + + private def insertRevokingTransaction(transaction: CreditTransaction): DBIOAction[Int, NoStream, Effect] = { + assert(transaction.state == CreditTransactionState.Completed) + assert(transaction.refundableCreditChange == 0) + assert(transaction.spentMoney.isEmpty) + assert(transaction.creditChange < 0, "Revoking transactions must have a negative credit change") + assert(transaction.expirationDate.isEmpty) + q"""INSERT INTO webknossos.organization_credit_transactions + (_id, _organization, credit_change, refundable_credit_change, spent_money, comment, _paid_job, + state, expiration_date, created_at, updated_at, is_deleted) + VALUES + (${transaction._id}, ${transaction._organization}, ${transaction.creditChange.toString()}::DECIMAL, + ${transaction.refundableCreditChange.toString()}::DECIMAL, ${transaction.spentMoney.map(_.toString)}::DECIMAL, + ${transaction.comment}, ${transaction._paidJob}, ${transaction.state}, ${transaction.expirationDate}, + ${transaction.createdAt}, ${transaction.updatedAt}, ${transaction.isDeleted}) + """.asUpdate + } + + def commitTransaction(transactionId: ObjectId)(implicit ctx: DBAccessContext): Fox[Unit] = + for { + _ <- assertUpdateAccess(transactionId) + _ <- run( + q"""UPDATE webknossos.organization_credit_transactions + SET state = ${CreditTransactionState.Completed}, refundable_credit_change = 0::DECIMAL, updated_at = NOW() + WHERE _id = $transactionId AND state + """.asUpdate + ) + } yield () + + def refundTransaction(transactionId: ObjectId)(implicit ctx: DBAccessContext): Fox[Unit] = + for { + _ <- assertUpdateAccess(transactionId) + transactionToRefund <- findOne(transactionId) + refundComment = transactionToRefund._paidJob + .map(jobId => s"Refund for failed job $jobId.") + .getOrElse(s"Refund for transaction $transactionId.") + setToRefunded = q"""UPDATE webknossos.organization_credit_transactions + SET state = ${CreditTransactionState.Refunded}, updated_at = NOW() + WHERE _id = $transactionId AND state = ${CreditTransactionState.Pending} + AND credit_change < 0 + """.asUpdate + insertRefundTransaction = q""" + INSERT INTO webknossos.organization_credit_transactions + (_id, _organization, credit_change, refundable_credit_change, refunded_transaction_id, comment, _paid_job, state) + VALUES ( + ${ObjectId.generate}, + ${transactionToRefund._organization}, + ( + SELECT refundable_credit_change * -1 + FROM webknossos.organization_credit_transactions + WHERE _id = $transactionId + AND state = ${CreditTransactionState.Refunded} + AND credit_change < 0 + ), + 0::DECIMAL, + $transactionId, + $refundComment, + ${transactionToRefund._paidJob}, + ${CreditTransactionState.Completed} + ) + """.asUpdate + updatedRows <- run(DBIO.sequence(List(setToRefunded, insertRefundTransaction)).transactionally) + _ <- bool2Fox(updatedRows.forall(_ == 1)) ?~> s"Failed to refund transaction ${transactionToRefund._id} properly." + } yield () + + def findTransactionForJob(jobId: ObjectId)(implicit ctx: DBAccessContext): Fox[CreditTransaction] = + for { + accessQuery <- readAccessQuery + r <- run( + q"SELECT $columns FROM $existingCollectionName WHERE _paid_job = $jobId AND $accessQuery" + .as[OrganizationCreditTransactionsRow]) + parsed <- parseFirst(r, jobId) + } yield parsed + +// TODO: Maybe remove this outer while loop here. + private def findAllOrganizationsWithExpiredCredits: Fox[List[String]] = + for { + r <- run(q"""SELECT DISTINCT _organization + FROM webknossos.organization_credit_transactions + WHERE expiration_date <= NOW() + AND state = ${CreditTransactionState.Completed} + AND credit_change > 0""".as[String]) + } yield r.toList + + private def revokeExpiredCreditsForOrganization(organizationId: String): DBIO[List[CreditTransaction]] = { + // TODO: Make this a transaction to have either all credits revoked of the organization or none + logger.info(s"revokeExpiredCreditsForOrganization for organization $organizationId") + + for { + transactionsWithExpiredCredits <- q"""SELECT * + FROM webknossos.organization_credit_transactions + WHERE _organization = $organizationId + AND expiration_date <= NOW() + AND state = 'Completed' + AND credit_change > 0 + ORDER BY created_at DESC + """.as[CreditTransaction] + transactionsWhereRevokingFailed <- transactionsWithExpiredCredits.foldLeft( + DBIO.successful(List()): DBIO[List[CreditTransaction]]) { (previousTransactionRevocation, transaction) => + previousTransactionRevocation.flatMap { transactionsWhereRevokingFailed => + revokeExpiredCreditsTransaction(transaction).asTry.flatMap { + case Success(_) => DBIO.successful(transactionsWhereRevokingFailed) + case Failure(e) => + logger.error(s"Failed to revoke some expired credits for organization ${transaction._organization}", e) + DBIO.successful(transactionsWhereRevokingFailed :+ transaction) + case _ => DBIO.successful(transactionsWhereRevokingFailed) + } + } + } + } yield transactionsWhereRevokingFailed + } + + private def revokeExpiredCreditsTransaction(transaction: CreditTransaction): DBIO[Unit] = { + logger.info(s"revokeExpiredCreditsTransaction for transaction ${transaction._id}") + for { + // Query: Sums up all spent credits since the transaction which are completed and subtracts refunded transactions. + spentCreditsSinceTransactionNegResult <- q""" + SELECT COALESCE(SUM(credit_change), 0) + FROM webknossos.organization_credit_transactions + WHERE _organization = ${transaction._organization} + AND created_at > ${transaction.createdAt} + AND (credit_change < 0) + OR (credit_change > 0 AND refunded_transaction_id IS NOT NULL) -- Counts also revoked transactions + OR (credit_change > 0 AND expiration_date <= NOW()) -- Counts also expired transactions + """.as[BigDecimal] + + // Extract values from query results + spentCreditsSinceTransactionNegative = spentCreditsSinceTransactionNegResult.headOption.getOrElse(BigDecimal(0)) + spentCreditsSinceTransactionPositive = spentCreditsSinceTransactionNegative * -1 + freeCreditsAvailable = transaction.creditChange - spentCreditsSinceTransactionPositive + + _ <- if (freeCreditsAvailable <= 0) { + // Fully spent, update state to 'Spent' + q""" + UPDATE webknossos.organization_credit_transactions + SET state = ${CreditTransactionState.Spent}, updated_at = NOW() + WHERE _id = ${transaction._id} + """.asUpdate + } else { + val stateOfExpiredTransaction = if (freeCreditsAvailable == transaction.creditChange) { + CreditTransactionState.Revoked + } else { CreditTransactionState.PartiallyRevoked } + val revokingTransaction = CreditTransaction( + ObjectId.generate, + transaction._organization, + -freeCreditsAvailable, + 0, + None, + None, + s"Revoked expired credits for transaction ${transaction._id}", + None, + CreditTransactionState.Completed, + None, + ) + for { + _ <- q""" + UPDATE webknossos.organization_credit_transactions + SET state = $stateOfExpiredTransaction, updated_at = NOW() + WHERE _id = ${transaction._id} + """.asUpdate + _ <- insertRevokingTransaction(revokingTransaction) + } yield () + } + _ = logger.info(s"revokeExpiredCreditsTransaction for transaction ${transaction._id} finished") + } yield () + } + + def runRevokeExpiredCredits(): Fox[Unit] = + for { + orgas <- findAllOrganizationsWithExpiredCredits + failedTransactionsToRevoke <- Fox.foldLeft(orgas.iterator, List(): List[CreditTransaction]) { + case (failedTransactions, organizationId) => + run(revokeExpiredCreditsForOrganization(organizationId).transactionally).map(failedTransactions ++ _) + } + _ = if (failedTransactionsToRevoke.nonEmpty) { + val failedTransactions = failedTransactionsToRevoke.map(transaction => + s"Failed to revoke credits for transaction ${transaction._id} for organization ${transaction._organization}.") + slackNotificationService.warn("Failed to revoke some expired credits for organizations", + s"${failedTransactions.mkString("\n")}") + } + } yield () + + def handOutMonthlyFreeCredits(): Fox[Unit] = + run(q"SELECT webknossos.hand_out_monthly_free_credits(${conf.Jobs.monthlyFreeCredits}::DECIMAL)".as[Unit]).map(_ => + ()) +} diff --git a/app/models/organization/CreditTransactionService.scala b/app/models/organization/CreditTransactionService.scala new file mode 100644 index 00000000000..7c9da1578e3 --- /dev/null +++ b/app/models/organization/CreditTransactionService.scala @@ -0,0 +1,113 @@ +package models.organization + +import com.scalableminds.util.accesscontext.DBAccessContext +import com.scalableminds.util.objectid.ObjectId +import com.scalableminds.util.tools.{Fox, FoxImplicits} +import com.scalableminds.webknossos.datastore.helpers.IntervalScheduler +import com.typesafe.scalalogging.LazyLogging +import org.apache.pekko.actor.ActorSystem +import play.api.inject.ApplicationLifecycle +import play.api.libs.json.{JsObject, Json} + +import javax.inject.Inject +import scala.concurrent.ExecutionContext +import scala.concurrent.duration.{DurationInt, FiniteDuration} + +class CreditTransactionService @Inject()(creditTransactionDAO: CreditTransactionDAO, + organizationService: OrganizationService, + val lifecycle: ApplicationLifecycle, + val system: ActorSystem)(implicit val ec: ExecutionContext) + extends FoxImplicits + with LazyLogging + with IntervalScheduler { + + def hasEnoughCredits(organizationId: String, creditsToSpent: BigDecimal)( + implicit ctx: DBAccessContext): Fox[Boolean] = + creditTransactionDAO.getCreditBalance(organizationId).map(balance => balance >= creditsToSpent) + + def reserveCredits(organizationId: String, creditsToSpent: BigDecimal, comment: String, paidJob: Option[ObjectId])( + implicit ctx: DBAccessContext): Fox[CreditTransaction] = + for { + _ <- organizationService.ensureOrganizationHasPaidPlan(organizationId) + pendingCreditTransaction = CreditTransaction(ObjectId.generate, + organizationId, + -creditsToSpent, + creditsToSpent, + None, + None, + comment, + paidJob, + CreditTransactionState.Pending, + None) + _ <- creditTransactionDAO.insertNewPendingTransaction(pendingCreditTransaction) + insertedTransaction <- creditTransactionDAO.findOne(pendingCreditTransaction._id) + } yield insertedTransaction + + def doCreditTransaction(creditTransaction: CreditTransaction)(implicit ctx: DBAccessContext): Fox[Unit] = + for { + _ <- organizationService.ensureOrganizationHasPaidPlan(creditTransaction._organization) + _ <- creditTransactionDAO.insertTransaction(creditTransaction) + } yield () + + def completeTransactionOfJob(jobId: ObjectId)(implicit ctx: DBAccessContext): Fox[Unit] = + for { + transaction <- creditTransactionDAO.findTransactionForJob(jobId) + _ <- organizationService.ensureOrganizationHasPaidPlan(transaction._organization) + _ <- creditTransactionDAO.commitTransaction(transaction._id) + } yield () + + def refundTransactionForJob(jobId: ObjectId)(implicit ctx: DBAccessContext): Fox[Unit] = + for { + transaction <- creditTransactionDAO.findTransactionForJob(jobId) + _ <- refundTransaction(transaction) + } yield () + + private def refundTransaction(creditTransaction: CreditTransaction)(implicit ctx: DBAccessContext): Fox[Unit] = + for { + _ <- organizationService.ensureOrganizationHasPaidPlan(creditTransaction._organization) + _ <- creditTransactionDAO.refundTransaction(creditTransaction._id) + } yield () + + // This method is explicitly named this way to warn that this method should only be called when starting a job has failed. + // Else refunding should be done via jobId. + def refundTransactionWhenStartingJobFailed(creditTransaction: CreditTransaction)( + implicit ctx: DBAccessContext): Fox[Unit] = refundTransaction(creditTransaction) + + def addJobIdToTransaction(creditTransaction: CreditTransaction, jobId: ObjectId)( + implicit ctx: DBAccessContext): Fox[Unit] = + creditTransactionDAO.addJobIdToTransaction(creditTransaction, jobId) + + def publicWrites(transaction: CreditTransaction): Fox[JsObject] = + Fox.successful( + Json.obj( + "id" -> transaction._id, + "organization_id" -> transaction._organization, + "creditChange" -> transaction.creditChange, + "spentMoney" -> transaction.spentMoney, + "comment" -> transaction.comment, + "paidJobId" -> transaction._paidJob, + "state" -> transaction.state, + "expirationDate" -> transaction.expirationDate, + "createdAt" -> transaction.createdAt, + "updatedAt" -> transaction.updatedAt, + "isDeleted" -> transaction.isDeleted + )) + + def revokeExpiredCredits(): Fox[Unit] = creditTransactionDAO.runRevokeExpiredCredits() + def handOutMonthlyFreeCredits(): Fox[Unit] = creditTransactionDAO.handOutMonthlyFreeCredits() + + override protected def tickerInterval: FiniteDuration = 1 minute + + // TODO: make this class a singleton or put this somewhere else as this is executed for each instance of the service + override protected def tick(): Unit = + for { + _ <- Fox.successful(()) + _ = logger.info("Starting revoking expired credits...") + _ <- revokeExpiredCredits() + _ = logger.info("Finished revoking expired credits.") + _ = logger.info("Staring handing out free monthly credits.") + _ <- handOutMonthlyFreeCredits() + _ = logger.info("Finished handing out free monthly credits.") + } yield () + () +} diff --git a/app/models/organization/CreditTransactionState.scala b/app/models/organization/CreditTransactionState.scala new file mode 100644 index 00000000000..0ba3d282c89 --- /dev/null +++ b/app/models/organization/CreditTransactionState.scala @@ -0,0 +1,8 @@ +package models.organization + +import com.scalableminds.util.enumeration.ExtendedEnumeration + +object CreditTransactionState extends ExtendedEnumeration { + type TransactionState = Value + val Pending, Completed, Refunded, Revoked, PartiallyRevoked, Spent = Value +} diff --git a/app/models/organization/OrganizationService.scala b/app/models/organization/OrganizationService.scala index 4f26e7b87b3..e91baf12bf2 100644 --- a/app/models/organization/OrganizationService.scala +++ b/app/models/organization/OrganizationService.scala @@ -22,6 +22,7 @@ class OrganizationService @Inject()(organizationDAO: OrganizationDAO, multiUserDAO: MultiUserDAO, userDAO: UserDAO, teamDAO: TeamDAO, + creditTransactionDAO: CreditTransactionDAO, dataStoreDAO: DataStoreDAO, folderDAO: FolderDAO, folderService: FolderService, @@ -37,7 +38,8 @@ class OrganizationService @Inject()(organizationDAO: OrganizationDAO, "name" -> organization.name ) - def publicWrites(organization: Organization, requestingUser: Option[User] = None): Fox[JsObject] = { + def publicWrites(organization: Organization, requestingUser: Option[User] = None)( + implicit ctx: DBAccessContext): Fox[JsObject] = { val adminOnlyInfo = if (requestingUser.exists(_.isAdminOf(organization._id))) { Json.obj( @@ -49,6 +51,7 @@ class OrganizationService @Inject()(organizationDAO: OrganizationDAO, for { usedStorageBytes <- organizationDAO.getUsedStorage(organization._id) ownerBox <- userDAO.findOwnerByOrg(organization._id).futureBox + creditBalance <- creditTransactionDAO.getCreditBalance(organization._id) ownerNameOpt = ownerBox.toOption.map(o => s"${o.firstName} ${o.lastName}") } yield Json.obj( @@ -62,7 +65,8 @@ class OrganizationService @Inject()(organizationDAO: OrganizationDAO, "includedUsers" -> organization.includedUsers, "includedStorageBytes" -> organization.includedStorageBytes, "usedStorageBytes" -> usedStorageBytes, - "ownerName" -> ownerNameOpt + "ownerName" -> ownerNameOpt, + "creditBalance" -> creditBalance ) ++ adminOnlyInfo } @@ -176,4 +180,9 @@ class OrganizationService @Inject()(organizationDAO: OrganizationDAO, _ <- organizationDAO.acceptTermsOfService(organizationId, version, Instant.now) } yield () + def ensureOrganizationHasPaidPlan(organizationId: String): Fox[Unit] = + for { + organization <- organizationDAO.findOne(organizationId)(GlobalAccessContext) + _ <- bool2Fox(PricingPlan.isPaidPlan(organization.pricingPlan)) ?~> "creditTransaction.notPaidPlan" + } yield () } diff --git a/app/models/team/PricingPlan.scala b/app/models/team/PricingPlan.scala index e3d9d150f7e..4d066cf48ce 100644 --- a/app/models/team/PricingPlan.scala +++ b/app/models/team/PricingPlan.scala @@ -5,4 +5,6 @@ import com.scalableminds.util.enumeration.ExtendedEnumeration object PricingPlan extends ExtendedEnumeration { type PricingPlan = Value val Basic, Team, Power, Team_Trial, Power_Trial, Custom = Value + + def isPaidPlan(plan: PricingPlan): Boolean = plan != Basic } diff --git a/app/utils/WkConf.scala b/app/utils/WkConf.scala index c36c9abf5ed..09cff962bce 100644 --- a/app/utils/WkConf.scala +++ b/app/utils/WkConf.scala @@ -119,6 +119,10 @@ class WkConf @Inject()(configuration: Configuration, certificateValidationServic val isWkorgInstance: Boolean = get[Boolean]("features.isWkorgInstance") val jobsEnabled: Boolean = get[Boolean]("features.jobsEnabled") val voxelyticsEnabled: Boolean = get[Boolean]("features.voxelyticsEnabled") + val neuronInferralCostsPerGVx: BigDecimal = BigDecimal(get[String]("features.neuronInferralCostsPerGVx")) + val mitochondriaInferralCostsPerGVx: BigDecimal = BigDecimal( + get[String]("features.mitochondriaInferralCostsPerGVx")) + val alignmentCostsPerGVx: BigDecimal = BigDecimal(get[String]("features.alignmentCostsPerGVx")) val taskReopenAllowed: FiniteDuration = get[Int]("features.taskReopenAllowedInSeconds") seconds val allowDeleteDatasets: Boolean = get[Boolean]("features.allowDeleteDatasets") val publicDemoDatasetUrl: String = get[String]("features.publicDemoDatasetUrl") @@ -200,6 +204,7 @@ class WkConf @Inject()(configuration: Configuration, certificateValidationServic object Jobs { val workerLivenessTimeout: FiniteDuration = get[FiniteDuration]("jobs.workerLivenessTimeout") + val monthlyFreeCredits: Int = get[Int]("jobs.monthlyFreeCredits") } object Airbrake { diff --git a/app/utils/sql/SqlTypeImplicits.scala b/app/utils/sql/SqlTypeImplicits.scala index a77ec936d7b..51725091e78 100644 --- a/app/utils/sql/SqlTypeImplicits.scala +++ b/app/utils/sql/SqlTypeImplicits.scala @@ -16,6 +16,10 @@ trait SqlTypeImplicits { override def apply(v1: PositionedResult): ObjectId = ObjectId(v1.<<) } + implicit protected object GetObjectIdOpt extends GetResult[Option[ObjectId]] { + override def apply(v1: PositionedResult): Option[ObjectId] = v1.nextStringOption().map(ObjectId(_)) + } + implicit protected object GetInstant extends GetResult[Instant] { override def apply(v1: PositionedResult): Instant = Instant.fromSql(v1.<<) } diff --git a/app/views/mail/orderCredits.scala.html b/app/views/mail/orderCredits.scala.html new file mode 100644 index 00000000000..fb0b998ba67 --- /dev/null +++ b/app/views/mail/orderCredits.scala.html @@ -0,0 +1,10 @@ +@(name: String, requestedCredits: Int, additionalFooter: String) + +@emailBaseTemplate(additionalFooter) { +
Hi @{name}
+Thank you for requesting to buy more WEBKNOSSOS credits. Our sales team will be in contact with you shortly with a formal offer.
+ +Requested additional credits: @{requestedCredits}
+ +With best regards,
the WEBKNOSSOS team
Hi WEBKNOSSOS sales team
+There is a new request to purchase WEBKNOSSOS credits.
+User: @{name} (@{email})
+Organization: @{organizationName}
+Request: @{messageBody}
+ +With best regards,
WEBKNOSSOS
+ {/* React complaint here:
. */}
+
{powerPlanFeatures.map((feature) => (
-