Skip to content

Commit c099533

Browse files
committed by slfan1989
[AURON #2017] [BUILD] Add Spark 4.x support to dev/reformat script.
Signed-off-by: slfan1989 <slfan1989@apache.org>
1 parent 1c4a4ae commit c099533

8 files changed

Lines changed: 66 additions & 12 deletions

File tree

.github/workflows/style.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,16 @@ jobs:
4646
java-version: 8
4747
cache: 'maven'
4848
check-latest: false
49+
- name: Capture JDK 8 home
50+
run: echo "JAVA_HOME_8_X64=$JAVA_HOME" >> "$GITHUB_ENV"
51+
- name: Setup JDK 17
52+
uses: actions/setup-java@v5
53+
with:
54+
distribution: 'adopt-hotspot'
55+
java-version: 17
56+
cache: 'maven'
57+
check-latest: false
58+
- name: Capture JDK 17 home
59+
run: echo "JAVA_HOME_17_X64=$JAVA_HOME" >> "$GITHUB_ENV"
4960
- run: |
5061
./dev/reformat --check

dev/reformat

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,37 @@ done
3232

3333
MODE=pre
3434
SCALA_PROFILE=scala-2.12
35+
ORIGINAL_PATH="${PATH}"
36+
function set_java_home() {
37+
local version="$1"
38+
local mac_version="$version"
39+
local candidate=""
40+
if [[ "${version}" == "8" ]]; then
41+
mac_version="1.8"
42+
fi
43+
44+
if [[ "$(uname)" == "Darwin" ]] && [[ -x /usr/libexec/java_home ]]; then
45+
candidate=$(/usr/libexec/java_home -v "${mac_version}" 2>/dev/null || true)
46+
else
47+
local java_home_version_x64_var="JAVA_HOME_${version}_X64"
48+
local java_home_version_var="JAVA_HOME_${version}"
49+
50+
if [[ -n "${!java_home_version_x64_var:-}" ]]; then
51+
candidate="${!java_home_version_x64_var}"
52+
elif [[ -n "${!java_home_version_var:-}" ]]; then
53+
candidate="${!java_home_version_var}"
54+
fi
55+
fi
56+
57+
if [[ -n "${candidate}" ]]; then
58+
export JAVA_HOME="${candidate}"
59+
export PATH="${candidate}/bin:${ORIGINAL_PATH}"
60+
else
61+
echo "JAVA_HOME for JDK ${version} not found."
62+
exit 1
63+
fi
64+
}
65+
3566
function run_maven() {
3667
if [[ "$CHECK" == "true" ]]; then
3768
"${PROJECT_DIR}"/build/mvn spotless:check compile test-compile scalafix:scalafix -Dscalafix.mode=CHECK -Dscalafix.skipTest=true -DskipBuildNative -P"${MODE}" -P"${SCALA_PROFILE}" "$@"
@@ -52,12 +83,23 @@ fi
5283
sparkver=spark-3.5
5384
for celebornver in celeborn-0.5 celeborn-0.6
5485
do
55-
run_maven -P"${sparkver}" -Pceleborn,"${celebornver}" -Puniffle,uniffle-0.10 -Ppaimon,paimon-1.2 -Pflink,flink-1.18 -Piceberg,iceberg-1.9
86+
run_maven -P"${sparkver}" -Pceleborn,"${celebornver}" -Puniffle,uniffle-0.10 -Ppaimon,paimon-1.2 -Pflink-1.18 -Piceberg-1.9
5687

5788
done
5889

59-
sparkvers=(spark-3.0 spark-3.1 spark-3.2 spark-3.3 spark-3.4)
90+
sparkvers=(spark-3.0 spark-3.1 spark-3.2 spark-3.3 spark-3.4 spark-4.0 spark-4.1)
6091
for sparkver in "${sparkvers[@]}"
6192
do
93+
if [[ $sparkver == spark-4.* ]]; then
94+
SCALA_PROFILE=scala-2.13
95+
else
96+
SCALA_PROFILE=scala-2.12
97+
fi
98+
if [[ $sparkver == spark-4.* ]]; then
99+
# Use JDK 17 for Spark 4.x to match the Spark 4.x requirement
100+
set_java_home 17
101+
else
102+
set_java_home 8
103+
fi
62104
run_maven -P"${sparkver}"
63105
done

spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ object AuronConverters extends Logging {
424424
assert(
425425
!exec.requiredSchema.exists(e => existTimestampType(e.dataType)),
426426
s"Parquet scan with timestamp type is not supported for table: ${tableIdentifier
427-
.getOrElse("unknown")}. " +
427+
.getOrElse("unknown")}. " +
428428
"Set spark.auron.enable.scan.parquet.timestamp=true to enable timestamp support " +
429429
"or remove timestamp columns from the query.")
430430
}
@@ -435,15 +435,15 @@ object AuronConverters extends Logging {
435435
assert(
436436
!exec.requiredSchema.exists(e => existTimestampType(e.dataType)),
437437
s"ORC scan with timestamp type is not supported for tableIdentifier: ${tableIdentifier
438-
.getOrElse("unknown")}. " +
438+
.getOrElse("unknown")}. " +
439439
"Set spark.auron.enable.scan.orc.timestamp=true to enable timestamp support " +
440440
"or remove timestamp columns from the query.")
441441
}
442442
addRenameColumnsExec(Shims.get.createNativeOrcScanExec(exec))
443443
case p =>
444444
throw new NotImplementedError(
445445
s"Cannot convert FileSourceScanExec tableIdentifier: ${tableIdentifier.getOrElse(
446-
"unknown")}, class: ${p.getClass.getName}")
446+
"unknown")}, class: ${p.getClass.getName}")
447447
}
448448
}
449449

spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ case class AuronColumnarOverrides(sparkSession: SparkSession) extends ColumnarRu
9191
dumpSimpleSparkPlanTreeNode(sparkPlanTransformed)
9292

9393
logInfo(s"Transformed spark plan after preColumnarTransitions:\n${sparkPlanTransformed
94-
.treeString(verbose = true, addSuffix = true)}")
94+
.treeString(verbose = true, addSuffix = true)}")
9595

9696
// post-transform
9797
Shims.get.postTransform(sparkPlanTransformed, sparkSession.sparkContext)

spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ object NativeHelper extends Logging {
7474
val heapMemory = Runtime.getRuntime.maxMemory()
7575
val offheapMemory = totalMemory - heapMemory
7676
logWarning(s"memory total: ${Utils.bytesToString(totalMemory)}, onheap: ${Utils.bytesToString(
77-
heapMemory)}, offheap: ${Utils.bytesToString(offheapMemory)}")
77+
heapMemory)}, offheap: ${Utils.bytesToString(offheapMemory)}")
7878
offheapMemory
7979
}
8080

spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ object TaskContextHelper extends Logging {
4747
val thread = Thread.currentThread()
4848
val threadName = if (context != null) {
4949
s"auron native task ${context.partitionId()}.${context.attemptNumber()} in stage ${context
50-
.stageId()}.${context.stageAttemptNumber()} (TID ${context.taskAttemptId()})"
50+
.stageId()}.${context.stageAttemptNumber()} (TID ${context.taskAttemptId()})"
5151
} else {
5252
"auron native task " + thread.getName
5353
}

spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,10 @@ abstract class NativeParquetInsertIntoHiveTableBase(
6969
.filterKeys(Set("stage_id", "output_rows", "elapsed_compute"))
7070
.toSeq
7171
:+ ("io_time", SQLMetrics.createNanoTimingMetric(sparkContext, "Native.io_time"))
72-
:+ ("bytes_written",
73-
SQLMetrics
74-
.createSizeMetric(sparkContext, "Native.bytes_written")): _*)
72+
:+ (
73+
"bytes_written",
74+
SQLMetrics
75+
.createSizeMetric(sparkContext, "Native.bytes_written")): _*)
7576

7677
def check(): Unit = {
7778
val hadoopConf = sparkContext.hadoopConfiguration

thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ class AuronUniffleShuffleReader[K, C](
166166
}
167167
if (!emptyPartitionIds.isEmpty) {
168168
logDebug(s"Found ${emptyPartitionIds
169-
.size()} empty shuffle partitions: ${emptyPartitionIds.asScala.mkString(",")}")
169+
.size()} empty shuffle partitions: ${emptyPartitionIds.asScala.mkString(",")}")
170170
}
171171
iterators = shuffleDataIterList.iterator()
172172
if (iterators.hasNext) {

0 commit comments

Comments (0)