From 60708d875e484191333da88b1809ce24987374bf Mon Sep 17 00:00:00 2001 From: Angel Francisco Orta Date: Fri, 13 Mar 2020 19:19:10 +0100 Subject: [PATCH 1/2] CNAM-445:Create-studies-markdown --- Studies.md | 114 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 Studies.md diff --git a/Studies.md b/Studies.md new file mode 100644 index 00000000..c55217f0 --- /dev/null +++ b/Studies.md @@ -0,0 +1,114 @@ +# Studies + +A study is created using [Extractor](https://github.com/X-DataInitiative/SCALPEL-Extraction/blob/master/src/main/scala/fr/polytechnique/cmap/cnam/etl/extractors/Extractor.scala), +[Transformer](Transformer.md), [Events](Events.md), +[Configuration](https://github.com/X-DataInitiative/SCALPEL-Extraction/blob/master/src/main/scala/fr/polytechnique/cmap/cnam/study/fall/config/FallConfig.scala) +and a [main](https://github.com/X-DataInitiative/SCALPEL-Extraction/blob/master/src/main/scala/fr/polytechnique/cmap/cnam/study/fall/FallMain.scala) +element that manages the treatment. +To exemplify this process we will use the [Fall](https://github.com/X-DataInitiative/SCALPEL-Extraction/blob/master/src/main/scala/fr/polytechnique/cmap/cnam/study/fall/)'s study. + +The entry point for this study is the [FallMain](https://github.com/X-DataInitiative/SCALPEL-Extraction/blob/master/src/main/scala/fr/polytechnique/cmap/cnam/study/fall/FallMain.scala) +object inherited from [Main](https://github.com/X-DataInitiative/SCALPEL-Extraction/blob/master/src/main/scala/fr/polytechnique/cmap/cnam/Main.scala) +and [FractureCodes](https://github.com/X-DataInitiative/SCALPEL-Extraction/blob/master/src/main/scala/fr/polytechnique/cmap/cnam/study/fall/codes/FractureCodes.scala), +from the first one you get the necessary elements to create a Spark session, use it and destroy it; from the second one +you get the codes to identify fractures. + +In the `Main` object we find the `main` method that contains the necessary to start and stop SparkSession, +recover the arguments that are going to be +used during the study, these arguments are the path of the configuration file and the +environment in which the treatment is launched and to pass both of them as parameters through `run` method. + +``` + def main(args: Array[String]): Unit = { + //This method starts the SparkSession + startContext() + val sqlCtx = sqlContext + val argsMap = args.map( + arg => arg.split("=")(0) -> arg.split("=")(1) + ).toMap + try { + //This method allows pass as parameters SQLContext and parsed arguments + run(sqlCtx, argsMap) + } + finally stopContext() + } +``` +``` +Arguments + + "conf"="/src/main/resources/config/fall/default.conf","env"="test" +``` + +The first step once treatment is running is load in config object all config values from config file. + +The `FallConfig` object allows to parametrize the study, with input, output and other parameters +to control study's values. There are a template to use with fall study. + +``` +# Template configuration file for the Fall study. To override the defaults, copy this file to your working +# directory, then uncomment the desired lines and pass the file path to spark-submit + +# input.dcir = "src/test/resources/test-input/DCIR.parquet" +# input.mco = "src/test/resources/test-input/MCO.parquet" +# input.mco_ce = "src/test/resources/test-input/MCO_CE.parquet" +# input.ir_ben = "src/test/resources/test-input/IR_BEN_R.parquet" +# input.ir_imb = "src/test/resources/test-input/IR_IMB_R.parquet" +# input.ir_pha = "src/test/resources/test-input/IR_PHA_R_With_molecules.parquet" + +# output.root = "target/test/output" +# output.save_mode = "errorIfExists" // Possible values = [overwrite, append, errorIfExists, withTimestamp] Strategy of saving output data. errorIfExists by deault + +# exposures.start_delay: 0 months // 0+ (Usually between 0 and 3). Represents the delay in months between a dispensation and its exposure start date. +# exposures.purchases_window: 0 months // 0+ (Usually 0 or 6) Represents the window size in months. Ignored when min_purchases=1. +# exposures.end_threshold_gc: 90 days // If periodStrategy="limited", represents the period without purchases for an exposure to be considered "finished". +# exposures.end_threshold_ngc: 30 days // If periodStrategy="limited", represents the period without purchases for an exposure to be considered "finished". +# exposures.end_delay: 30 days // Number of periods that we add to the exposure end to delay it (lag). + +# interactions.level: 3 // Integer representing the maximum number of values of Interaction. Please be careful as this not scale well beyond 5 when the data contains a patient with very high number of exposures + +# drugs.level: "Therapeutic" // Options are Therapeutic, Pharmacological, MoleculeCombination +# drugs.families: ["Antihypertenseurs", "Antidepresseurs", "Neuroleptiques", "Hypnotiques"] + +# patients.start_gap_in_months: 2 // filter Removes all patients who have got an event within N months after the study start. + +# sites.sites: ["BodySites"] + +# outcomes.fall_frame: 0 months // fractures are grouped if they happen in the same site within the period fallFrame, (default value 0 means no group) + +# run_parameters.outcome: ["Acts", "Diagnoses", "Outcomes"] // pipeline of calculation of outcome, possible values : Acts, Diagnoses, and Outcomes +# run_parameters.exposure: ["Patients", "StartGapPatients", "DrugPurchases", "Exposures"] // pipeline of the calculation of exposure, possible values : Patients, StartGapPatients, DrugPurchases, Exposures + +``` + +extracting from config object paths to load sources in sources objects or parameters to filter +study's data. +Later it runs three methods that yield [OperationMetadata](https://github.com/X-DataInitiative/SCALPEL-Extraction/blob/master/src/main/scala/fr/polytechnique/cmap/cnam/util/reporting/OperationMetadata.scala). + +All three methods takes the same parameters `Sources` and `FallConfig`, they are: + +- computeControls +- computeExposures +- computeOutcomes + +Here as example the output of `computeExposures`: + +``` +OperationMetadata(drug_purchases,List(DCIR),dispensations,target/test/output/drug_purchases/data,target/test/output/drug_purchases/patients), +OperationMetadata(extract_patients,List(DCIR, MCO, IR_BEN_R, MCO_CE),patients,target/test/output/extract_patients/data,), +OperationMetadata(filter_patients,List(drug_purchases, extract_patients),patients,target/test/output/filter_patients/data,) +``` +The result of these methods, all `OperationMetadata` are stored in a value `operationsMetadata` +and this values is stored with other descriptive values +(class name,start timestamp,end timestamp,operationsMetadata) in a case class `MainMetadata`. + +``` +case class MainMetadata( + className: String, + startTimestamp: java.util.Date, + endTimestamp: java.util.Date, + operations: List[OperationMetadata]) + extends JsonSerializable +``` + +This is the result of the study, is saved as Json in a environment and path passed as parameters to +`writeMetaData` method. \ No newline at end of file From 16a22882a11e6c712e1ef7b8a4cecdbaa4ba5dce Mon Sep 17 00:00:00 2001 From: Angel Francisco Orta Date: Tue, 24 Mar 2020 10:36:52 +0100 Subject: [PATCH 2/2] CNAM-445:Added some information to complete study's behaviour --- Studies.md | 83 ++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 74 insertions(+), 9 deletions(-) diff --git a/Studies.md b/Studies.md index c55217f0..20d83ce4 100644 --- a/Studies.md +++ b/Studies.md @@ -80,15 +80,70 @@ to control study's values. There are a template to use with fall study. ``` -extracting from config object paths to load sources in sources objects or parameters to filter +Extracting from config object paths to load sources in sources objects or parameters to filter study's data. Later it runs three methods that yield [OperationMetadata](https://github.com/X-DataInitiative/SCALPEL-Extraction/blob/master/src/main/scala/fr/polytechnique/cmap/cnam/util/reporting/OperationMetadata.scala). -All three methods takes the same parameters `Sources` and `FallConfig`, they are: +All three methods takes the same parameters `Sources` and `FallConfig`, these methods extract data from source for example in +`computecontrols`: + +``` + +//computecontrols + val opioids = OpioidsExtractor.extract(sources).cache() + +object OpioidsExtractor { + def extract(sources: Sources): Dataset[Event[Drug]] = { + new DrugExtractor(DrugConfig(MoleculeCombinationLevel, List(Opioids))).extract(sources, Set.empty) + } +} + +//Extractor is inherited here +class DrugExtractor(drugConfig: DrugConfig) extends Extractor[Drug] { + + override def extract( + sources: Sources, + codes: Set[String]) + (implicit ctag: universe.TypeTag[Drug]): Dataset[Event[Drug]] = { + + val input: DataFrame = getInput(sources) + + import input.sqlContext.implicits._ + + { + if (drugConfig.families.isEmpty) { + input.filter(isInExtractorScope _) + } + else { + input.filter(isInExtractorScope _).filter(isInStudy(codes) _) + } + }.flatMap(builder _).distinct() + } + +``` + +and some methods as well use `Transformer` to create `Event`([Events](Events.md)), for instance `computeOutcomes` +extract data from sources and later if condition is true, transform them into `Outcome`'s `Event` type. + +``` + if (fallConfig.runParameters.outcomes) { + logger.info("Fractures") + val fractures: Dataset[Event[Outcome]] = new FracturesTransformer(fallConfig) + .transform(optionLiberalActs.get, optionActs.get, optionDiagnoses.get) + operationsMetadata += { + OperationReporter + .report( + "fractures", + List("acts"), + OperationTypes.Outcomes, + fractures.toDF, + Path(fallConfig.output.outputSavePath), + fallConfig.output.saveMode + ) + } + } +``` -- computeControls -- computeExposures -- computeOutcomes Here as example the output of `computeExposures`: @@ -97,8 +152,8 @@ OperationMetadata(drug_purchases,List(DCIR),dispensations,target/test/output/dru OperationMetadata(extract_patients,List(DCIR, MCO, IR_BEN_R, MCO_CE),patients,target/test/output/extract_patients/data,), OperationMetadata(filter_patients,List(drug_purchases, extract_patients),patients,target/test/output/filter_patients/data,) ``` -The result of these methods, all `OperationMetadata` are stored in a value `operationsMetadata` -and this values is stored with other descriptive values +The result of these methods, all `OperationMetadata`, are stored in a value `operationsMetadata` +and this value is stored as a list with other descriptive values (class name,start timestamp,end timestamp,operationsMetadata) in a case class `MainMetadata`. ``` @@ -106,9 +161,19 @@ case class MainMetadata( className: String, startTimestamp: java.util.Date, endTimestamp: java.util.Date, +//This is the parameter that store the list of operationsMetadata operations: List[OperationMetadata]) extends JsonSerializable ``` +This one is the result of the study, it's saved as Json with other values passed as parameters to +`writeMetaData` method in [OperationReporter](https://github.com/X-DataInitiative/SCALPEL-Extraction/blob/master/src/main/scala/fr/polytechnique/cmap/cnam/util/reporting/OperationReporter.scala) +object. +``` + // Write Metadata + val metadata = MainMetadata(this.getClass.getName, startTimestamp, new java.util.Date(), operationsMetadata.toList) + val metadataJson: String = metadata.toJsonString() + + OperationReporter + .writeMetaData(metadataJson, "metadata_fall_" + format.format(startTimestamp) + ".json", argsMap("env")) -This is the result of the study, is saved as Json in a environment and path passed as parameters to -`writeMetaData` method. \ No newline at end of file +``` \ No newline at end of file