diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/etl/transformers/exposures/ExposuresTransformer.scala b/src/main/scala/fr/polytechnique/cmap/cnam/etl/transformers/exposures/ExposuresTransformer.scala index 7244de89..bb3967dd 100644 --- a/src/main/scala/fr/polytechnique/cmap/cnam/etl/transformers/exposures/ExposuresTransformer.scala +++ b/src/main/scala/fr/polytechnique/cmap/cnam/etl/transformers/exposures/ExposuresTransformer.scala @@ -56,6 +56,7 @@ class ExposuresTransformer(config: ExposuresTransformerConfig) when(col(FollowUpEnd) < col(ExposureEnd), col(FollowUpEnd)).otherwise(col(ExposureEnd)) ).drop(ExposureEnd) // This makes sure that all the exposures end at the followup end date .withColumnRenamed("Correct_Exposure_End", ExposureEnd) + .where(col(ExposureStart) <= col(ExposureEnd)) .aggregateWeight( cumulativeExposureWindow, cumulativeStartThreshold, diff --git a/src/main/scala/fr/polytechnique/cmap/cnam/etl/transformers/exposures/LimitedExposurePeriodAdder.scala b/src/main/scala/fr/polytechnique/cmap/cnam/etl/transformers/exposures/LimitedExposurePeriodAdder.scala index 1bf9c06f..3d99f930 100644 --- a/src/main/scala/fr/polytechnique/cmap/cnam/etl/transformers/exposures/LimitedExposurePeriodAdder.scala +++ b/src/main/scala/fr/polytechnique/cmap/cnam/etl/transformers/exposures/LimitedExposurePeriodAdder.scala @@ -15,29 +15,29 @@ private class LimitedExposurePeriodAdder(data: DataFrame) extends ExposurePeriod private val orderedWindow = window.orderBy(col(Start)) /** * - * This strategy works as the following: - * 1. Each DrugPurchase will have a corresponding Exposure. - * 2. Each Exposure has one or multiple DrugPurchases. - * 3. An Exposure is defined recursively as follows: - * A. The first DrugPurchase defines a new Exposure. - * B. If there is a DrugPurchase within the defined window of the first DrugPurchase, then expand the current - * Exposure with the DrugPurchase. - * C. Else, close and set the end of the Exposure as the reach of the current and create a new Exposure with the - * DrugPurchase as the new Exposure. - * This strategy is suited for short term effects. - * !!! WARNING: THIS ONLY RETURNS EXPOSURES. - * - * @param minPurchases : Not used. - * @param startDelay : Not used. - * @param purchasesWindow : Not used. - * @param endThresholdGc : the period that defines the reach for Grand Condtionnement. - * @param endThresholdNgc : the period that defines the reach for Non Grand Condtionnement. - * @param endDelay : period added to the end of an exposure. - * @return: A DataFrame of Exposures. - */ + * This strategy works as the following: + * 1. Each DrugPurchase will have a corresponding Exposure. + * 2. Each Exposure has one or multiple DrugPurchases. + * 3. An Exposure is defined recursively as follows: + * A. The first DrugPurchase defines a new Exposure. + * B. If there is a DrugPurchase within the defined window of the first DrugPurchase, then expand the current + * Exposure with the DrugPurchase. + * C. Else, close and set the end of the Exposure as the reach of the latest Drug Purchase and create a new + * Exposure with the next DrugPurchase as the new Exposure. + * This strategy is suited for short term effects. + * !!! WARNING: THIS ONLY RETURNS EXPOSURES. + * + * @param minPurchases : Not used. + * @param startDelay : period to be added to delay the start of each DrugPurchase. + * @param purchasesWindow : Not used. + * @param endThresholdGc : the period that defines the reach for Grand Conditionnement. + * @param endThresholdNgc : the period that defines the reach for Non Grand Conditionnement. + * @param endDelay : period added to the end of an exposure. + * @return: A DataFrame of Exposures. + */ def withStartEnd( minPurchases: Int = 2, - startDelay: Period = 3.months, + startDelay: Period = 5.days, purchasesWindow: Period = 4.months, endThresholdGc: Option[Period] = Some(120.days), endThresholdNgc: Option[Period] = Some(40.days), @@ -46,11 +46,24 @@ private class LimitedExposurePeriodAdder(data: DataFrame) extends ExposurePeriod val outputColumns = (data.columns.toList ++ List(ExposureStart, ExposureEnd)).map(col) - val firstLastPurchase = getFirstAndLastPurchase(data, endThresholdGc.get, endThresholdNgc.get, endDelay.get) + val delayedDrugPurchases = delayStart(data, startDelay) + + val firstLastPurchase = getFirstAndLastPurchase( + delayedDrugPurchases, + endThresholdGc.get, + endThresholdNgc.get, + endDelay.get + ) toExposure(firstLastPurchase).select(outputColumns: _*) } + def delayStart(data: DataFrame, startDelay: Period): DataFrame = { + data.withColumn("NewStart", col("start").addPeriod(startDelay)) + .drop(col("start")) + .withColumnRenamed("NewStart", "start") + } + def toExposure(firstLastPurchase: DataFrame): DataFrame = { val condition = (col("Status") === "first" && lead(col("Status"), 1).over(orderedWindow) === "last") diff --git a/src/test/scala/fr/polytechnique/cmap/cnam/etl/transformers/exposures/LimitedExposurePeriodAdderSuite.scala b/src/test/scala/fr/polytechnique/cmap/cnam/etl/transformers/exposures/LimitedExposurePeriodAdderSuite.scala index 3cc5e47e..1284ceff 100644 --- a/src/test/scala/fr/polytechnique/cmap/cnam/etl/transformers/exposures/LimitedExposurePeriodAdderSuite.scala +++ b/src/test/scala/fr/polytechnique/cmap/cnam/etl/transformers/exposures/LimitedExposurePeriodAdderSuite.scala @@ -12,8 +12,8 @@ class LimitedExposurePeriodAdderSuite extends SharedContext { // instance created from a mock DataFrame, to allow testing the InnerImplicits implicit class private val mockInstance = new LimitedExposurePeriodAdder(mock(classOf[DataFrame])) - "getFirstAndLastPurchase" should "return the first and the last purchase of each potential exposure" in { + "delayStart" should "delay the start of events by the given period" in { val sqlCtx = sqlContext import sqlCtx.implicits._ @@ -30,15 +30,51 @@ class LimitedExposurePeriodAdderSuite extends SharedContext { ("A", "S", makeTS(2008, 2, 1), 1.0) ).toDF(PatientID, Value, Start, Weight) + + val expected = Seq( + ("A", "P", makeTS(2008, 1, 11), 1.0), + ("A", "P", makeTS(2008, 2, 11), 1.0), + ("A", "P", makeTS(2008, 5, 11), 1.0), + ("A", "P", makeTS(2008, 6, 11), 1.0), + ("A", "P", makeTS(2008, 7, 11), 1.0), + ("A", "P", makeTS(2009, 1, 11), 1.0), + ("A", "P", makeTS(2009, 7, 11), 1.0), + ("A", "P", makeTS(2009, 8, 11), 1.0), + ("A", "S", makeTS(2008, 2, 11), 1.0) + ).toDF(PatientID, Value, Start, Weight).select(PatientID, Value, Weight, Start) + + val result = mockInstance.delayStart(input, 10.days) + + assertDFs(expected, result) + } + + "getFirstAndLastPurchase" should "return the first and the last purchase of each potential exposure" in { + + val sqlCtx = sqlContext + import sqlCtx.implicits._ + + // Given + val input = Seq( + ("A", "P", makeTS(2008, 1, 1), 2.0), + ("A", "P", makeTS(2008, 2, 1), 1.0), + ("A", "P", makeTS(2008, 5, 1), 1.0), + ("A", "P", makeTS(2008, 6, 1), 1.0), + ("A", "P", makeTS(2008, 7, 1), 1.0), + ("A", "P", makeTS(2009, 1, 1), 1.0), + ("A", "P", makeTS(2009, 7, 1), 1.0), + ("A", "P", makeTS(2009, 8, 1), 1.0), + ("A", "S", makeTS(2008, 2, 1), 2.0) + ).toDF(PatientID, Value, Start, Weight) + val expected = Seq( - ("A", "P", makeTS(2008, 1, 1), 1.0, "first", makeTS(2008, 2, 11)), + ("A", "P", makeTS(2008, 1, 1), 2.0, "first", makeTS(2008, 4, 11)), ("A", "P", makeTS(2008, 2, 1), 1.0, "last", makeTS(2008, 3, 11)), ("A", "P", makeTS(2008, 5, 1), 1.0, "first", makeTS(2008, 6, 11)), ("A", "P", makeTS(2008, 7, 1), 1.0, "last", makeTS(2008, 8, 11)), ("A", "P", makeTS(2009, 1, 1), 1.0, "first", makeTS(2009, 2, 11)), ("A", "P", makeTS(2009, 7, 1), 1.0, "first", makeTS(2009, 8, 11)), ("A", "P", makeTS(2009, 8, 1), 1.0, "last", makeTS(2009, 9, 11)), - ("A", "S", makeTS(2008, 2, 1), 1.0, "first", makeTS(2008, 3, 11)) + ("A", "S", makeTS(2008, 2, 1), 2.0, "first", makeTS(2008, 5, 11)) ).toDF(PatientID, Value, Start, Weight, "Status", "purchaseReach") val result = mockInstance.getFirstAndLastPurchase(input, 1.months, 3.month, 10.days) @@ -93,15 +129,15 @@ class LimitedExposurePeriodAdderSuite extends SharedContext { ).toDF(PatientID, Value, Start, Weight) val expected = Seq( - ("A", "P", makeTS(2008, 1, 1), 1.0, makeTS(2008, 1, 1), makeTS(2008, 3, 11)), - ("A", "P", makeTS(2008, 5, 1), 1.0, makeTS(2008, 5, 1), makeTS(2008, 8, 11)), - ("A", "P", makeTS(2009, 1, 1), 1.0, makeTS(2009, 1, 1), makeTS(2009, 2, 11)), - ("A", "P", makeTS(2009, 7, 1), 1.0, makeTS(2009, 7, 1), makeTS(2009, 9, 11)), - ("A", "S", makeTS(2008, 2, 1), 1.0, makeTS(2008, 2, 1), makeTS(2008, 3, 11)) + ("A", "P", makeTS(2008, 1, 6), 1.0, makeTS(2008, 1, 6), makeTS(2008, 3, 16)), + ("A", "P", makeTS(2008, 5, 6), 1.0, makeTS(2008, 5, 6), makeTS(2008, 8, 16)), + ("A", "P", makeTS(2009, 1, 6), 1.0, makeTS(2009, 1, 6), makeTS(2009, 2, 16)), + ("A", "P", makeTS(2009, 7, 6), 1.0, makeTS(2009, 7, 6), makeTS(2009, 9, 16)), + ("A", "S", makeTS(2008, 2, 6), 1.0, makeTS(2008, 2, 6), makeTS(2008, 3, 16)) ).toDF(PatientID, Value, Start, Weight, ExposureStart, ExposureEnd) val result = new LimitedExposurePeriodAdder(input) - .withStartEnd(0, 0.months, 0.months, Some(1.months), Some(3.months), Some(10.days)) + .withStartEnd(0, 5.days, 0.months, Some(1.months), Some(3.months), Some(10.days)) assertDFs(expected, result) }