Skip to content

Commit

Permalink
Merge pull request #200 from X-DataInitiative/CNAM-410-Start-Delay-Li…
Browse files Browse the repository at this point in the history
…mitedExposure

CNAM-410: Add start delay for LimitedExposurePeriodAdder strategy
  • Loading branch information
Youcef Sebiat authored Sep 25, 2019
2 parents 9a839f0 + 0e8216c commit 151d597
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class ExposuresTransformer(config: ExposuresTransformerConfig)
when(col(FollowUpEnd) < col(ExposureEnd), col(FollowUpEnd)).otherwise(col(ExposureEnd))
).drop(ExposureEnd) // This makes sure that all the exposures end at the followup end date
.withColumnRenamed("Correct_Exposure_End", ExposureEnd)
.where(col(ExposureStart) <= col(ExposureEnd))
.aggregateWeight(
cumulativeExposureWindow,
cumulativeStartThreshold,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,29 @@ private class LimitedExposurePeriodAdder(data: DataFrame) extends ExposurePeriod
private val orderedWindow = window.orderBy(col(Start))

/** *
* This strategy works as the following:
* 1. Each DrugPurchase will have a corresponding Exposure.
* 2. Each Exposure has one or multiple DrugPurchases.
* 3. An Exposure is defined recursively as follows:
* A. The first DrugPurchase defines a new Exposure.
* B. If there is a DrugPurchase within the defined window of the first DrugPurchase, then expand the current
* Exposure with the DrugPurchase.
* C. Else, close and set the end of the Exposure as the reach of the current and create a new Exposure with the
* DrugPurchase as the new Exposure.
* This strategy is suited for short term effects.
* !!! WARNING: THIS ONLY RETURNS EXPOSURES.
*
* @param minPurchases : Not used.
* @param startDelay : Not used.
* @param purchasesWindow : Not used.
* @param endThresholdGc : the period that defines the reach for Grand Condtionnement.
* @param endThresholdNgc : the period that defines the reach for Non Grand Condtionnement.
* @param endDelay : period added to the end of an exposure.
* @return: A DataFrame of Exposures.
*/
* This strategy works as the following:
* 1. Each DrugPurchase will have a corresponding Exposure.
* 2. Each Exposure has one or multiple DrugPurchases.
* 3. An Exposure is defined recursively as follows:
* A. The first DrugPurchase defines a new Exposure.
* B. If there is a DrugPurchase within the defined window of the first DrugPurchase, then expand the current
* Exposure with the DrugPurchase.
* C. Else, close and set the end of the Exposure as the reach of the latest Drug Purchase and create a new
* Exposure with the next DrugPurchase as the new Exposure.
* This strategy is suited for short term effects.
* !!! WARNING: THIS ONLY RETURNS EXPOSURES.
*
* @param minPurchases : Not used.
* @param startDelay : period to be added to delay the start of each DrugPurchase.
* @param purchasesWindow : Not used.
* @param endThresholdGc : the period that defines the reach for Grand Conditionnement.
* @param endThresholdNgc : the period that defines the reach for Non Grand Conditionnement.
* @param endDelay : period added to the end of an exposure.
* @return: A DataFrame of Exposures.
*/
def withStartEnd(
minPurchases: Int = 2,
startDelay: Period = 3.months,
startDelay: Period = 5.days,
purchasesWindow: Period = 4.months,
endThresholdGc: Option[Period] = Some(120.days),
endThresholdNgc: Option[Period] = Some(40.days),
Expand All @@ -46,11 +46,24 @@ private class LimitedExposurePeriodAdder(data: DataFrame) extends ExposurePeriod

val outputColumns = (data.columns.toList ++ List(ExposureStart, ExposureEnd)).map(col)

val firstLastPurchase = getFirstAndLastPurchase(data, endThresholdGc.get, endThresholdNgc.get, endDelay.get)
val delayedDrugPurchases = delayStart(data, startDelay)

val firstLastPurchase = getFirstAndLastPurchase(
delayedDrugPurchases,
endThresholdGc.get,
endThresholdNgc.get,
endDelay.get
)

toExposure(firstLastPurchase).select(outputColumns: _*)
}

def delayStart(data: DataFrame, startDelay: Period): DataFrame = {
data.withColumn("NewStart", col("start").addPeriod(startDelay))
.drop(col("start"))
.withColumnRenamed("NewStart", "start")
}

def toExposure(firstLastPurchase: DataFrame): DataFrame = {
val condition = (col("Status") === "first"
&& lead(col("Status"), 1).over(orderedWindow) === "last")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ class LimitedExposurePeriodAdderSuite extends SharedContext {
// instance created from a mock DataFrame, to allow testing the InnerImplicits implicit class
private val mockInstance = new LimitedExposurePeriodAdder(mock(classOf[DataFrame]))

"getFirstAndLastPurchase" should "return the first and the last purchase of each potential exposure" in {

"delayStart" should "delay the start of events by the given period" in {
val sqlCtx = sqlContext
import sqlCtx.implicits._

Expand All @@ -30,15 +30,51 @@ class LimitedExposurePeriodAdderSuite extends SharedContext {
("A", "S", makeTS(2008, 2, 1), 1.0)
).toDF(PatientID, Value, Start, Weight)


val expected = Seq(
("A", "P", makeTS(2008, 1, 11), 1.0),
("A", "P", makeTS(2008, 2, 11), 1.0),
("A", "P", makeTS(2008, 5, 11), 1.0),
("A", "P", makeTS(2008, 6, 11), 1.0),
("A", "P", makeTS(2008, 7, 11), 1.0),
("A", "P", makeTS(2009, 1, 11), 1.0),
("A", "P", makeTS(2009, 7, 11), 1.0),
("A", "P", makeTS(2009, 8, 11), 1.0),
("A", "S", makeTS(2008, 2, 11), 1.0)
).toDF(PatientID, Value, Start, Weight).select(PatientID, Value, Weight, Start)

val result = mockInstance.delayStart(input, 10.days)

assertDFs(expected, result)
}

"getFirstAndLastPurchase" should "return the first and the last purchase of each potential exposure" in {

val sqlCtx = sqlContext
import sqlCtx.implicits._

// Given
val input = Seq(
("A", "P", makeTS(2008, 1, 1), 2.0),
("A", "P", makeTS(2008, 2, 1), 1.0),
("A", "P", makeTS(2008, 5, 1), 1.0),
("A", "P", makeTS(2008, 6, 1), 1.0),
("A", "P", makeTS(2008, 7, 1), 1.0),
("A", "P", makeTS(2009, 1, 1), 1.0),
("A", "P", makeTS(2009, 7, 1), 1.0),
("A", "P", makeTS(2009, 8, 1), 1.0),
("A", "S", makeTS(2008, 2, 1), 2.0)
).toDF(PatientID, Value, Start, Weight)

val expected = Seq(
("A", "P", makeTS(2008, 1, 1), 1.0, "first", makeTS(2008, 2, 11)),
("A", "P", makeTS(2008, 1, 1), 2.0, "first", makeTS(2008, 4, 11)),
("A", "P", makeTS(2008, 2, 1), 1.0, "last", makeTS(2008, 3, 11)),
("A", "P", makeTS(2008, 5, 1), 1.0, "first", makeTS(2008, 6, 11)),
("A", "P", makeTS(2008, 7, 1), 1.0, "last", makeTS(2008, 8, 11)),
("A", "P", makeTS(2009, 1, 1), 1.0, "first", makeTS(2009, 2, 11)),
("A", "P", makeTS(2009, 7, 1), 1.0, "first", makeTS(2009, 8, 11)),
("A", "P", makeTS(2009, 8, 1), 1.0, "last", makeTS(2009, 9, 11)),
("A", "S", makeTS(2008, 2, 1), 1.0, "first", makeTS(2008, 3, 11))
("A", "S", makeTS(2008, 2, 1), 2.0, "first", makeTS(2008, 5, 11))
).toDF(PatientID, Value, Start, Weight, "Status", "purchaseReach")

val result = mockInstance.getFirstAndLastPurchase(input, 1.months, 3.month, 10.days)
Expand Down Expand Up @@ -93,15 +129,15 @@ class LimitedExposurePeriodAdderSuite extends SharedContext {
).toDF(PatientID, Value, Start, Weight)

val expected = Seq(
("A", "P", makeTS(2008, 1, 1), 1.0, makeTS(2008, 1, 1), makeTS(2008, 3, 11)),
("A", "P", makeTS(2008, 5, 1), 1.0, makeTS(2008, 5, 1), makeTS(2008, 8, 11)),
("A", "P", makeTS(2009, 1, 1), 1.0, makeTS(2009, 1, 1), makeTS(2009, 2, 11)),
("A", "P", makeTS(2009, 7, 1), 1.0, makeTS(2009, 7, 1), makeTS(2009, 9, 11)),
("A", "S", makeTS(2008, 2, 1), 1.0, makeTS(2008, 2, 1), makeTS(2008, 3, 11))
("A", "P", makeTS(2008, 1, 6), 1.0, makeTS(2008, 1, 6), makeTS(2008, 3, 16)),
("A", "P", makeTS(2008, 5, 6), 1.0, makeTS(2008, 5, 6), makeTS(2008, 8, 16)),
("A", "P", makeTS(2009, 1, 6), 1.0, makeTS(2009, 1, 6), makeTS(2009, 2, 16)),
("A", "P", makeTS(2009, 7, 6), 1.0, makeTS(2009, 7, 6), makeTS(2009, 9, 16)),
("A", "S", makeTS(2008, 2, 6), 1.0, makeTS(2008, 2, 6), makeTS(2008, 3, 16))
).toDF(PatientID, Value, Start, Weight, ExposureStart, ExposureEnd)

val result = new LimitedExposurePeriodAdder(input)
.withStartEnd(0, 0.months, 0.months, Some(1.months), Some(3.months), Some(10.days))
.withStartEnd(0, 5.days, 0.months, Some(1.months), Some(3.months), Some(10.days))

assertDFs(expected, result)
}
Expand Down

0 comments on commit 151d597

Please sign in to comment.