Squash of adding initial autotune for number of executors. This also removes a lot of JSON code and drops handling for the classic SparkLens file format; instead, folks should use the Spark history files directly.

part2
holdenk committed May 9, 2024
1 parent ce8e9f9 commit a49ee50
Showing 34 changed files with 2,385 additions and 1,420 deletions.
15 changes: 15 additions & 0 deletions .gitignore
@@ -64,3 +64,18 @@ distMaven/

# Newt
/.newt-cache/

# Spark stuff used for e2e testing
*.jar
*.tgz
data/
spark-*-bin-hadoop*/
derby.log
warehouse/
metastore_db

# metals
project/.bloop
project/metals.sbt
.metals
.bsp
2 changes: 1 addition & 1 deletion build.sbt
@@ -19,7 +19,7 @@ libraryDependencies += "org.apache.httpcomponents" % "httpclient" % "4.5.13" % "

libraryDependencies += "org.apache.httpcomponents" % "httpmime" % "4.5.13" % "provided"

libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.9" % "test"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.17" % "test"
libraryDependencies += "org.junit.platform" % "junit-platform-engine" % "1.6.3" % "test"
libraryDependencies += "org.junit.platform" % "junit-platform-launcher" % "1.6.3" % "test"

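For reference, the scalatest bump from 3.0.9 to 3.2.17 mostly just changes imports: in 3.2.x the suite styles live in dedicated packages. A minimal sketch of the migration (illustrative only, not a file in this commit):

// scalatest 3.0.x: import org.scalatest.FunSuite
// scalatest 3.2.x: the equivalent style moved into its own package
import org.scalatest.funsuite.AnyFunSuite

class ExampleMigrationSuite extends AnyFunSuite {
  test("existing assertions keep working after the upgrade") {
    assert(1 + 1 === 2)
  }
}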
14 changes: 14 additions & 0 deletions e2e/partioned_table_join.sql
@@ -0,0 +1,14 @@
CREATE TABLE IF NOT EXISTS local.developers (
username string,
firstname string,
lastname string)
USING iceberg
PARTITIONED BY (username);
CREATE TABLE IF NOT EXISTS local.projects (
creator string,
projectname string)
USING iceberg
PARTITIONED BY (creator);
INSERT INTO local.developers VALUES("krisnova", "Kris", "Nova");
INSERT INTO local.projects VALUES("krisnova", "aurae");
SELECT * FROM local.developers INNER JOIN local.projects ON local.projects.creator = local.developers.username;
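The SQL above joins two Iceberg tables partitioned on the join key. Roughly the same query through the DataFrame API would look like the sketch below (illustrative only; the e2e test itself runs the SQL file through spark-sql, and the `local` catalog is assumed to be configured as in run_e2e.sh):

import org.apache.spark.sql.SparkSession

// Assumes a SparkSession with the `local` Iceberg catalog already configured.
val spark = SparkSession.builder().appName("partitioned-join-sketch").getOrCreate()
val devs  = spark.table("local.developers")
val projs = spark.table("local.projects")
devs.join(projs, devs("username") === projs("creator")).show()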
66 changes: 66 additions & 0 deletions env_setup.sh
@@ -0,0 +1,66 @@
#!/bin/bash

set -ex

# Download Spark and iceberg if not present
SPARK_MAJOR=${SPARK_MAJOR:-"3.3"}
SPARK_VERSION=${SPARK_VERSION:-"${SPARK_MAJOR}.4"}
SCALA_VERSION=${SCALA_VERSION:-"2.12"}
HADOOP_VERSION="3"
SPARK_PATH="$(pwd)/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}"
SPARK_FILE="spark-${SPARK_VERSION}-bin-hadoop3.tgz"
ICEBERG_VERSION=${ICEBERG_VERSION:-"1.4.0"}
if [ ! -f "${SPARK_FILE}" ]; then
SPARK_DIST_URL="https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/${SPARK_FILE}"
SPARK_ARCHIVE_DIST_URL="https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/${SPARK_FILE}"
if command -v axel &> /dev/null
then
(axel "$SPARK_DIST_URL" || axel "$SPARK_ARCHIVE_DIST_URL") &
else
(wget "$SPARK_DIST_URL" || wget "$SPARK_ARCHIVE_DIST_URL") &
fi
fi
# Download Iceberg if not present
ICEBERG_FILE="iceberg-spark-runtime-${SPARK_MAJOR}_${SCALA_VERSION}-${ICEBERG_VERSION}.jar"
if [ ! -f "${ICEBERG_FILE}" ]; then
wget "https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime-${SPARK_MAJOR}_${SCALA_VERSION}/${ICEBERG_VERSION}/${ICEBERG_FILE}" -O "${ICEBERG_FILE}" &
fi
# See https://www.mail-archive.com/[email protected]/msg27796.html
BC_FILE="bcpkix-jdk15on-1.68.jar"
BC_PROV_FILE="bcprov-jdk15on-1.68.jar"
if [ ! -f "${BC_FILE}" ]; then
wget https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-jdk15on/1.68/bcpkix-jdk15on-1.68.jar
fi
if [ ! -f "${BC_PROV_FILE}" ]; then
wget https://repo1.maven.org/maven2/org/bouncycastle/bcprov-jdk15on/1.68/bcprov-jdk15on-1.68.jar
fi
wait
sleep 1
# Setup the env
if [ ! -d "${SPARK_PATH}" ]; then
tar -xf "${SPARK_FILE}"
fi

SPARK_HOME="${SPARK_PATH}"
export SPARK_HOME

if [ ! -f "${SPARK_PATH}/jars/${ICEBERG_FILE}" ]; then
# Delete the old JAR first.
rm "${SPARK_PATH}/jars/iceberg-spark-runtime*.jar" || echo "No old version to delete."
cp "${ICEBERG_FILE}" "${SPARK_PATH}/jars/${ICEBERG_FILE}"
fi

# Copy Bouncy Castle JARs for Kube
cp bc*jdk*.jar "${SPARK_PATH}/jars/"

# Set up for running pyspark and friends
export PATH="${SPARK_PATH}:${SPARK_PATH}/python:${SPARK_PATH}/bin:${SPARK_PATH}/sbin:${PATH}"

# Make sure we have a history directory
mkdir -p /tmp/spark-events

mkdir -p ./data/fetched/
if [ ! -f ./data/fetched/2021 ]; then
wget "https://gender-pay-gap.service.gov.uk/viewing/download-data/2021" -O ./data/fetched/2021
fi

72 changes: 72 additions & 0 deletions run_e2e.sh
@@ -0,0 +1,72 @@
#!/bin/bash

# Runs an end to end test, updates the output

set -ex

rm -rf /tmp/spark-events

source env_setup.sh


sql_file=e2e/partioned_table_join.sql
EXTENSIONS=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

# Run a simple local e2e example

mkdir -p /tmp/spark-events
${SPARK_HOME}/bin/spark-sql --master local[5] \
--conf spark.eventLog.enabled=true \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.spark_catalog.type=hive \
--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.local.type=hadoop \
--conf "spark.sql.catalog.local.warehouse=$PWD/warehouse" \
--name "fresh" \
-f "${sql_file}"


cp /tmp/spark-events/* ./src/test/event-history-test-files/local-fresh

# Same example, but with dynamic allocation turned on, and only if we have a kube cluster available.

source setup_micro_k8s.sh

if [ -n "${kube_host}" ]; then

rm -rf /tmp/spark-events
# This warehouse path sort of works; ideally we'd point at S3/MinIO, but it's enough
# for now.
WAREHOUSE_PATH=${WAREHOUSE_PATH:-/tmp/warehouse}
rm -rf ${WAREHOUSE_PATH}
mkdir -p /tmp/spark-events
${SPARK_HOME}/bin/spark-sql --master "k8s://${kube_host}" \
--deploy-mode client \
--conf spark.eventLog.enabled=true \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.spark_catalog.type=hive \
--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.local.type=hadoop \
--conf spark.dynamicAllocation.enabled=true \
--conf "spark.sql.catalog.local.warehouse=/tmp/warehouse" \
--conf spark.kubernetes.container.image=${image} \
--conf spark.kubernetes.authenticate.caCertFile=${cert_path} \
--conf spark.kubernetes.authenticate.submission.caCertFile=${cert_path} \
--conf spark.dynamicAllocation.shuffleTracking.enabled=true \
--name "fresh-kube" \
-f "${sql_file}"

cp /tmp/spark-events/* ./src/test/event-history-test-files/local-fresh-dynamic

# TODO: Register trap for this.
cp ~/.kube/${backup_config} ~/.kube/config
else
echo "Not updating dynamically scaled version"
fi

# Run only the tests that use the "fresh" output,
# i.e. the event logs produced by the Spark run above.
# This lets us verify in CI that we support different history file versions,
# provided we're compiled for that Spark version.
sbt -DsparkVersion=${SPARK_VERSION} ";clean;compile;testOnly com.qubole.sparklens.app.EventHistoryFileReportingSuite -- -z fresh"
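The suite filtered on above reads the event logs this script copies into src/test/event-history-test-files/local-fresh. A stripped-down sketch of such a test (illustrative only, not the actual EventHistoryFileReportingSuite):

import java.io.File
import org.scalatest.funsuite.AnyFunSuite

class FreshHistorySketchSuite extends AnyFunSuite {
  test("fresh event logs exist and are non-empty") {
    // run_e2e.sh copies /tmp/spark-events/* here after the local spark-sql run.
    val dir = new File("src/test/event-history-test-files/local-fresh")
    assert(dir.isDirectory && dir.listFiles().nonEmpty, "expected at least one event log")
    assert(dir.listFiles().forall(_.length() > 0))
  }
}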

117 changes: 117 additions & 0 deletions scalastyle-config.xml
@@ -0,0 +1,117 @@
<scalastyle>
<name>Scalastyle standard configuration</name>
<check level="warning" class="org.scalastyle.file.FileTabChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.file.FileLengthChecker" enabled="true">
<parameters>
<parameter name="maxFileLength"><![CDATA[800]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.file.HeaderMatchesChecker" enabled="false">
<parameters>
<parameter name="header"><![CDATA[// Copyright (C) 2011-2012 the original author or authors.
// See the LICENCE.txt file distributed with this work for additional
// information regarding copyright ownership.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.file.FileLineLengthChecker" enabled="true">
<parameters>
<parameter name="maxLineLength"><![CDATA[200]]></parameter>
<parameter name="tabSize"><![CDATA[4]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.ClassNamesChecker" enabled="true">
<parameters>
<parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true">
<parameters>
<parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true">
<parameters>
<parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true">
<parameters>
<parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
<parameters>
<parameter name="maxParameters"><![CDATA[8]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="false">
<parameters>
<parameter name="ignore"><![CDATA[-1,0,1,2,3]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.scalariform.ReturnChecker" enabled="false"></check>
<check level="warning" class="org.scalastyle.scalariform.NullChecker" enabled="false"></check>
<check level="warning" class="org.scalastyle.scalariform.NoCloneChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.file.RegexChecker" enabled="false">
<parameters>
<parameter name="regex"><![CDATA[println]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.NumberOfTypesChecker" enabled="true">
<parameters>
<parameter name="maxTypes"><![CDATA[30]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.CyclomaticComplexityChecker" enabled="true">
<parameters>
<parameter name="maximum"><![CDATA[10]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.scalariform.SimplifyBooleanExpressionChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.scalariform.IfBraceChecker" enabled="true">
<parameters>
<parameter name="singleLineAllowed"><![CDATA[true]]></parameter>
<parameter name="doubleLineAllowed"><![CDATA[false]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.MethodLengthChecker" enabled="true">
<parameters>
<parameter name="maxLength"><![CDATA[50]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="true">
<parameters>
<parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.NumberOfMethodsInTypeChecker" enabled="true">
<parameters>
<parameter name="maxMethods"><![CDATA[30]]></parameter>
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="false"></check>
<check level="warning" class="org.scalastyle.file.NewLineAtEofChecker" enabled="false"></check>
<check level="warning" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check>
</scalastyle>
37 changes: 37 additions & 0 deletions setup_micro_k8s.sh
@@ -0,0 +1,37 @@
if command -v microk8s &> /dev/null; then
echo "microk8s already installed yay!"
elif command -v snap &> /dev/null; then
sudo snap install microk8s --classic
MYUSER=$USER
sudo usermod -a -G microk8s $MYUSER
sudo chown -R $MYUSER ~/.kube
newgrp microk8s
elif command -v brew &> /dev/null; then
brew install ubuntu/microk8s/microk8s
microk8s install
echo "Find your cert (see https://www.waitingforcode.com/apache-spark/setting-up-apache-spark-kubernetes-microk8s/read)"
exit 1
fi
if command -v microk8s &> /dev/null; then
sudo microk8s status --wait-ready
sudo microk8s enable dns
sudo microk8s enable dashboard
sudo microk8s enable storage
sudo microk8s enable registry
fi
kube_host=$(sudo microk8s kubectl cluster-info |grep control |grep http |sed -ne 's/.*\(http[^"]*\).*/\1/p' | cut -d \' -f 2)
if [ -n "${kube_host}" ]; then
cd "${SPARK_HOME}"
kube_host="https://127.0.0.1:16443"
repo=localhost:32000/local-spark
tag=magic
image="${repo}/spark:${tag}"
./bin/docker-image-tool.sh -r "${repo}" -t "${tag}" build
cert_path=/var/snap/microk8s/current/certs/ca.crt
backup_config="config-$(date +%s)"
cp ~/.kube/config ~/.kube/${backup_config}
# I don't love this but kubeconfig & KUBECONFIG don't seem to be respected by Spark.
sudo microk8s config > ~/.kube/config
docker push "${image}"
cd -
fi
15 changes: 14 additions & 1 deletion src/main/scala/com/qubole/sparklens/QuboleJobListener.scala
@@ -48,6 +48,7 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener {
protected val stageIDToJobID = new mutable.HashMap[Int, Long]
protected val failedStages = new ListBuffer[String]
protected val appMetrics = new AggregateMetrics()
protected var jobConf: Option[Map[String, String]] = None

private def hostCount():Int = hostMap.size

@@ -130,6 +131,16 @@
}
}

override def onOtherEvent(event: SparkListenerEvent): Unit = {
}

override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = {
// For now we just grab the first Spark conf.
if (jobConf.isEmpty) {
jobConf = environmentUpdate.environmentDetails.get("Spark Properties").map(_.toMap)
}
}

override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
//println(s"Application ${applicationStart.appId} started at ${applicationStart.time}")
appInfo.applicationID = applicationStart.appId.getOrElse("NA")
@@ -161,7 +172,9 @@
jobMap,
jobSQLExecIDMap,
stageMap,
stageIDToJobID)
stageIDToJobID,
jobConf
)

EmailReportHelper.generateReport(appContext.toString(), sparkConf)
AppAnalyzer.startAnalyzers(appContext)
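For context on where the captured jobConf could go, here is a rough sketch of an executor-count heuristic that reads the dynamic-allocation settings out of it. The object, method name, and heuristic below are illustrative assumptions, not code from this commit:

// Hypothetical helper (not part of this commit): reads dynamic-allocation settings from the
// Spark properties captured in onEnvironmentUpdate and suggests an executor count.
object ExecutorAutotuneSketch {
  def suggestedExecutors(jobConf: Option[Map[String, String]],
                         observedPeakTasks: Int,
                         coresPerExecutor: Int = 4): Int = {
    val conf = jobConf.getOrElse(Map.empty)
    val dynamicEnabled = conf.get("spark.dynamicAllocation.enabled").exists(_.toBoolean)
    val configuredMax = conf.get("spark.dynamicAllocation.maxExecutors").map(_.toInt)
    // Enough executors to run the observed peak task count in a single wave...
    val needed = math.max(1, math.ceil(observedPeakTasks.toDouble / coresPerExecutor).toInt)
    // ...capped by the user's configured maximum when dynamic allocation is enabled.
    if (dynamicEnabled) math.min(needed, configuredMax.getOrElse(needed)) else needed
  }
}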
