Skip to content

Commit

Permalink
Make pipeline version per Organism (#3534)
Browse files Browse the repository at this point in the history
* Add stubs and TODOs

* WIP (things start up fine it seems; but I think version isn't upgraded correctly yet)

* make more changes

* Update schema documentation based on migration changes

* format

* minor changes

* Add initialization of the table for all organisms

* Update schema documentation based on migration changes

* version cannot be null anymore after previous change

* progress in fixing tests

* Update schema documentation based on migration changes

* remove todos

* 21

* format

* Fixed some tests

* Update schema documentation based on migration changes

* Move table init to flyway init

* Fixed one test

* make mock bean primary -- one failing test remaining

* fixed last test

* adress review

* Add some docs

* Some logic changes (still needs testing)

* Update schema documentation based on migration changes

* docs

* docs

* foo

* Add test

* Add another test

* Add some documentation

* Update values.yaml to test non-identical prepro versions

* Versions must be integers

* improve documentation

* clarify

* Simplify table definitions

* Make test description more descriptive and override linter error for that line

* Add missing and

* fix typo

* Update schema documentation based on migration changes

* left join -> join

* Update schema documentation based on migration changes

* maybe fix?

* Fix constraint issue

* ...

* Disable PSQL reset

* Bump ebola sudan version to 2

* Undo test changes

---------

Co-authored-by: GitHub Action <[email protected]>
Co-authored-by: Cornelius Roemer <[email protected]>
  • Loading branch information
3 people authored Feb 13, 2025
1 parent f582615 commit 7166188
Show file tree
Hide file tree
Showing 18 changed files with 433 additions and 123 deletions.
82 changes: 42 additions & 40 deletions backend/docs/db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ ALTER SEQUENCE public.audit_log_id_seq OWNED BY public.audit_log.id;

CREATE TABLE public.current_processing_pipeline (
version bigint NOT NULL,
started_using_at timestamp without time zone NOT NULL
started_using_at timestamp without time zone NOT NULL,
organism text NOT NULL
);


Expand All @@ -192,6 +193,28 @@ CREATE TABLE public.data_use_terms_table (

ALTER TABLE public.data_use_terms_table OWNER TO postgres;

--
-- Name: sequence_entries; Type: TABLE; Schema: public; Owner: postgres
--

CREATE TABLE public.sequence_entries (
accession text NOT NULL,
version bigint NOT NULL,
organism text NOT NULL,
submission_id text NOT NULL,
submitter text NOT NULL,
approver text,
group_id integer NOT NULL,
submitted_at timestamp without time zone NOT NULL,
released_at timestamp without time zone,
is_revocation boolean DEFAULT false NOT NULL,
original_data jsonb,
version_comment text
);


ALTER TABLE public.sequence_entries OWNER TO postgres;

--
-- Name: sequence_entries_preprocessed_data; Type: TABLE; Schema: public; Owner: postgres
--
Expand Down Expand Up @@ -223,18 +246,18 @@ CREATE VIEW public.external_metadata_view AS
WHEN (all_external_metadata.external_metadata IS NULL) THEN jsonb_build_object('metadata', (cpd.processed_data -> 'metadata'::text))
ELSE jsonb_build_object('metadata', ((cpd.processed_data -> 'metadata'::text) || all_external_metadata.external_metadata))
END AS joint_metadata
FROM (( SELECT sequence_entries_preprocessed_data.accession,
sequence_entries_preprocessed_data.version,
sequence_entries_preprocessed_data.pipeline_version,
sequence_entries_preprocessed_data.processed_data,
sequence_entries_preprocessed_data.errors,
sequence_entries_preprocessed_data.warnings,
sequence_entries_preprocessed_data.processing_status,
sequence_entries_preprocessed_data.started_processing_at,
sequence_entries_preprocessed_data.finished_processing_at
FROM public.sequence_entries_preprocessed_data
WHERE (sequence_entries_preprocessed_data.pipeline_version = ( SELECT current_processing_pipeline.version
FROM public.current_processing_pipeline))) cpd
FROM (( SELECT sepd.accession,
sepd.version,
sepd.pipeline_version,
sepd.processed_data,
sepd.errors,
sepd.warnings,
sepd.processing_status,
sepd.started_processing_at,
sepd.finished_processing_at
FROM ((public.sequence_entries_preprocessed_data sepd
JOIN public.sequence_entries se ON (((sepd.accession = se.accession) AND (sepd.version = se.version))))
JOIN public.current_processing_pipeline cpp ON (((se.organism = cpp.organism) AND (sepd.pipeline_version = cpp.version))))) cpd
LEFT JOIN public.all_external_metadata ON (((all_external_metadata.accession = cpd.accession) AND (all_external_metadata.version = cpd.version))));


Expand Down Expand Up @@ -421,28 +444,6 @@ CREATE TABLE public.seqsets (

ALTER TABLE public.seqsets OWNER TO postgres;

--
-- Name: sequence_entries; Type: TABLE; Schema: public; Owner: postgres
--

CREATE TABLE public.sequence_entries (
accession text NOT NULL,
version bigint NOT NULL,
organism text NOT NULL,
submission_id text NOT NULL,
submitter text NOT NULL,
approver text,
group_id integer NOT NULL,
submitted_at timestamp without time zone NOT NULL,
released_at timestamp without time zone,
is_revocation boolean DEFAULT false NOT NULL,
original_data jsonb,
version_comment text
);


ALTER TABLE public.sequence_entries OWNER TO postgres;

--
-- Name: sequence_entries_view; Type: VIEW; Schema: public; Owner: postgres
--
Expand All @@ -466,7 +467,8 @@ CREATE VIEW public.sequence_entries_view AS
(sepd.processed_data || em.joint_metadata) AS joint_metadata,
CASE
WHEN se.is_revocation THEN ( SELECT current_processing_pipeline.version
FROM public.current_processing_pipeline)
FROM public.current_processing_pipeline
WHERE (current_processing_pipeline.organism = se.organism))
ELSE sepd.pipeline_version
END AS pipeline_version,
sepd.errors,
Expand All @@ -484,9 +486,9 @@ CREATE VIEW public.sequence_entries_view AS
WHEN ((sepd.warnings IS NOT NULL) AND (jsonb_array_length(sepd.warnings) > 0)) THEN 'HAS_WARNINGS'::text
ELSE 'NO_ISSUES'::text
END AS processing_result
FROM ((public.sequence_entries se
LEFT JOIN public.sequence_entries_preprocessed_data sepd ON (((se.accession = sepd.accession) AND (se.version = sepd.version) AND (sepd.pipeline_version = ( SELECT current_processing_pipeline.version
FROM public.current_processing_pipeline)))))
FROM (((public.sequence_entries se
LEFT JOIN public.sequence_entries_preprocessed_data sepd ON (((se.accession = sepd.accession) AND (se.version = sepd.version))))
LEFT JOIN public.current_processing_pipeline ccp ON (((se.organism = ccp.organism) AND (sepd.pipeline_version = ccp.version))))
LEFT JOIN public.external_metadata_view em ON (((se.accession = em.accession) AND (se.version = em.version))));


Expand Down Expand Up @@ -601,7 +603,7 @@ ALTER TABLE ONLY public.audit_log
--

ALTER TABLE ONLY public.current_processing_pipeline
ADD CONSTRAINT current_processing_pipeline_pkey PRIMARY KEY (version);
ADD CONSTRAINT current_processing_pipeline_pkey PRIMARY KEY (organism);


--
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ import org.flywaydb.core.Flyway
import org.jetbrains.exposed.spring.autoconfigure.ExposedAutoConfiguration
import org.jetbrains.exposed.sql.DatabaseConfig
import org.jetbrains.exposed.sql.Slf4jSqlDebugLogger
import org.jetbrains.exposed.sql.transactions.transaction
import org.loculus.backend.controller.LoculusCustomHeaders
import org.loculus.backend.log.REQUEST_ID_HEADER_DESCRIPTION
import org.loculus.backend.service.submission.CurrentProcessingPipelineTable
import org.loculus.backend.utils.DateProvider
import org.springdoc.core.customizers.OperationCustomizer
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.autoconfigure.ImportAutoConfiguration
Expand Down Expand Up @@ -67,13 +70,25 @@ class BackendSpringConfig {

@Bean
@Profile("!test")
fun getFlyway(dataSource: DataSource): Flyway {
fun getFlyway(dataSource: DataSource, backendConfig: BackendConfig, dateProvider: DateProvider): Flyway {
val configuration = Flyway.configure()
.baselineOnMigrate(true)
.dataSource(dataSource)
.validateMigrationNaming(true)
val flyway = Flyway(configuration)
flyway.migrate()

// Since migration V1.10 we need to initialize the CurrentProcessingPipelineTable
// in code, because the configured organisms are not known in the SQL table definitions.
logger.info("Initializing CurrentProcessingPipelineTable")
transaction {
val insertedRows = CurrentProcessingPipelineTable.setV1ForOrganismsIfNotExist(
backendConfig.organisms.keys,
dateProvider.getCurrentDateTime(),
)
logger.info("$insertedRows inserted.")
}

return flyway
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ open class SubmissionController(
@RequestParam pipelineVersion: Long,
@RequestHeader(value = HttpHeaders.IF_NONE_MATCH, required = false) ifNoneMatch: String?,
): ResponseEntity<StreamingResponseBody> {
val currentProcessingPipelineVersion = submissionDatabaseService.getCurrentProcessingPipelineVersion()
val currentProcessingPipelineVersion = submissionDatabaseService.getCurrentProcessingPipelineVersion(organism)
if (pipelineVersion < currentProcessingPipelineVersion) {
throw UnprocessableEntityException(
"The processing pipeline version $pipelineVersion is not accepted " +
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.loculus.backend.service.debug

import org.jetbrains.exposed.sql.deleteAll
import org.jetbrains.exposed.sql.insert
import org.loculus.backend.config.BackendConfig
import org.loculus.backend.service.datauseterms.DataUseTermsTable
import org.loculus.backend.service.submission.CurrentProcessingPipelineTable
import org.loculus.backend.service.submission.MetadataUploadAuxTable
Expand All @@ -13,7 +13,7 @@ import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional

@Component
class DeleteSequenceDataService(private val dateProvider: DateProvider) {
class DeleteSequenceDataService(private val dateProvider: DateProvider, private val config: BackendConfig) {
@Transactional
fun deleteAllSequenceData() {
SequenceEntriesTable.deleteAll()
Expand All @@ -22,9 +22,9 @@ class DeleteSequenceDataService(private val dateProvider: DateProvider) {
SequenceUploadAuxTable.deleteAll()
DataUseTermsTable.deleteAll()
CurrentProcessingPipelineTable.deleteAll()
CurrentProcessingPipelineTable.insert {
it[versionColumn] = 1
it[startedUsingAtColumn] = dateProvider.getCurrentDateTime()
}
CurrentProcessingPipelineTable.setV1ForOrganismsIfNotExist(
config.organisms.keys,
dateProvider.getCurrentDateTime(),
)
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,53 @@
package org.loculus.backend.service.submission

import kotlinx.datetime.LocalDateTime
import org.jetbrains.exposed.sql.Table
import org.jetbrains.exposed.sql.andWhere
import org.jetbrains.exposed.sql.batchInsert
import org.jetbrains.exposed.sql.kotlin.datetime.datetime
import org.jetbrains.exposed.sql.selectAll
import org.jetbrains.exposed.sql.update

const val CURRENT_PROCESSING_PIPELINE_TABLE_NAME = "current_processing_pipeline"

object CurrentProcessingPipelineTable : Table(CURRENT_PROCESSING_PIPELINE_TABLE_NAME) {
val organismColumn = varchar("organism", 255)
val versionColumn = long("version")
val startedUsingAtColumn = datetime("started_using_at")

/**
* Every organism needs to have a current pipeline version in the CurrentProcessingPipelineTable.
* This function sets V1 for all given organisms, if no version is defined yet.
*/
fun setV1ForOrganismsIfNotExist(organisms: Collection<String>, now: LocalDateTime) =
CurrentProcessingPipelineTable.batchInsert(organisms, ignore = true) { organism ->
this[organismColumn] = organism
this[versionColumn] = 1
this[startedUsingAtColumn] = now
}

/**
* Given a version that was found that is potentially newer than the current once, check if the currently stored
* 'current' pipeline version for this organism is less than the one that was found?
* If so, the pipeline needs to 'update' i.e. reprocess older entries.
*/
fun pipelineNeedsUpdate(maybeNewerVersion: Long, organism: String) = CurrentProcessingPipelineTable
.selectAll()
.where { versionColumn less maybeNewerVersion }
.andWhere { organismColumn eq organism }
.empty()
.not()

/**
* Set the pipeline version for the given organism to newVersion.
*/
fun updatePipelineVersion(organism: String, newVersion: Long, startedUsingAt: LocalDateTime) =
CurrentProcessingPipelineTable.update(
where = {
organismColumn eq organism
},
) {
it[versionColumn] = newVersion
it[startedUsingAtColumn] = startedUsingAt
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ object SequenceEntriesTable : Table(SEQUENCE_ENTRIES_TABLE_NAME) {
)
}

fun distinctOrganisms() = SequenceEntriesTable
.select(SequenceEntriesTable.organismColumn)
.withDistinct()
.asSequence()
.map {
it[SequenceEntriesTable.organismColumn]
}

fun accessionVersionIsIn(accessionVersions: List<AccessionVersionInterface>) =
Pair(accessionColumn, versionColumn) inList accessionVersions.toPairs()

Expand Down
Loading

0 comments on commit 7166188

Please sign in to comment.