Skip to content

Commit

Permalink
#35 show 'merge rate' in isJoinableWith check
Browse files Browse the repository at this point in the history
  • Loading branch information
FRosner committed Feb 9, 2016
2 parents db09484 + c0e7d9b commit 2e70ef5
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 8 deletions.
17 changes: 12 additions & 5 deletions src/main/scala/de/frosner/ddq/core/Check.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ case class Check(dataFrame: DataFrame,
displayName: Option[String] = Option.empty,
cacheMethod: Option[StorageLevel] = Check.DEFAULT_CACHE_METHOD,
constraints: Seq[Constraint] = Seq.empty) {

def addConstraint(c: Constraint): Check =
Check(dataFrame, displayName, cacheMethod, constraints ++ List(c))

Expand Down Expand Up @@ -563,15 +563,22 @@ object Check {
}:_*)

// check if join yields some values
val join = renamedDf.distinct.join(renamedRef, renamedColumns.map{
val renamedDfDistinct = renamedDf.distinct
val distinctBefore = renamedDfDistinct.count
val join = renamedDfDistinct.join(renamedRef, renamedColumns.map{
case (baseColumn, refColumn) => new Column(baseColumn) === new Column(refColumn)
}.reduce(_ && _))
val matchingRows = join.count
val matchingRows = join.distinct.count
val unmatchedKeysPercentage = ((matchingRows.toDouble / distinctBefore) * 100).round

val columnNoun = if(columns.length == 1) "Column" else "Columns"
val columnsString = columns.map{ case (baseCol, refCol) => baseCol + "->" + refCol }.mkString(", ")
if (matchingRows > 0)
ConstraintSuccess(s"${if(columns.length == 1) "Column" else "Columns"} $columnsString can be used for joining (${ if(matchingRows == 1) "one distinct row" else s"$matchingRows distinct rows"} match)")
ConstraintSuccess(s"$columnNoun $columnsString can be used for joining (" +
s"join columns cardinality in base table: $distinctBefore, " +
s"join columns cardinality after joining: $matchingRows ($unmatchedKeysPercentage" + "%)")
else
ConstraintFailure(s"${if(columns.length == 1) "Column" else "Columns"} $columnsString cannot be used for joining (no rows match)")
ConstraintFailure(s"$columnNoun $columnsString cannot be used for joining (no rows match)")
}
)

Expand Down
31 changes: 28 additions & 3 deletions src/test/scala/de/frosner/ddq/core/CheckTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,9 @@ class CheckTest extends FlatSpec with Matchers with BeforeAndAfterEach with Befo
val ref = makeIntegerDf(List(1, 2, 5))
val check = Check(base).isJoinableWith(ref, "column" -> "column")
val constraint = check.constraints.head
val result = ConstraintSuccess("Column column->column can be used for joining (2 distinct rows match)")
val result = ConstraintSuccess("Column column->column can be used for joining (" +
"join columns cardinality in base table: 3, " +
"join columns cardinality after joining: 2 (67%)")
check.run().constraintResults shouldBe Map(constraint -> result)
}

Expand All @@ -480,7 +482,9 @@ class CheckTest extends FlatSpec with Matchers with BeforeAndAfterEach with Befo
val ref = makeIntegersDf(List(1, 2, 100), List(1, 5, 100))
val check = Check(base).isJoinableWith(ref, "column1" -> "column1", "column2" -> "column2")
val constraint = check.constraints.head
val result = ConstraintSuccess("Columns column1->column1, column2->column2 can be used for joining (one distinct row match)")
val result = ConstraintSuccess("Columns column1->column1, column2->column2 can be used for joining (" +
"join columns cardinality in base table: 2, " +
"join columns cardinality after joining: 1 (50%)")
check.run().constraintResults shouldBe Map(constraint -> result)
}

Expand All @@ -489,10 +493,31 @@ class CheckTest extends FlatSpec with Matchers with BeforeAndAfterEach with Befo
val ref = makeIntegersDf(List(1, 3, 100), List(1, 500, 100))
val check = Check(base).isJoinableWith(ref, "column1" -> "column1", "column3" -> "column2")
val constraint = check.constraints.head
val result = ConstraintSuccess("Columns column1->column1, column3->column2 can be used for joining (one distinct row match)")
val result = ConstraintSuccess("Columns column1->column1, column3->column2 can be used for joining (" +
"join columns cardinality in base table: 2, " +
"join columns cardinality after joining: 1 (50%)")
check.run().constraintResults shouldBe Map(constraint -> result)
}

it should "compute the matched keys in a non-commutative way" in {
val base = makeIntegerDf(List(1, 1, 1, 1, 1, 1, 1, 1, 1, 2))
val ref = makeIntegerDf(List(1))

val check1 = Check(base).isJoinableWith(ref, "column" -> "column")
val constraint1 = check1.constraints.head
val result1 = ConstraintSuccess("Column column->column can be used for joining (" +
"join columns cardinality in base table: 2, " +
"join columns cardinality after joining: 1 (50%)")
check1.run().constraintResults shouldBe Map(constraint1 -> result1)

val check2 = Check(ref).isJoinableWith(base, "column" -> "column")
val constraint2 = check2.constraints.head
val result2 = ConstraintSuccess("Column column->column can be used for joining (" +
"join columns cardinality in base table: 1, " +
"join columns cardinality after joining: 1 (100%)")
check2.run().constraintResults shouldBe Map(constraint2 -> result2)
}

it should "fail if a join on the given columns yields no result" in {
val base = makeIntegersDf(List(1, 2, 5), List(1, 2, 5), List(1, 100, 3))
val ref = makeIntegersDf(List(1, 1, 100), List(1, 10, 100))
Expand Down

0 comments on commit 2e70ef5

Please sign in to comment.