CNAM-445:Create-studies-markdown #245

179 changes: 179 additions & 0 deletions Studies.md
# Studies

A study is created using an [Extractor](https://github.com/X-DataInitiative/SCALPEL-Extraction/blob/master/src/main/scala/fr/polytechnique/cmap/cnam/etl/extractors/Extractor.scala),
a [Transformer](Transformer.md), [Events](Events.md),
a [Configuration](https://github.com/X-DataInitiative/SCALPEL-Extraction/blob/master/src/main/scala/fr/polytechnique/cmap/cnam/study/fall/config/FallConfig.scala)
and a [main](https://github.com/X-DataInitiative/SCALPEL-Extraction/blob/master/src/main/scala/fr/polytechnique/cmap/cnam/study/fall/FallMain.scala)
element that orchestrates the treatment.
To illustrate this process we will use the [Fall](https://github.com/X-DataInitiative/SCALPEL-Extraction/blob/master/src/main/scala/fr/polytechnique/cmap/cnam/study/fall/) study.

The entry point for this study is the [FallMain](https://github.com/X-DataInitiative/SCALPEL-Extraction/blob/master/src/main/scala/fr/polytechnique/cmap/cnam/study/fall/FallMain.scala)
object, which inherits from [Main](https://github.com/X-DataInitiative/SCALPEL-Extraction/blob/master/src/main/scala/fr/polytechnique/cmap/cnam/Main.scala)
and [FractureCodes](https://github.com/X-DataInitiative/SCALPEL-Extraction/blob/master/src/main/scala/fr/polytechnique/cmap/cnam/study/fall/codes/FractureCodes.scala):
the first provides the elements needed to create a Spark session, use it, and destroy it; the second
provides the codes used to identify fractures.
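This mixin structure can be sketched in a self-contained way; the traits below are simplified stand-ins for the real `Main` and `FractureCodes` (member names and codes are illustrative, not the repository's):

```scala
// Simplified stand-ins for the real traits (illustrative only)
trait Main {
  def appName: String
  // the real Main also provides startContext()/stopContext() and main()
}

trait FractureCodes {
  // placeholder codes; the real trait lists the actual medical codes
  val fractureCodes: Set[String] = Set("CODE_A", "CODE_B")
}

// FallMain gets the Spark lifecycle from Main and the codes from FractureCodes
object FallMain extends Main with FractureCodes {
  override def appName: String = "fall study"
}

println(FallMain.appName)            // fall study
println(FallMain.fractureCodes.size) // 2
```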

In `Main` we find the `main` method, which starts and stops the SparkSession,
recovers the arguments used during the study (the path of the configuration file and the
environment in which the treatment is launched), and passes both as parameters to the `run` method.

```
def main(args: Array[String]): Unit = {
  // This method starts the SparkSession
  startContext()
  val sqlCtx = sqlContext
  val argsMap = args.map(
    arg => arg.split("=")(0) -> arg.split("=")(1)
  ).toMap
  try {
    // Pass the SQLContext and the parsed arguments to run
    run(sqlCtx, argsMap)
  }
  finally stopContext()
}
```
Example arguments:

```
"conf"="/src/main/resources/config/fall/default.conf", "env"="test"
```

Once the treatment is running, the first step is to load all the values from the configuration file into a config object.

The `FallConfig` object parametrizes the study, with input, output and other parameters
that control the study's values. A template is provided for the fall study:

```
# Template configuration file for the Fall study. To override the defaults, copy this file to your working
# directory, then uncomment the desired lines and pass the file path to spark-submit

# input.dcir = "src/test/resources/test-input/DCIR.parquet"
# input.mco = "src/test/resources/test-input/MCO.parquet"
# input.mco_ce = "src/test/resources/test-input/MCO_CE.parquet"
# input.ir_ben = "src/test/resources/test-input/IR_BEN_R.parquet"
# input.ir_imb = "src/test/resources/test-input/IR_IMB_R.parquet"
# input.ir_pha = "src/test/resources/test-input/IR_PHA_R_With_molecules.parquet"

# output.root = "target/test/output"
# output.save_mode = "errorIfExists" // Possible values = [overwrite, append, errorIfExists, withTimestamp]. Strategy for saving output data; errorIfExists by default

# exposures.start_delay: 0 months // 0+ (Usually between 0 and 3). Represents the delay in months between a dispensation and its exposure start date.
# exposures.purchases_window: 0 months // 0+ (Usually 0 or 6) Represents the window size in months. Ignored when min_purchases=1.
# exposures.end_threshold_gc: 90 days // If periodStrategy="limited", represents the period without purchases for an exposure to be considered "finished".
# exposures.end_threshold_ngc: 30 days // If periodStrategy="limited", represents the period without purchases for an exposure to be considered "finished".
# exposures.end_delay: 30 days // Number of periods that we add to the exposure end to delay it (lag).

# interactions.level: 3 // Integer representing the maximum number of values of Interaction. Please be careful, as this does not scale well beyond 5 when the data contains a patient with a very high number of exposures

# drugs.level: "Therapeutic" // Options are Therapeutic, Pharmacological, MoleculeCombination
# drugs.families: ["Antihypertenseurs", "Antidepresseurs", "Neuroleptiques", "Hypnotiques"]

# patients.start_gap_in_months: 2 // Filter that removes all patients who have an event within N months after the study start.

# sites.sites: ["BodySites"]

# outcomes.fall_frame: 0 months // Fractures are grouped if they happen at the same site within the fall_frame period (the default value 0 means no grouping)

# run_parameters.outcome: ["Acts", "Diagnoses", "Outcomes"] // pipeline of calculation of outcome, possible values : Acts, Diagnoses, and Outcomes
# run_parameters.exposure: ["Patients", "StartGapPatients", "DrugPurchases", "Exposures"] // pipeline of the calculation of exposure, possible values : Patients, StartGapPatients, DrugPurchases, Exposures

```

From the config object, the study extracts the paths used to load the sources into `Sources` objects
and the parameters used to filter the study's data.
It then runs three methods that yield [OperationMetadata](https://github.com/X-DataInitiative/SCALPEL-Extraction/blob/master/src/main/scala/fr/polytechnique/cmap/cnam/util/reporting/OperationMetadata.scala).
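That orchestration can be sketched without Spark; here `OperationMetadata` is reduced to three illustrative fields, and the method bodies are hypothetical stand-ins (the names and values are taken from the examples later in this document):

```scala
// Spark-free sketch of how run() chains the three methods (simplified, hypothetical signatures)
final case class OperationMetadata(name: String, sources: List[String], outputType: String)

def computeExposures(): List[OperationMetadata] =
  List(OperationMetadata("drug_purchases", List("DCIR"), "dispensations"))

def computeOutcomes(): List[OperationMetadata] =
  List(OperationMetadata("fractures", List("acts"), "outcomes"))

def computeControls(): List[OperationMetadata] =
  List(OperationMetadata("opioids", List("DCIR"), "exposures"))

// run() collects every OperationMetadata produced along the way
val operationsMetadata: List[OperationMetadata] =
  computeExposures() ++ computeOutcomes() ++ computeControls()

println(operationsMetadata.map(_.name).mkString(", "))
// drug_purchases, fractures, opioids
```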

All three methods take the same parameters, `Sources` and `FallConfig`, and extract data from the sources. For example, in
`computeControls`:

```
// computeControls
val opioids = OpioidsExtractor.extract(sources).cache()

object OpioidsExtractor {
  def extract(sources: Sources): Dataset[Event[Drug]] = {
    new DrugExtractor(DrugConfig(MoleculeCombinationLevel, List(Opioids))).extract(sources, Set.empty)
  }
}

// Extractor is inherited here
class DrugExtractor(drugConfig: DrugConfig) extends Extractor[Drug] {

  override def extract(
    sources: Sources,
    codes: Set[String])
    (implicit ctag: universe.TypeTag[Drug]): Dataset[Event[Drug]] = {

    val input: DataFrame = getInput(sources)

    import input.sqlContext.implicits._

    {
      if (drugConfig.families.isEmpty) {
        input.filter(isInExtractorScope _)
      }
      else {
        input.filter(isInExtractorScope _).filter(isInStudy(codes) _)
      }
    }.flatMap(builder _).distinct()
  }
}
```

Some methods also use a `Transformer` to create an `Event` ([Events](Events.md)); for instance, `computeOutcomes`
extracts data from the sources and, if the corresponding condition is true, transforms them into events of type `Event[Outcome]`.

```
if (fallConfig.runParameters.outcomes) {
logger.info("Fractures")
val fractures: Dataset[Event[Outcome]] = new FracturesTransformer(fallConfig)
.transform(optionLiberalActs.get, optionActs.get, optionDiagnoses.get)
operationsMetadata += {
OperationReporter
.report(
"fractures",
List("acts"),
OperationTypes.Outcomes,
fractures.toDF,
Path(fallConfig.output.outputSavePath),
fallConfig.output.saveMode
)
}
}
```


Here, as an example, is the output of `computeExposures`:

```
OperationMetadata(drug_purchases,List(DCIR),dispensations,target/test/output/drug_purchases/data,target/test/output/drug_purchases/patients),
OperationMetadata(extract_patients,List(DCIR, MCO, IR_BEN_R, MCO_CE),patients,target/test/output/extract_patients/data,),
OperationMetadata(filter_patients,List(drug_purchases, extract_patients),patients,target/test/output/filter_patients/data,)
```
The results of these methods, all `OperationMetadata`, are accumulated in a value `operationsMetadata`,
which is then stored as a list, together with other descriptive values
(class name, start timestamp, end timestamp), in the case class `MainMetadata`.

```
case class MainMetadata(
  className: String,
  startTimestamp: java.util.Date,
  endTimestamp: java.util.Date,
  // This parameter stores the list of operationsMetadata
  operations: List[OperationMetadata])
  extends JsonSerializable
```
This is the result of the study; it is saved as JSON, along with other values passed as parameters, by the
`writeMetaData` method of the [OperationReporter](https://github.com/X-DataInitiative/SCALPEL-Extraction/blob/master/src/main/scala/fr/polytechnique/cmap/cnam/util/reporting/OperationReporter.scala)
object.
```
// Write Metadata
val metadata = MainMetadata(this.getClass.getName, startTimestamp, new java.util.Date(), operationsMetadata.toList)
val metadataJson: String = metadata.toJsonString()

OperationReporter
.writeMetaData(metadataJson, "metadata_fall_" + format.format(startTimestamp) + ".json", argsMap("env"))

```