Commit

Merge branch 'develop'
piffall committed May 3, 2022
2 parents 1ea3c25 + ce901a1 commit 9f2870d
Showing 60 changed files with 647 additions and 354 deletions.
21 changes: 21 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,21 @@
default_stages:
  - commit
  - push
fail_fast: true
repos:
  - repo: local
    hooks:
      - id: scalafmt-test
        name: Scalafmt Fixes
        pass_filenames: false
        language: system
        entry: scalafmt
        always_run: true
      - id: scalac-lint
        name: Scala lint
        language: system
        always_run: true
        pass_filenames: false
        verbose: true
        entry: sbt
        args: [ '; clean ; set scalacOptions ++= Seq("-Xfatal-warnings", "-Wconf:any:warning-verbose") ; compile' ]
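
The scalac-lint hook recompiles the project with -Xfatal-warnings, so any compiler warning fails the commit. As a hedged illustration (hypothetical snippet, not part of this repository), code like the following would be rejected by the hook, because the non-exhaustive match produces a warning that the flag promotes to an error:

sealed trait Status
case object Ok    extends Status
case object Error extends Status

object LintExample {
  // "match may not be exhaustive" is only a warning by default, but the
  // pre-commit hook compiles with -Xfatal-warnings, so sbt fails here.
  def describe(s: Status): String = s match {
    case Ok => "all good" // missing a case for Error
  }
}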
42 changes: 42 additions & 0 deletions .scalafmt.conf
@@ -0,0 +1,42 @@
version=2.7.5
align {
  preset = some
}
optIn.configStyleArguments = false
maxColumn = 90
newlines {
  neverInResultType = false
  alwaysBeforeElseAfterCurlyIf = false
  topLevelStatements = [before,after]
  implicitParamListModifierPrefer = before
  avoidForSimpleOverflow = [tooLong]
}
danglingParentheses {
  defnSite = false
  callSite = false
  ctrlSite = false
}
align {
  openParenCallSite = false
  openParenDefnSite = true
}
docstrings {
  blankFirstLine = yes
  style = SpaceAsterisk
}
continuationIndent {
  extendSite = 2
  withSiteRelativeToExtends = 2
  ctorSite = 4
  callSite = 2
  defnSite = 4
}
runner {
  optimizer {
    forceConfigStyleOnOffset = 45
    forceConfigStyleMinArgCount = 5
  }
}
verticalMultiline {
  newlineAfterOpenParen = false
}
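
These settings cap lines at 90 columns, disable dangling parentheses at call and definition sites, and align wrapped arguments at the definition-site opening parenthesis, which is the constructor style visible in the Scala diffs below. A minimal sketch of the resulting wrapping, with placeholder types (hypothetical code, not taken from the repository):

import scala.util.Try

object FormattingSketch {
  type Format = String // placeholder types, for illustration only
  type Table  = String

  // With maxColumn = 90, danglingParentheses.defnSite = false and
  // align.openParenDefnSite = true, a long parameter list wraps with the
  // arguments aligned under the opening parenthesis and the closing
  // parenthesis kept on the last argument line.
  def getUnmanagedTable(name: String,
                        path: String,
                        format: Format,
                        partitions: Seq[String]): Try[Table] =
    Try(s"$name at $path ($format, partitioned by ${partitions.mkString(",")})")
}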
51 changes: 20 additions & 31 deletions build.sbt
@@ -11,16 +11,15 @@ val dependencies = Seq(
"org.apache.spark" %% "spark-hive" % sparkVersion % Provided,
"org.apache.spark" %% "spark-avro" % sparkVersion % Provided,
"io.delta" %% "delta-core" % "0.7.0" % Provided,
"com.typesafe" % "config" % "1.3.2"
)
"com.typesafe" % "config" % "1.3.2")

val testDependencies = Seq(
"org.scalatest" %% "scalatest" % "3.0.5" % Test,
"org.scalamock" %% "scalamock" % "4.1.0" % Test,
"com.holdenkarau" %% "spark-testing-base" % s"${sparkTestVersion}_0.14.0" % Test
)
"com.holdenkarau" %% "spark-testing-base" % s"${sparkTestVersion}_0.14.0" % Test)

import xerial.sbt.Sonatype._

val settings = Seq(
organization := "com.damavis",
version := "0.3.10",
@@ -29,30 +28,27 @@ val settings = Seq(
libraryDependencies ++= dependencies ++ testDependencies,
fork in Test := true,
parallelExecution in Test := false,
envVars in Test := Map(
"MASTER" -> "local[*]"
),
envVars in Test := Map("MASTER" -> "local[*]"),
test in assembly := {},
// Sonatype
sonatypeProfileName := "com.damavis",
sonatypeProjectHosting := Some(
GitHubHosting("damavis", "damavis-spark", "[email protected]")),
publishMavenStyle := true,
licenses := Seq(
"APL2" -> url("http://www.apache.org/licenses/LICENSE-2.0.txt")),
licenses := Seq("APL2" -> url("http://www.apache.org/licenses/LICENSE-2.0.txt")),
developers := List(
Developer(id = "piffall",
name = "Cristòfol Torrens",
email = "[email protected]",
url = url("http://piffall.com")),
Developer(id = "priera",
name = "Pedro Riera",
email = "pedro.riera at damavis dot com",
url = url("http://github.com/priera")),
),
Developer(
id = "piffall",
name = "Cristòfol Torrens",
email = "[email protected]",
url = url("http://piffall.com")),
Developer(
id = "priera",
name = "Pedro Riera",
email = "pedro.riera at damavis dot com",
url = url("http://github.com/priera"))),
publishTo := sonatypePublishToBundle.value,
credentials += Publish.credentials
)
credentials += Publish.credentials)

lazy val root = (project in file("."))
.settings(name := "damavis-spark")
@@ -63,25 +59,18 @@ lazy val root = (project in file("."))
lazy val core = (project in file("damavis-spark-core"))
.settings(settings)
.settings(name := "damavis-spark-core")
.settings(
crossScalaVersions := supportedScalaVersions,
)
.settings(crossScalaVersions := supportedScalaVersions)

lazy val azure = (project in file("damavis-spark-azure"))
.settings(settings)
.settings(name := "damavis-spark-azure")
.settings(
crossScalaVersions := supportedScalaVersions,
)
.settings(crossScalaVersions := supportedScalaVersions)
.dependsOn(core)

lazy val snowflake = (project in file("damavis-spark-snowflake"))
.settings(settings)
.settings(name := "damavis-spark-snowflake")
.settings(
crossScalaVersions := supportedScalaVersions,
libraryDependencies ++= Seq(
"net.snowflake" %% "spark-snowflake" % "2.8.2-spark_3.0"
)
)
.dependsOn(core)
libraryDependencies ++= Seq("net.snowflake" %% "spark-snowflake" % "2.8.2-spark_3.0"))
.dependsOn(core)
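
Each subproject sets crossScalaVersions := supportedScalaVersions; the list itself is declared in a part of build.sbt that this diff does not show. A hedged sketch of what such a declaration conventionally looks like (the version numbers are an assumption, not taken from this commit):

// Hypothetical values, for illustration only; the real list lives elsewhere in build.sbt.
val scala211 = "2.11.12"
val scala212 = "2.12.10"
val supportedScalaVersions = List(scala211, scala212)

// With crossScalaVersions set on every subproject, prefixing a task with `+`
// (for example `sbt +test` or `sbt +publishSigned`) runs it once per listed version.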
@@ -6,9 +6,8 @@ import com.damavis.spark.resource.ResourceReader
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}

class SynapseReader(url: URL, query: String, tempDir: Path)(
implicit spark: SparkSession)
extends ResourceReader {
class SynapseReader(url: URL, query: String, tempDir: Path)(implicit spark: SparkSession)
extends ResourceReader {

override def read(): DataFrame = {
spark.read
@@ -19,4 +18,5 @@ class SynapseReader(url: URL, query: String, tempDir: Path)(
.option("query", query)
.load()
}

}
@@ -6,9 +6,8 @@ import com.damavis.spark.resource.ResourceWriter
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}

class SynapseWriter(url: URL, table: String, tempDir: Path)(
implicit spark: SparkSession)
extends ResourceWriter {
class SynapseWriter(url: URL, table: String, tempDir: Path)(implicit spark: SparkSession)
extends ResourceWriter {

override def write(data: DataFrame): Unit = {
data.write
@@ -35,9 +35,11 @@ trait SparkApp extends SparkConf {
else {
val master = spark.conf.get("spark.master")
val localCores = "local\\[(\\d+|\\*)\\]".r.findAllIn(master)
if (localCores.hasNext) localCores.group(1) match {
case "*" => sys.runtime.availableProcessors()
case x => x.toInt
if (localCores.hasNext) {
localCores.group(1) match {
case "*" => sys.runtime.availableProcessors()
case x => x.toInt
}
} else {
1
}
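
This hunk only adds braces around the core-count detection, but the logic is worth spelling out: the regex parses spark.master, mapping local[*] to the number of available processors, local[N] to N, and anything else (YARN, standalone URLs) to 1. A self-contained sketch of the same behaviour, written as a hypothetical standalone helper rather than the project's SparkApp trait:

object CoreCountSketch {
  // Mirrors the pattern used in SparkApp: local[N] or local[*].
  private val localCores = """local\[(\d+|\*)\]""".r

  def coresFor(master: String): Int =
    localCores.findFirstMatchIn(master) match {
      case Some(m) if m.group(1) == "*" => Runtime.getRuntime.availableProcessors()
      case Some(m)                      => m.group(1).toInt
      case None                         => 1 // non-local master URL
    }
}

// CoreCountSketch.coresFor("local[4]") == 4
// CoreCountSketch.coresFor("local[*]") == number of available processors
// CoreCountSketch.coresFor("yarn")     == 1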
@@ -16,10 +16,9 @@ import org.slf4j.LoggerFactory
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}

class Database(
db: SparkDatabase,
fs: FileSystem,
protected[database] val catalog: Catalog)(implicit spark: SparkSession) {
class Database(db: SparkDatabase,
fs: FileSystem,
protected[database] val catalog: Catalog)(implicit spark: SparkSession) {

private lazy val logger = LoggerFactory.getLogger(this.getClass)

@@ -59,9 +58,7 @@ class Database(
}
}

def getUnmanagedTable(name: String,
path: String,
format: Format): Try[Table] = {
def getUnmanagedTable(name: String, path: String, format: Format): Try[Table] = {
Try {
val dbPath = parseAndCheckTableName(name)
val actualName = dbPath._2
@@ -142,7 +139,7 @@ class Database(

logger.info(
s"Table partitioned by ${catalogTable.partitionColumnNames.mkString("[", ",", "]")}")
catalogTable.schema.printTreeString()
logger.info(catalogTable.schema.treeString)

// This block of code is necessary because the Databricks runtime does not
// provide DeltaTableUtils.
@@ -163,9 +160,7 @@ class Database(
logger.warn("Keeping catalog only data")
catalogTable
case ue: Throwable =>
logger.error(
"Could not combine catalog and delta meta, Unknown Cause: ",
ue)
logger.error("Could not combine catalog and delta meta, Unknown Cause: ", ue)
logger.warn("Keeping catalog only data")
catalogTable
}
@@ -178,10 +173,11 @@ class Database(
val partitions = metadata.partitionColumnNames

val columns = metadata.schema.map(field => {
Column(field.name,
field.dataType.simpleString,
partitions.contains(field.name),
field.nullable)
Column(
field.name,
field.dataType.simpleString,
partitions.contains(field.name),
field.nullable)

})
columns
@@ -42,4 +42,5 @@ object DbManager {

new Database(db, HadoopFS(), catalog)
}

}
@@ -2,10 +2,7 @@ package com.damavis.spark.database

import com.damavis.spark.resource.Format.Format

case class Column(name: String,
dataType: String,
partitioned: Boolean,
nullable: Boolean)
case class Column(name: String, dataType: String, partitioned: Boolean, nullable: Boolean)

sealed trait Table {
def database: String
@@ -24,7 +21,7 @@ case class RealTable(database: String,
format: Format,
managed: Boolean,
columns: Seq[Column])
extends Table
extends Table

case class DummyTable(database: String, name: String) extends Table {
override def path: String = ???
@@ -1,4 +1,4 @@
package com.damavis.spark.database.exceptions

class DatabaseNotFoundException(name: String)
extends Exception(s"""Database "$name" not found in catalog""") {}
extends Exception(s"""Database "$name" not found in catalog""") {}
@@ -1,4 +1,4 @@
package com.damavis.spark.database.exceptions

class InvalidDatabaseNameException(name: String)
extends Exception(s""""$name" is not a valid database name""") {}
extends Exception(s""""$name" is not a valid database name""") {}
@@ -1,4 +1,3 @@
package com.damavis.spark.database.exceptions

class TableDefinitionException(val table: String, msg: String)
extends Exception(msg) {}
class TableDefinitionException(val table: String, msg: String) extends Exception(msg) {}
@@ -1,6 +1,7 @@
package com.damavis.spark.dataflow

class DataFlow(definition: DataFlowDefinition) {

def run(): Unit = {
for (source <- definition.sources)
source.compute()
@@ -1,7 +1,6 @@
package com.damavis.spark.dataflow

class DataFlowSource(processor: SourceProcessor)
extends DataFlowStage(processor) {
class DataFlowSource(processor: SourceProcessor) extends DataFlowStage(processor) {

override def ->(stage: StageSocket)(
implicit definition: DataFlowDefinition): DataFlowStage = {
@@ -1,8 +1,10 @@
package com.damavis.spark.dataflow

object DataFlowStage {

def apply(processor: Processor): DataFlowStage =
new DataFlowStage(processor)

}

class DataFlowStage(private val processor: Processor) {
@@ -13,6 +15,7 @@ class DataFlowStage(private val processor: Processor) {

protected val sockets: SocketSet =
SocketSet(new StageSocket(this), new StageSocket(this))

private var deliverySocket: StageSocket = _

protected def toRun: Boolean = _toRun
@@ -38,8 +41,7 @@ class DataFlowStage(private val processor: Processor) {

}

def ->(socket: StageSocket)(
implicit definition: DataFlowDefinition): DataFlowStage = {
def ->(socket: StageSocket)(implicit definition: DataFlowDefinition): DataFlowStage = {
//TODO: disallow assignment to stages already connected

if (this == socket.stage)
@@ -8,4 +8,5 @@ abstract class JoinProcessor extends Processor {

override def compute(sockets: SocketSet): DataFrame =
computeImpl(sockets.left.get, sockets.right.get)

}
@@ -1,4 +1,5 @@
package com.damavis.spark.dataflow

import org.apache.spark.sql.DataFrame

abstract class LinealProcessor extends Processor {
@@ -8,4 +9,5 @@ abstract class LinealProcessor extends Processor {
override def compute(sockets: SocketSet): DataFrame = {
computeImpl(sockets.left.get)
}

}
@@ -1,4 +1,5 @@
package com.damavis.spark.dataflow

import org.apache.spark.sql.DataFrame

abstract class SourceProcessor extends Processor {
@@ -8,6 +8,7 @@ import scala.language.implicitConversions
package object dataflow {

object implicits {

implicit def defaultSocketOfStage(stage: DataFlowStage): StageSocket =
stage.left

@@ -32,6 +33,7 @@ package object dataflow {

target.left
}

}

}
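
The dataflow hunks above are purely cosmetic (blank lines and brace placement), but they all touch the wiring idea behind the package: stages expose sockets, -> connects one stage's output to another stage's socket, and the implicit defaultSocketOfStage lets stageA -> stageB default to the left socket. A self-contained toy analogue of that pattern (simplified types, String payloads instead of DataFrames, not the project's actual API):

object DataflowSketch {
  // A processor turns its input into an output; String stands in for DataFrame.
  trait Processor { def compute(input: String): String }

  final class Stage(processor: Processor) {
    private var downstream: List[Stage] = Nil

    // Connect this stage to the next one, mirroring the `->` operator.
    def ->(next: Stage): Stage = { downstream = next :: downstream; next }

    def run(input: String): Unit = {
      val out = processor.compute(input)
      if (downstream.isEmpty) println(out) else downstream.foreach(_.run(out))
    }
  }

  def main(args: Array[String]): Unit = {
    val source = new Stage(s => s"source($s)")
    val clean  = new Stage(s => s"clean($s)")
    val sink   = new Stage(s => s"sink($s)")

    source -> clean -> sink // wiring reads left to right, as in the library
    source.run("events")    // prints: sink(clean(source(events)))
  }
}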
