Iceberg class demo container (high-performance-spark#132)
* Start working on making docker container

* Upgrade to latest Iceberg, ignore coursier file, install Scala Jupyter kernel

* Install correct jupyter

* We want the scala kernel to point to Spark & include the class path for the examples

* More progress on the container

* OK, we need to use Scala 2.13.8 since 2.13.14 gives us some cats issues; roll back the kernel to match 2.13.8, cross-mount iceberg-workshop, forward port 8877, and do some tricks for faster container builds

* Make directories for the cross mount if not present; add the jupyter-lab launch command to bash history so folks who launch bash can easily up-arrow to it (and by folks I mean me)

* Shellcheck fixes for build container script.

* Use axel quietly so I can find things

* Match scala version of Spark

* More shellcheck fixes

* Make the wgets quiet too.

* oops missed one.

* Fix scala version

* Update for Spark 4 / Scala 2.13

* Bump sbt version (note: see the spark-400 branch for the cherry-picked parts)

* Use 2.13 target

* Match comet to regular build
holdenk authored Aug 12, 2024
1 parent 37c085c commit b3db591
Showing 23 changed files with 169 additions and 45 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -130,7 +130,7 @@ jobs:
java-version: 17
- name: Setup comet
run:
cd accelerators; SPARK_MAJOR=3.4 ./setup_comet.sh
cd accelerators; SPARK_MAJOR=3.5 ./setup_comet.sh
- name: Run comet
run:
cd accelerators; ./comet_ex.sh
9 changes: 9 additions & 0 deletions .gitignore
@@ -95,3 +95,12 @@ gluten*.jar
spark-3*hadoop*/
spark-3*hadoop*.tgz
accelerators/incubator-gluten
# ignore the temporary myapp from the dockerbuild
myapp.tar
# ignore glutten
incubator-glutten/*
# ignore nested build file.
project/build.sbt
coursier
# Magic file we use for build tracking
oldhash
70 changes: 70 additions & 0 deletions Dockerfile
@@ -0,0 +1,70 @@
# OpenJDK 11: Spark 3.X and the latest JDKs get a little spicy
FROM azul/zulu-openjdk:11-latest

RUN apt-get -qq update && \
apt-get -qq -y upgrade && \
apt-get -qq -y install gnupg software-properties-common locales curl tzdata apt-transport-https curl gnupg sudo net-tools psmisc htop && \
locale-gen en_US.UTF-8 && \
apt-get -qq -y install gnupg software-properties-common curl git-core wget axel python3 python3-pip nano emacs vim && \
echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | tee /etc/apt/sources.list.d/sbt.list && \
echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | tee /etc/apt/sources.list.d/sbt_old.list && \
curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | gpg --no-default-keyring --keyring gnupg-ring:/etc/apt/trusted.gpg.d/scalasbt-release.gpg --import && \
chmod 644 /etc/apt/trusted.gpg.d/scalasbt-release.gpg && \
apt-get update && \
apt-get -qq -y install sbt && \
rm -rf /var/lib/apt/lists/*

RUN curl -Lo coursier https://git.io/coursier-cli
RUN chmod +x coursier
# ensure the JAR of the CLI is in the coursier cache, in the image
RUN ./coursier --help
RUN pip install jupyter
RUN ./coursier bootstrap \
-r jitpack \
-i user -I user:sh.almond:scala-kernel-api_2.13.8:0.14.0-RC4 \
sh.almond:scala-kernel_2.13.8:0.14.0-RC4 \
--default=true --sources \
-o almond && \
./almond --install --log info --metabrowse --id scala2.13 --display-name "Scala 2.13"
RUN chmod a+xr almond coursier
RUN ./coursier launch almond --scala 2.13.8 -- --install
# Fun story: this does not work (Aug 8 2024) because it tries to download Scala 2 from Scala 3
#RUN ./coursier install scala:2.13.8 && ./coursier install scalac:2.13.8
RUN (axel --quiet https://downloads.lightbend.com/scala/2.13.8/scala-2.13.8.deb || wget https://downloads.lightbend.com/scala/2.13.8/scala-2.13.8.deb) && dpkg --install scala-2.13.8.deb && rm scala-2.13.8.deb

RUN adduser dev
RUN adduser dev sudo
RUN echo 'dev:dev' | chpasswd
RUN mkdir -p ~dev
RUN cp ./coursier ~dev/
RUN echo "color_prompt=yes" >> ~dev/.bashrc
RUN echo "export force_color_prompt=yes" >> ~dev/.bashrc
RUN echo "export SPARK_HOME=/high-performance-spark-examples/spark-3.5.1-bin-hadoop3" >> ~dev/.bashrc
RUN chown -R dev ~dev
USER dev
# Kernels are installed per-user, so we need to run as the user
RUN ./almond --install --log info --metabrowse --id scala2.13 --display-name "Scala 2.13"
RUN ./coursier launch almond --scala 2.13.8 -- --install
USER root

RUN mkdir /high-performance-spark-examples
RUN chown -R dev /high-performance-spark-examples
WORKDIR /high-performance-spark-examples
# Increase the chance of caching by copying just the env setup file first.
COPY --chown=dev:dev env_setup.sh ./
# Downloads and installs Spark ~3.5 & Iceberg 1.6 and slipstreams the JAR in-place
# Also downloads some test data
RUN SCALA_VERSION=2.13 ./env_setup.sh
RUN mv ~dev/.local/share/jupyter/kernels/scala2.13/kernel.json ~dev/.local/share/jupyter/kernels/scala2.13/kernel.json_back
# Note: We need to use /home in the COPY otherwise no happy pandas
COPY --chown=dev:dev misc/kernel.json /home/dev/kernel.json_new
RUN mv ~dev/kernel.json_new ~dev/.local/share/jupyter/kernels/scala2.13/kernel.json
RUN git clone https://github.com/holdenk/spark-upgrade.git
RUN chown -R dev /high-performance-spark-examples
ADD --chown=dev:dev myapp.tar /high-performance-spark-examples/
RUN chown -R dev /high-performance-spark-examples
USER dev
RUN echo "jupyter-lab --ip 0.0.0.0 --port 8877" >> ~/.bash_history
RUN sbt clean compile
CMD ["jupyter-lab", "--ip", "0.0.0.0", "--port", "8877"]

4 changes: 2 additions & 2 deletions accelerators/comet_ex.sh
@@ -2,8 +2,8 @@
set -ex

# If you change this update the workflow version too.
SPARK_MAJOR=${SPARK_MAJOR:-3.4}
SPARK_VERSION=3.4.2
SPARK_MAJOR=${SPARK_MAJOR:-3.5}
SPARK_VERSION=${SPARK_MAJOR}.1
export SPARK_MAJOR
export SPARK_VERSION

2 changes: 1 addition & 1 deletion accelerators/setup_comet.sh
@@ -19,7 +19,7 @@ fi
# Build JAR if not present
if [ -z "$(ls arrow-datafusion-comet/spark/target/comet-spark-spark*.jar)" ]; then
cd arrow-datafusion-comet
make clean release PROFILES="-Pspark-${SPARK_MAJOR}"
make clean release PROFILES="-Pspark-${SPARK_MAJOR} -Pscala-2.13"
cd ..
fi
COMET_JAR="$(pwd)/$(ls arrow-datafusion-comet/spark/target/comet-spark-spark*SNAPSHOT.jar)"
14 changes: 9 additions & 5 deletions build.sbt
@@ -1,19 +1,21 @@
scalaVersion := "2.13.8"

lazy val root = (project in file("."))
.aggregate(core, native)


organization := "com.highperformancespark"

//tag::addSparkScalaFix[]
ThisBuild / scalafixDependencies +=
"com.holdenkarau" %% "spark-scalafix-rules-2.4.8" % "0.1.5"
ThisBuild / scalafixDependencies +=
"com.github.liancheng" %% "organize-imports" % "0.6.0"
// Needs to be commented out post-upgrade because of Scala versions.
//ThisBuild / scalafixDependencies +=
// "com.holdenkarau" %% "spark-scalafix-rules-2.4.8" % "0.1.5"
//ThisBuild / scalafixDependencies +=
// "com.github.liancheng" %% "organize-imports" % "0.6.0"
//end::addSparkScalaFix[]

lazy val V = _root_.scalafix.sbt.BuildInfo

scalaVersion := V.scala212
addCompilerPlugin(scalafixSemanticdb)
scalacOptions ++= List(
"-Yrangepos",
@@ -67,6 +69,7 @@ lazy val core = (project in file("core")) // regular scala code with @native met
.settings(javah / target := (native / nativeCompile / sourceDirectory).value / "include")
.settings(sbtJniCoreScope := Compile)
.settings(
scalaVersion := "2.13.8",
javacOptions ++= Seq("-source", "1.8", "-target", "1.8"),
parallelExecution in Test := false,
fork := true,
@@ -97,6 +100,7 @@ lazy val core = (project in file("core")) // regular scala code with @native met
// JNI Magic!
lazy val native = (project in file("native")) // native code and build script
.settings(nativeCompile / sourceDirectory := sourceDirectory.value)
.settings(scalaVersion := "2.13.8")
.enablePlugins(JniNative) // JniNative needs to be explicitly enabled

//tag::xmlVersionConflict[]
20 changes: 20 additions & 0 deletions build_container.sh
@@ -0,0 +1,20 @@
#!/bin/bash

set -ex

cp .git/index /tmp/git_index
export GIT_INDEX_FILE=/tmp/git_index
git add -u
hash=$(git write-tree)
unset GIT_INDEX_FILE
oldhash=$(cat oldhash || true)
if [ "$hash" = "$oldhash" ] && [ -f myapp.tar ]; then
echo "Skipping making tar since we match."
else
echo "Making tar since no match"
git archive -o myapp.tar --format=tar HEAD
echo "$hash" > oldhash
fi
IMAGE=holdenk/hps:0.1
docker buildx build --platform=linux/amd64,linux/arm64 -t "${IMAGE}" . --push
#docker buildx build --platform=linux/amd64 -t "${IMAGE}" . --push
@@ -67,9 +67,10 @@ class MixedDataset(sqlCtx: SQLContext) {

//tag::maxPandaSizePerZipScala[]
def maxPandaSizePerZipScala(ds: Dataset[RawPanda]): Dataset[(String, Double)] = {
ds.groupByKey(rp => rp.zip).mapGroups{ case (g, iter) =>
def groupMapFun(g: String, iter: Iterator[RawPanda]): (String, Double) = {
(g, iter.map(_.attributes(2)).reduceLeft(Math.max(_, _)))
}
ds.groupByKey(rp => rp.zip).mapGroups(groupMapFun)
}
//end::maxPandaSizePerZipScala[]

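The hunk above pulls the mapGroups logic out of a pattern-matching block and into a named groupMapFun with an explicit (String, Iterator[RawPanda]) => (String, Double) signature, presumably so the Scala 2.13 compiler is handed a plain function rather than a case block. A minimal sketch of the same shape on a hypothetical Dataset[(String, Double)] (the names and input type are assumptions, not repo code):

    import org.apache.spark.sql.Dataset

    object GroupMapSketch {
      // Named helper with an explicit signature; mapGroups then has nothing to guess at.
      def maxPerKey(key: String, values: Iterator[Double]): (String, Double) =
        (key, values.reduceLeft(Math.max(_, _)))

      def maxByKey(ds: Dataset[(String, Double)]): Dataset[(String, Double)] = {
        val spark = ds.sparkSession
        import spark.implicits._
        ds.groupByKey(_._1).mapGroups((k, vs) => maxPerKey(k, vs.map(_._2)))
      }
    }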
@@ -14,7 +14,7 @@ case class RawPanda(id: Long, zip: String, pt: String,
happy: Boolean, attributes: Array[Double]) {
override def equals(o: Any) = o match {
case other: RawPanda => (id == other.id && pt == other.pt &&
happy == other.happy && attributes.deep == other.attributes.deep)
happy == other.happy && attributes.sameElements(other.attributes))
case _ => false
}
override def hashCode(): Int = {
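The equals change here is another Scala 2.13 migration: ArrayOps.deep no longer exists, and for a flat Array[Double] an element-wise sameElements comparison does the same job. A standalone sketch of the difference (not repo code):

    val a = Array(1.0, 2.0, 3.0)
    val b = Array(1.0, 2.0, 3.0)

    a == b             // false: Array equality is reference equality
    a.sameElements(b)  // true: element-wise comparison, enough for a flat Array[Double]
    // a.deep == b.deep // no longer compiles on 2.13; .deep was removed with the old ArrayOps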
@@ -2,7 +2,6 @@ package com.highperformancespark.examples.goldilocks

import scala.collection.Map
import scala.collection.mutable
import scala.collection.mutable.MutableList

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
@@ -254,7 +253,7 @@ object GoldilocksFirstTry {
// to sort the partitionsColumnsFreq array by the partition index (the
// first value in the tuple).
partitionColumnsFreq.sortBy(_._1).map { case (partitionIndex, columnsFreq) =>
val relevantIndexList = new MutableList[(Int, Long)]()
val relevantIndexList = new mutable.ListBuffer[(Int, Long)]()

columnsFreq.zipWithIndex.foreach{ case (colCount, colIndex) =>
val runningTotalCol = runningTotal(colIndex)
@@ -293,8 +292,8 @@ object GoldilocksFirstTry {
(partitionIndex : Int, valueColumnPairs : Iterator[(Double, Int)]) => {
val targetsInThisPart: List[(Int, Long)] = ranksLocations(partitionIndex)._2
if (targetsInThisPart.nonEmpty) {
val columnsRelativeIndex: Map[Int, List[Long]] =
targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2))
val columnsRelativeIndex: collection.MapView[Int, List[Long]] =
targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2))
val columnsInThisPart = targetsInThisPart.map(_._1).distinct

val runningTotals : mutable.HashMap[Int, Long]= new mutable.HashMap()
@@ -174,7 +174,7 @@ object GoldilocksWithHashMap {
val runningTotal = Array.fill[Long](numOfColumns)(0)

partitionColumnsFreq.sortBy(_._1).map { case (partitionIndex, columnsFreq)=>
val relevantIndexList = new mutable.MutableList[(Int, Long)]()
val relevantIndexList = new mutable.ListBuffer[(Int, Long)]()

columnsFreq.zipWithIndex.foreach{ case (colCount, colIndex) =>
val runningTotalCol = runningTotal(colIndex)
@@ -303,7 +303,7 @@ object FindTargetsSubRoutine extends Serializable {
def withArrayBuffer(valueColumnPairsIter : Iterator[((Double, Int), Long)],
targetsInThisPart: List[(Int, Long)] ): Iterator[(Int, Double)] = {

val columnsRelativeIndex: Predef.Map[Int, List[Long]] =
val columnsRelativeIndex: collection.MapView[Int, List[Long]] =
targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2))

// The column indices of the pairs that are desired rank statistics that live in
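The two Goldilocks hunks above (and the HappyPandasTest change further down) are the same pair of Scala 2.13 migrations: mutable.MutableList was removed, so ListBuffer takes its place, and mapValues on a Map now returns a lazy collection.MapView rather than a strict Map. A small standalone sketch of both behaviors (values are made up):

    import scala.collection.mutable

    // MutableList is gone in 2.13; ListBuffer is the usual drop-in replacement.
    val relevantIndexes = new mutable.ListBuffer[(Int, Long)]()
    relevantIndexes += ((0, 42L))

    // groupBy(...).mapValues(...) now yields a lazy MapView, not a Map.
    val targets = List((1, 2L), (1, 3L), (2, 5L))
    val byColumn: collection.MapView[Int, List[Long]] =
      targets.groupBy(_._1).mapValues(_.map(_._2))
    val strict: Map[Int, List[Long]] = byColumn.toMap  // force it when a strict Map is needed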
@@ -1,9 +1,6 @@
package com.highperformancespark.examples.ml

import scala.collection.Map
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.MutableList

import org.apache.spark._
import org.apache.spark.ml._
@@ -1,9 +1,6 @@
package com.highperformancespark.examples.ml

import scala.collection.Map
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.MutableList

import org.apache.spark._
import org.apache.spark.ml._
@@ -1,10 +1,5 @@
package com.highperformancespark.examples.ml

import scala.collection.Map
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.MutableList

import org.apache.spark._
import org.apache.spark.ml._
import org.apache.spark.ml.classification._
@@ -1,9 +1,6 @@
package com.highperformancespark.examples.mllib

import scala.collection.Map
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.MutableList

import org.apache.spark._
import org.apache.spark.mllib.classification.LogisticRegressionModel
@@ -64,9 +64,9 @@ object SimplePerfTest {
println(dataFrameTimeings.map(_._2).mkString(","))
}

def testOnRDD(rdd: RDD[(Int, Double)]) = {
rdd.map{case (x, y) => (x, (y, 1))}
.reduceByKey{case (x, y) => (x._1 + y._1, x._2 + y._2)}.count()
def testOnRDD(rdd: RDD[(Int, Double)]): Long = {
val kvc: RDD[(Int, (Double , Int))] = rdd.map{case (x, y) => (x, (y, 1))}
kvc.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).count()
}

def groupOnRDD(rdd: RDD[(Int, Double)]) = {
@@ -261,7 +261,7 @@ class HappyPandasTest extends AnyFunSuite with DataFrameSuiteBase {
.flatMap(zipPandas => {
val pandas = zipPandas._2
val length = pandas.size - 1
val result = new mutable.MutableList[Row]
val result = new mutable.ListBuffer[Row]

for (i <- 0 to length) {
var totalSum = 0
@@ -42,8 +42,8 @@ class QuantileOnlyArtisanalTest extends AnyFunSuite with BeforeAndAfterAll {
val inputAsKeyValuePairs = GoldilocksGroupByKey.mapToKeyValuePairs(input)
val groupByKeySolution = GoldilocksGroupByKey.findRankStatistics(
inputAsKeyValuePairs, List(2L,3L)).mapValues(_.toSet)
assert(whileLoopSolution == expectedResult)
assert(groupByKeySolution == expectedResult)
assert(whileLoopSolution.toMap == expectedResult)
assert(groupByKeySolution.toMap == expectedResult)
}

override def afterAll() {
@@ -136,7 +136,7 @@ class QuantileOnlyArtisanalTestContinued extends QuantileOnlyArtisanalTest {
val secondarySortSolution =
GoldilocksWithHashMap.findRankStatistics(
input, targetRanks = List(2L, 3L)).mapValues(_.toSet)
assert(secondarySortSolution == expectedResult)
assert(secondarySortSolution.toMap == expectedResult)
}

test("Secondary Sort"){
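The .toMap calls added to these asserts follow from the same MapView change: mapValues(_.toSet) now produces a view, and a view never compares equal to the expected Map, so the tests force it back to a strict Map before asserting. A standalone sketch (made-up values, using .view to avoid the deprecation warning):

    val expected = Map(1 -> Set(2L, 3L))
    val solution = Map(1 -> List(2L, 3L)).view.mapValues(_.toSet)  // a MapView on 2.13

    solution == expected        // false: views have no structural equality with Map
    solution.toMap == expected  // true once forced back to a strict Map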
14 changes: 9 additions & 5 deletions env_setup.sh
@@ -5,25 +5,29 @@ set -ex
# Download Spark and iceberg if not present
SPARK_MAJOR=${SPARK_MAJOR:-"3.5"}
SPARK_VERSION=${SPARK_VERSION:-"${SPARK_MAJOR}.1"}
SCALA_VERSION=${SCALA_VERSION:-"2.12"}
SCALA_VERSION=${SCALA_VERSION:-"2.13"}
HADOOP_VERSION="3"
SPARK_PATH="$(pwd)/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}"
SPARK_FILE="spark-${SPARK_VERSION}-bin-hadoop3.tgz"
ICEBERG_VERSION=${ICEBERG_VERSION:-"1.4.0"}
if [ "$SCALA_VERSION" = "2.13" ]; then
SPARK_FILE="spark-${SPARK_VERSION}-bin-hadoop3-scala2.13.tgz"
SPARK_PATH="$(pwd)/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}-scala2.13"
fi
ICEBERG_VERSION=${ICEBERG_VERSION:-"1.6.0"}
if [ ! -f "${SPARK_FILE}" ]; then
SPARK_DIST_URL="https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/${SPARK_FILE}"
SPARK_ARCHIVE_DIST_URL="https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/${SPARK_FILE}"
if command -v axel &> /dev/null
then
(axel "$SPARK_DIST_URL" || axel "$SPARK_ARCHIVE_DIST_URL") &
(axel --quiet "$SPARK_DIST_URL" || axel --quiet "$SPARK_ARCHIVE_DIST_URL") &
else
(wget "$SPARK_DIST_URL" || wget "$SPARK_ARCHIVE_DIST_URL") &
(wget --quiet "$SPARK_DIST_URL" || wget --quiet "$SPARK_ARCHIVE_DIST_URL") &
fi
fi
# Download Iceberg if not present
ICEBERG_FILE="iceberg-spark-runtime-${SPARK_MAJOR}_${SCALA_VERSION}-${ICEBERG_VERSION}.jar"
if [ ! -f "${ICEBERG_FILE}" ]; then
wget "https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime-${SPARK_MAJOR}_${SCALA_VERSION}/${ICEBERG_VERSION}/${ICEBERG_FILE}" -O "${ICEBERG_FILE}" &
wget --quiet "https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime-${SPARK_MAJOR}_${SCALA_VERSION}/${ICEBERG_VERSION}/${ICEBERG_FILE}" -O "${ICEBERG_FILE}" &
fi
wait
sleep 1
19 changes: 19 additions & 0 deletions misc/kernel.json
@@ -0,0 +1,19 @@
{
"argv": [
"java",
"-cp",
"/home/dev/.local/share/jupyter/kernels/scala2.13/launcher.jar:.:/high-performance-spark-examples/:/high-performance-spark-examples/target/scala-2.13/home/dev/.local/share/jupyter/kernels/scala/launcher.jar:/high-performance-spark-examples/spark-3.5.1-bin-hadoop3-scala2.13/jars/*",
"coursier.bootstrap.launcher.Launcher",
"--log",
"info",
"--metabrowse",
"--id",
"scala2.13",
"--display-name",
"Scala 2.13 (w/ Spark 3.5 & Iceberg 1.6)",
"--connection-file",
"{connection_file}"
],
"display_name": "Scala 2.13 (w/ Spark 3.5 & Iceberg 1.6)",
"language": "scala"
}
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
sbt.version=1.9.6
sbt.version=1.9.9
7 changes: 7 additions & 0 deletions run_container.sh
@@ -0,0 +1,7 @@
#!/bin/bash
set -ex
./build_container.sh
docker image pull holdenk/hps:0.1
mkdir -p warehouse
mkdir -p iceberg-workshop
docker container run --mount type=bind,source="$(pwd)"/warehouse,target=/warehouse --mount type=bind,source="$(pwd)/iceberg-workshop",target=/high-performance-spark-examples/iceberg-workshop -p 8877:8877 -it holdenk/hps:0.1 /bin/bash