diff --git a/.github/workflows/example.yml b/.github/workflows/example.yml
index e038055923c..ae503663aa4 100644
--- a/.github/workflows/example.yml
+++ b/.github/workflows/example.yml
@@ -39,6 +39,7 @@ concurrency:
jobs:
build:
+ name: 'Spark ${{ matrix.spark }}, Hadoop ${{ matrix.hadoop }}, Sedona ${{ matrix.sedona }}'
runs-on: ubuntu-22.04
strategy:
fail-fast: false
@@ -56,23 +57,6 @@ jobs:
spark-compat: '3.4'
sedona: 1.8.0
hadoop: 3.3.4
- env:
- JAVA_TOOL_OPTIONS: >-
- -XX:+IgnoreUnrecognizedVMOptions
- --add-opens=java.base/java.lang=ALL-UNNAMED
- --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
- --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
- --add-opens=java.base/java.io=ALL-UNNAMED
- --add-opens=java.base/java.net=ALL-UNNAMED
- --add-opens=java.base/java.nio=ALL-UNNAMED
- --add-opens=java.base/java.util=ALL-UNNAMED
- --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
- --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
- --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
- --add-opens=java.base/sun.nio.cs=ALL-UNNAMED
- --add-opens=java.base/sun.security.action=ALL-UNNAMED
- --add-opens=java.base/sun.util.calendar=ALL-UNNAMED
- -Djdk.reflect.useDirectMethodHandle=false
steps:
- uses: actions/checkout@v4
- uses: actions/setup-java@v4
@@ -100,7 +84,8 @@ jobs:
path: ~/.m2
key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
restore-keys: ${{ runner.os }}-m2
- - env:
+ - name: Test Scala Spark SQL Example
+ env:
SPARK_VERSION: ${{ matrix.spark }}
SPARK_LOCAL_IP: 127.0.0.1
SPARK_COMPAT_VERSION: ${{ matrix.spark-compat }}
@@ -109,16 +94,28 @@ jobs:
run: |
cd examples/spark-sql
mvn versions:set -DnewVersion=${SEDONA_VERSION} -DgenerateBackupPoms=false
- mvn clean install \
+ mvn clean test \
-Dspark.version=${SPARK_VERSION} \
-Dspark.compat.version=${SPARK_COMPAT_VERSION} \
-Dsedona.version=${SEDONA_VERSION} \
-Dhadoop.version=${HADOOP_VERSION}
- java -jar target/sedona-spark-example-${SEDONA_VERSION}.jar
- - env:
+ - name: Test Java Spark SQL Example
+ env:
+ SPARK_VERSION: ${{ matrix.spark }}
+ SPARK_LOCAL_IP: 127.0.0.1
+ SPARK_COMPAT_VERSION: ${{ matrix.spark-compat }}
+ SEDONA_VERSION: ${{ matrix.sedona }}
+ HADOOP_VERSION: ${{ matrix.hadoop }}
+ run: |
+ cd examples/java-spark-sql
+ mvn versions:set -DnewVersion=${SEDONA_VERSION} -DgenerateBackupPoms=false
+ mvn clean test \
+ -Dspark.version=${SPARK_VERSION} \
+ -Dspark.compat.version=${SPARK_COMPAT_VERSION}
+ - name: Test Flink SQL Example
+ env:
SEDONA_VERSION: ${{ matrix.sedona }}
run: |
cd examples/flink-sql
mvn versions:set -DnewVersion=${SEDONA_VERSION} -DgenerateBackupPoms=false
- mvn clean install
- java -jar target/sedona-flink-example-${SEDONA_VERSION}.jar
+ mvn clean test
diff --git a/examples/flink-sql/pom.xml b/examples/flink-sql/pom.xml
index d6f7e97b68a..6c72b6acf4b 100644
--- a/examples/flink-sql/pom.xml
+++ b/examples/flink-sql/pom.xml
@@ -31,7 +31,7 @@
UTF-8
compile
1.19.0
- compile
+ provided
2.12
33.1
2.17.2
@@ -247,6 +247,20 @@
+ <plugin>
+   <groupId>org.apache.maven.plugins</groupId>
+   <artifactId>maven-surefire-plugin</artifactId>
+   <version>2.22.2</version>
+   <configuration>
+     <argLine>
+       --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
+       --add-opens=java.base/java.nio=ALL-UNNAMED
+       --add-opens=java.base/java.lang=ALL-UNNAMED
+       --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
+       --add-opens=java.base/java.util=ALL-UNNAMED
+     </argLine>
+   </configuration>
+ </plugin>
org.jacoco
jacoco-maven-plugin
@@ -266,6 +280,29 @@
+ <plugin>
+   <groupId>com.diffplug.spotless</groupId>
+   <artifactId>spotless-maven-plugin</artifactId>
+   <version>2.35.0</version>
+   <configuration>
+     <java>
+       <googleJavaFormat>
+         <version>1.15.0</version>
+       </googleJavaFormat>
+       <licenseHeader>
+         <file>../../tools/maven/license-header.txt</file>
+       </licenseHeader>
+     </java>
+   </configuration>
+   <executions>
+     <execution>
+       <goals>
+         <goal>check</goal>
+       </goals>
+       <phase>compile</phase>
+     </execution>
+   </executions>
+ </plugin>
diff --git a/examples/flink-sql/src/main/java/FlinkExample.java b/examples/flink-sql/src/main/java/FlinkExample.java
index c59eb8125eb..7ded36e0d6e 100644
--- a/examples/flink-sql/src/main/java/FlinkExample.java
+++ b/examples/flink-sql/src/main/java/FlinkExample.java
@@ -16,70 +16,84 @@
* specific language governing permissions and limitations
* under the License.
*/
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-
import org.apache.sedona.flink.SedonaFlinkRegistrator;
import org.apache.sedona.flink.expressions.Constructors;
-import static org.apache.flink.table.api.Expressions.$;
-import static org.apache.flink.table.api.Expressions.call;
-
-public class FlinkExample
-{
- static String[] pointColNames = {"geom_point", "name_point", "event_time", "proc_time"};
-
- static String[] polygonColNames = {"geom_polygon", "name_polygon", "event_time", "proc_time"};
+public class FlinkExample {
+ static String[] pointColNames = {"geom_point", "name_point", "event_time", "proc_time"};
- public static void main(String[] args) {
- int testDataSize = 10;
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
- StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
- SedonaFlinkRegistrator.registerType(env);
- SedonaFlinkRegistrator.registerFunc(tableEnv);
+ static String[] polygonColNames = {"geom_polygon", "name_polygon", "event_time", "proc_time"};
- // Create a fake WKT string table source
- Table pointWktTable = Utils.createTextTable(env, tableEnv, Utils.createPointWKT(testDataSize), pointColNames);
+ public static void main(String[] args) {
+ testS2SpatialJoin(10);
+ }
- // Create a geometry column
- Table pointTable = pointWktTable.select(
- call("ST_GeomFromWKT", $(pointColNames[0])).as(pointColNames[0]),
- $(pointColNames[1]));
+ public static void testS2SpatialJoin(int testDataSize) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+ SedonaFlinkRegistrator.registerType(env);
+ SedonaFlinkRegistrator.registerFunc(tableEnv);
- // Create S2CellID
- pointTable = pointTable.select($(pointColNames[0]), $(pointColNames[1]),
- call("ST_S2CellIDs", $(pointColNames[0]), 6).as("s2id_array"));
- // Explode s2id array
- tableEnv.createTemporaryView("pointTable", pointTable);
- pointTable = tableEnv.sqlQuery("SELECT geom_point, name_point, s2id_point FROM pointTable CROSS JOIN UNNEST(pointTable.s2id_array) AS tmpTbl1(s2id_point)");
+ // Create a fake WKT string table source
+ Table pointWktTable =
+ Utils.createTextTable(env, tableEnv, Utils.createPointWKT(testDataSize), pointColNames);
+ // Create a geometry column
+ Table pointTable =
+ pointWktTable.select(
+ call("ST_GeomFromWKT", $(pointColNames[0])).as(pointColNames[0]), $(pointColNames[1]));
- // Create a fake WKT string table source
- Table polygonWktTable = Utils.createTextTable(env, tableEnv, Utils.createPolygonWKT(testDataSize), polygonColNames);
- // Create a geometry column
- Table polygonTable = polygonWktTable.select(call(Constructors.ST_GeomFromWKT.class.getSimpleName(),
- $(polygonColNames[0])).as(polygonColNames[0]),
- $(polygonColNames[1]));
- // Create S2CellID
- polygonTable = polygonTable.select($(polygonColNames[0]), $(polygonColNames[1]),
- call("ST_S2CellIDs", $(polygonColNames[0]), 6).as("s2id_array"));
- // Explode s2id array
- tableEnv.createTemporaryView("polygonTable", polygonTable);
- polygonTable = tableEnv.sqlQuery("SELECT geom_polygon, name_polygon, s2id_polygon FROM polygonTable CROSS JOIN UNNEST(polygonTable.s2id_array) AS tmpTbl2(s2id_polygon)");
+ // Create S2CellID
+ pointTable =
+ pointTable.select(
+ $(pointColNames[0]),
+ $(pointColNames[1]),
+ call("ST_S2CellIDs", $(pointColNames[0]), 6).as("s2id_array"));
+ // Explode s2id array
+ tableEnv.createTemporaryView("pointTable", pointTable);
+ pointTable =
+ tableEnv.sqlQuery(
+ "SELECT geom_point, name_point, s2id_point FROM pointTable CROSS JOIN UNNEST(pointTable.s2id_array) AS tmpTbl1(s2id_point)");
- // TODO: TableImpl.print() occurs EOF Exception due to https://issues.apache.org/jira/browse/FLINK-35406
- // Use polygonTable.execute().print() when FLINK-35406 is fixed.
- polygonTable.execute().collect().forEachRemaining(row -> System.out.println(row));
+ // Create a fake WKT string table source
+ Table polygonWktTable =
+ Utils.createTextTable(env, tableEnv, Utils.createPolygonWKT(testDataSize), polygonColNames);
+ // Create a geometry column
+ Table polygonTable =
+ polygonWktTable.select(
+ call(Constructors.ST_GeomFromWKT.class.getSimpleName(), $(polygonColNames[0]))
+ .as(polygonColNames[0]),
+ $(polygonColNames[1]));
+ // Create S2CellID
+ polygonTable =
+ polygonTable.select(
+ $(polygonColNames[0]),
+ $(polygonColNames[1]),
+ call("ST_S2CellIDs", $(polygonColNames[0]), 6).as("s2id_array"));
+ // Explode s2id array
+ tableEnv.createTemporaryView("polygonTable", polygonTable);
+ polygonTable =
+ tableEnv.sqlQuery(
+ "SELECT geom_polygon, name_polygon, s2id_polygon FROM polygonTable CROSS JOIN UNNEST(polygonTable.s2id_array) AS tmpTbl2(s2id_polygon)");
- // Join two tables by their S2 ids
- Table joinResult = pointTable.join(polygonTable).where($("s2id_point").isEqual($("s2id_polygon")));
- // Optional: remove false positives
- joinResult = joinResult.where(call("ST_Contains", $("geom_polygon"), $("geom_point")));
- joinResult.execute().collect().forEachRemaining(row -> System.out.println(row));
- }
+ // TODO: TableImpl.print() throws an EOFException due to
+ // https://issues.apache.org/jira/browse/FLINK-35406
+ // Use polygonTable.execute().print() when FLINK-35406 is fixed.
+ polygonTable.execute().collect().forEachRemaining(row -> System.out.println(row));
+ // Join two tables by their S2 ids
+ Table joinResult =
+ pointTable.join(polygonTable).where($("s2id_point").isEqual($("s2id_polygon")));
+ // Optional: remove false positives
+ joinResult = joinResult.where(call("ST_Contains", $("geom_polygon"), $("geom_point")));
+ joinResult.execute().collect().forEachRemaining(row -> System.out.println(row));
+ }
}
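
Aside (not part of the diff): the S2-based join in FlinkExample only produces candidate pairs, since two geometries can share a level-6 S2 cell without one containing the other, which is why the final ST_Contains filter is kept. A minimal JTS sketch of that refinement step; the class name and coordinates are illustrative, and JTS is already on the classpath through Sedona:

```java
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.geom.Point;
import org.locationtech.jts.geom.Polygon;

public class RefineStepSketch {
  public static void main(String[] args) {
    GeometryFactory gf = new GeometryFactory();
    // Envelope polygon like the ones produced by Utils.createPolygonWKT
    Polygon envelope =
        gf.createPolygon(
            new Coordinate[] {
              new Coordinate(-0.5, -0.5),
              new Coordinate(-0.5, 0.5),
              new Coordinate(0.5, 0.5),
              new Coordinate(0.5, -0.5),
              new Coordinate(-0.5, -0.5)
            });
    Point inside = gf.createPoint(new Coordinate(0, 0));
    Point nearby = gf.createPoint(new Coordinate(0.9, 0.9)); // may still share a coarse S2 cell
    System.out.println(envelope.contains(inside)); // true: candidate pair is kept
    System.out.println(envelope.contains(nearby)); // false: filtered out, like ST_Contains
  }
}
```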
diff --git a/examples/flink-sql/src/main/java/Utils.java b/examples/flink-sql/src/main/java/Utils.java
index 0a95ab1b7bc..abe1c3d3a48 100644
--- a/examples/flink-sql/src/main/java/Utils.java
+++ b/examples/flink-sql/src/main/java/Utils.java
@@ -16,7 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
+import static org.apache.flink.table.api.Expressions.$;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -27,91 +32,102 @@
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.apache.flink.table.api.Expressions.$;
-
-public class Utils
-{
- static Long timestamp_base = new Timestamp(System.currentTimeMillis()).getTime();
- static Long time_interval = 1L; // Generate a record per this interval. Unit is second
+public class Utils {
+ static Long timestamp_base = new Timestamp(System.currentTimeMillis()).getTime();
+ static Long time_interval = 1L; // Generate a record per this interval. Unit is second
- static List<Row> createPointText(int size){
- List<Row> data = new ArrayList<>();
- for (int i = 0; i < size; i++) {
- // Create a number of points (1, 1) (2, 2) ...
- data.add(Row.of(i + "," + i, "point" + i, timestamp_base + time_interval * 1000 * i));
- }
- return data;
+ static List<Row> createPointText(int size) {
+ List<Row> data = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ // Create a number of points (1, 1) (2, 2) ...
+ data.add(Row.of(i + "," + i, "point" + i, timestamp_base + time_interval * 1000 * i));
}
+ return data;
+ }
- static List<Row> createPolygonText(int size) {
- List<Row> data = new ArrayList<>();
- for (int i = 0; i < size; i++) {
- // Create polygons each of which only has 1 match in points
- // Each polygon is an envelope like (-0.5, -0.5, 0.5, 0.5)
- String minX = String.valueOf(i - 0.5);
- String minY = String.valueOf(i - 0.5);
- String maxX = String.valueOf(i + 0.5);
- String maxY = String.valueOf(i + 0.5);
- List<String> polygon = new ArrayList<>();
- polygon.add(minX);polygon.add(minY);
- polygon.add(minX);polygon.add(maxY);
- polygon.add(maxX);polygon.add(maxY);
- polygon.add(maxX);polygon.add(minY);
- polygon.add(minX);polygon.add(minY);
- data.add(Row.of(String.join(",", polygon), "polygon" + i, timestamp_base + time_interval * 1000 * i));
- }
- return data;
+ static List<Row> createPolygonText(int size) {
+ List<Row> data = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ // Create polygons each of which only has 1 match in points
+ // Each polygon is an envelope like (-0.5, -0.5, 0.5, 0.5)
+ String minX = String.valueOf(i - 0.5);
+ String minY = String.valueOf(i - 0.5);
+ String maxX = String.valueOf(i + 0.5);
+ String maxY = String.valueOf(i + 0.5);
+ List<String> polygon = new ArrayList<>();
+ polygon.add(minX);
+ polygon.add(minY);
+ polygon.add(minX);
+ polygon.add(maxY);
+ polygon.add(maxX);
+ polygon.add(maxY);
+ polygon.add(maxX);
+ polygon.add(minY);
+ polygon.add(minX);
+ polygon.add(minY);
+ data.add(
+ Row.of(
+ String.join(",", polygon), "polygon" + i, timestamp_base + time_interval * 1000 * i));
}
+ return data;
+ }
- static List<Row> createPointWKT(int size){
- List<Row> data = new ArrayList<>();
- for (int i = 0; i < size; i++) {
- // Create a number of points (1, 1) (2, 2) ...
- data.add(Row.of("POINT (" + i + " " + i +")", "point" + i, timestamp_base + time_interval * 1000 * i));
- }
- return data;
+ static List<Row> createPointWKT(int size) {
+ List<Row> data = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ // Create a number of points (1, 1) (2, 2) ...
+ data.add(
+ Row.of(
+ "POINT (" + i + " " + i + ")",
+ "point" + i,
+ timestamp_base + time_interval * 1000 * i));
}
+ return data;
+ }
- static List<Row> createPolygonWKT(int size) {
- List<Row> data = new ArrayList<>();
- for (int i = 0; i < size; i++) {
- // Create polygons each of which only has 1 match in points
- // Each polygon is an envelope like (-0.5, -0.5, 0.5, 0.5)
- String minX = String.valueOf(i - 0.5);
- String minY = String.valueOf(i - 0.5);
- String maxX = String.valueOf(i + 0.5);
- String maxY = String.valueOf(i + 0.5);
- List<String> polygon = new ArrayList<>();
- polygon.add(minX + " " + minY);
- polygon.add(minX + " " + maxY);
- polygon.add(maxX + " " + maxY);
- polygon.add(maxX + " " + minY);
- polygon.add(minX + " " + minY);
- data.add(Row.of("POLYGON ((" + String.join(", ", polygon) + "))", "polygon" + i, timestamp_base + time_interval * 1000 * i));
- }
- return data;
+ static List<Row> createPolygonWKT(int size) {
+ List<Row> data = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ // Create polygons each of which only has 1 match in points
+ // Each polygon is an envelope like (-0.5, -0.5, 0.5, 0.5)
+ String minX = String.valueOf(i - 0.5);
+ String minY = String.valueOf(i - 0.5);
+ String maxX = String.valueOf(i + 0.5);
+ String maxY = String.valueOf(i + 0.5);
+ List<String> polygon = new ArrayList<>();
+ polygon.add(minX + " " + minY);
+ polygon.add(minX + " " + maxY);
+ polygon.add(maxX + " " + maxY);
+ polygon.add(maxX + " " + minY);
+ polygon.add(minX + " " + minY);
+ data.add(
+ Row.of(
+ "POLYGON ((" + String.join(", ", polygon) + "))",
+ "polygon" + i,
+ timestamp_base + time_interval * 1000 * i));
}
+ return data;
+ }
- static Table createTextTable(StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, List<Row> data, String[] colNames){
- TypeInformation<?>[] colTypes = {
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO
- };
- RowTypeInfo typeInfo = new RowTypeInfo(colTypes, Arrays.copyOfRange(colNames, 0, 3));
- DataStream<Row> ds = env.fromCollection(data).returns(typeInfo);
- // Generate Time Attribute
- WatermarkStrategy<Row> wmStrategy =
- WatermarkStrategy
- .forMonotonousTimestamps()
- .withTimestampAssigner((event, timestamp) -> event.getFieldAs(2));
- return tableEnv.fromDataStream(ds.assignTimestampsAndWatermarks(wmStrategy), $(colNames[0]), $(colNames[1]), $(colNames[2]).rowtime(), $(colNames[3]).proctime());
- }
-
-
+ static Table createTextTable(
+ StreamExecutionEnvironment env,
+ StreamTableEnvironment tableEnv,
+ List<Row> data,
+ String[] colNames) {
+ TypeInformation<?>[] colTypes = {
+ BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO
+ };
+ RowTypeInfo typeInfo = new RowTypeInfo(colTypes, Arrays.copyOfRange(colNames, 0, 3));
+ DataStream<Row> ds = env.fromCollection(data).returns(typeInfo);
+ // Generate Time Attribute
+ WatermarkStrategy<Row> wmStrategy =
+ WatermarkStrategy.forMonotonousTimestamps()
+ .withTimestampAssigner((event, timestamp) -> event.getFieldAs(2));
+ return tableEnv.fromDataStream(
+ ds.assignTimestampsAndWatermarks(wmStrategy),
+ $(colNames[0]),
+ $(colNames[1]),
+ $(colNames[2]).rowtime(),
+ $(colNames[3]).proctime());
+ }
}
diff --git a/examples/flink-sql/src/test/java/FlinkFunctionsTest.java b/examples/flink-sql/src/test/java/FlinkFunctionsTest.java
new file mode 100644
index 00000000000..64c63f81271
--- /dev/null
+++ b/examples/flink-sql/src/test/java/FlinkFunctionsTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.junit.Test;
+
+public class FlinkFunctionsTest {
+ @Test
+ public void testS2SpatialJoinWithSmallDataset() {
+ // Test with small dataset
+ FlinkExample.testS2SpatialJoin(5);
+ }
+
+ @Test
+ public void testS2SpatialJoinWithMediumDataset() {
+ // Test with medium dataset
+ FlinkExample.testS2SpatialJoin(10);
+ }
+
+ @Test
+ public void testS2SpatialJoinWithLargeDataset() {
+ // Test with larger dataset
+ FlinkExample.testS2SpatialJoin(20);
+ }
+}
diff --git a/examples/java-spark-sql/pom.xml b/examples/java-spark-sql/pom.xml
index 640cf400f25..787e8d911b9 100644
--- a/examples/java-spark-sql/pom.xml
+++ b/examples/java-spark-sql/pom.xml
@@ -23,7 +23,7 @@
org.apache.sedona
sedona-java-spark-example
- 1.6.1
+ 1.8.0
Sedona : Examples : Java Spark SQL
Example project for Apache Sedona with Java and Spark.
@@ -32,11 +32,31 @@
provided
test
- 1.6.1
+ 1.8.0
1.8.0-33.1
- 3.5.7
+ 4.0.1
+ 4.0
+ 2.13
4.0.1
- 3.0.0
+
+
+
+ -XX:+IgnoreUnrecognizedVMOptions
+ --add-opens=java.base/java.lang=ALL-UNNAMED
+ --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
+ --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
+ --add-opens=java.base/java.io=ALL-UNNAMED
+ --add-opens=java.base/java.net=ALL-UNNAMED
+ --add-opens=java.base/java.nio=ALL-UNNAMED
+ --add-opens=java.base/java.util=ALL-UNNAMED
+ --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
+ --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
+ --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
+ --add-opens=java.base/sun.nio.cs=ALL-UNNAMED
+ --add-opens=java.base/sun.security.action=ALL-UNNAMED
+ --add-opens=java.base/sun.util.calendar=ALL-UNNAMED
+ -Djdk.reflect.useDirectMethodHandle=false
+
@@ -47,17 +67,17 @@
<groupId>org.apache.sedona</groupId>
- <artifactId>sedona-spark-shaded-3.5_2.13</artifactId>
+ <artifactId>sedona-spark-shaded-${spark.compat.version}_${scala.compat.version}</artifactId>
<version>${sedona.version}</version>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.13</artifactId>
+ <artifactId>spark-core_${scala.compat.version}</artifactId>
<version>${spark.version}</version>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.13</artifactId>
+ <artifactId>spark-sql_${scala.compat.version}</artifactId>
<version>${spark.version}</version>
<scope>${spark.scope}</scope>
@@ -68,9 +88,10 @@
<scope>${javax.scope}</scope>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-engine</artifactId>
- <version>5.2.0-M1</version>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.13.1</version>
+ <scope>test</scope>
@@ -79,21 +100,15 @@
org.apache.maven.plugins
maven-surefire-plugin
- 3.2.5
+ 2.22.2
-
- --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
- --add-opens=java.base/java.nio=ALL-UNNAMED
- --add-opens=java.base/java.lang=ALL-UNNAMED
- --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
- --add-opens=java.base/java.util=ALL-UNNAMED
-
+ ${extraJavaArgs}
org.apache.maven.plugins
maven-shade-plugin
- 2.1
+ 3.5.0
package
@@ -136,12 +151,7 @@
-
- --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
- --add-opens=java.base/java.nio=ALL-UNNAMED
- --add-opens=java.base/java.lang=ALL-UNNAMED
- --add-opens=java.base/java.util=ALL-UNNAMED
-
+ ${extraJavaArgs}
@@ -149,36 +159,26 @@
com.diffplug.spotless
spotless-maven-plugin
- ${spotless.version}
+ 2.35.0
-
-
-
-
-
- .gitattributes
- .gitignore
-
-
-
-
-
- true
- 4
-
-
-
-
-
- 1.10
-
- true
- false
-
+
+ 1.15.0
+
+
+ ../../tools/maven/license-header.txt
+
-
+
+
+
+ check
+
+ compile
+
+
+
diff --git a/examples/java-spark-sql/src/main/java/spark/GeoParquetAccessor.java b/examples/java-spark-sql/src/main/java/spark/GeoParquetAccessor.java
index ba8e6a1f659..1745d823abc 100644
--- a/examples/java-spark-sql/src/main/java/spark/GeoParquetAccessor.java
+++ b/examples/java-spark-sql/src/main/java/spark/GeoParquetAccessor.java
@@ -16,9 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package spark;
+import java.util.List;
import org.apache.sedona.core.spatialOperator.RangeQuery;
import org.apache.sedona.core.spatialOperator.SpatialPredicate;
import org.apache.sedona.core.spatialRDD.SpatialRDD;
@@ -31,60 +31,57 @@
import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.geom.Polygon;
-import java.util.List;
-
-
public class GeoParquetAccessor {
- private final SparkSession session;
- private String parquetPath;
+ private final SparkSession session;
+ private String parquetPath;
- public GeoParquetAccessor() {
- this.session = new SedonaSparkSession().getSession();
- this.parquetPath = "";
- }
+ public GeoParquetAccessor() {
+ this.session = new SedonaSparkSession().getSession();
+ this.parquetPath = "";
+ }
- //Overload with constructor that has Spark session provided
- //Use to avoid error - can't have two SparkContext objects on one JVM
- public GeoParquetAccessor(SparkSession session, String parquetPath) {
- this.session = session;
- this.parquetPath = parquetPath;
- }
+ // Overloaded constructor that takes an existing Spark session.
+ // Use it to avoid the error of creating two SparkContext objects in one JVM.
+ public GeoParquetAccessor(SparkSession session, String parquetPath) {
+ this.session = session;
+ this.parquetPath = parquetPath;
+ }
- public List<Geometry> selectFeaturesByPolygon(double xmin, double ymax,
- double xmax, double ymin,
- String geometryColumn) {
+ public List<Geometry> selectFeaturesByPolygon(
+ double xmin, double ymax, double xmax, double ymin, String geometryColumn) {
- //Read the GeoParquet file into a DataFrame
- Dataset<Row> insarDF = session.read().format("geoparquet").load(parquetPath);
+ // Read the GeoParquet file into a DataFrame
+ Dataset<Row> insarDF = session.read().format("geoparquet").load(parquetPath);
- //Convert the DataFrame to a SpatialRDD
- //The second argument to toSpatialRdd is the name of the geometry column.
- SpatialRDD<Geometry> insarRDD = Adapter.toSpatialRdd(insarDF, geometryColumn);
+ // Convert the DataFrame to a SpatialRDD
+ // The second argument to toSpatialRdd is the name of the geometry column.
+ SpatialRDD<Geometry> insarRDD = Adapter.toSpatialRdd(insarDF, geometryColumn);
- // Define the polygon for the spatial query
- GeometryFactory geometryFactory = new GeometryFactory();
- Coordinate[] coordinates = new Coordinate[] {
- new Coordinate(xmin, ymin),
- new Coordinate(xmax, ymin),
- new Coordinate(xmax, ymax),
- new Coordinate(xmin, ymax),
- new Coordinate(xmin, ymin) // A closed polygon has the same start and end coordinate
+ // Define the polygon for the spatial query
+ GeometryFactory geometryFactory = new GeometryFactory();
+ Coordinate[] coordinates =
+ new Coordinate[] {
+ new Coordinate(xmin, ymin),
+ new Coordinate(xmax, ymin),
+ new Coordinate(xmax, ymax),
+ new Coordinate(xmin, ymax),
+ new Coordinate(xmin, ymin) // A closed polygon has the same start and end coordinate
};
- Polygon queryPolygon = geometryFactory.createPolygon(coordinates);
-
- // Perform the spatial range query
- // This will return all geometries that intersect with the query polygon.
- // Alternatives are SpatialPredicate.CONTAINS or SpatialPredicate.WITHIN
- SpatialRDD<Geometry> resultRDD = new SpatialRDD<>();
- try {
- resultRDD.rawSpatialRDD = RangeQuery.SpatialRangeQuery(insarRDD, queryPolygon, SpatialPredicate.INTERSECTS, false);
- } catch (Exception e) {
- e.printStackTrace();
- }
+ Polygon queryPolygon = geometryFactory.createPolygon(coordinates);
- // Collect the results back to the driver
- return resultRDD.getRawSpatialRDD().collect();
+ // Perform the spatial range query
+ // This will return all geometries that intersect with the query polygon.
+ // Alternatives are SpatialPredicate.CONTAINS or SpatialPredicate.WITHIN
+ SpatialRDD<Geometry> resultRDD = new SpatialRDD<>();
+ try {
+ resultRDD.rawSpatialRDD =
+ RangeQuery.SpatialRangeQuery(insarRDD, queryPolygon, SpatialPredicate.INTERSECTS, false);
+ } catch (Exception e) {
+ e.printStackTrace();
}
+ // Collect the results back to the driver
+ return resultRDD.getRawSpatialRDD().collect();
+ }
}
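
Aside (not part of the diff): the range query above uses the RDD API; the same bounding-box filter can also be expressed in Sedona SQL directly on the GeoParquet DataFrame. A hedged sketch, assuming the geometry column is named geometry as in SedonaGeoParquetMain and using a placeholder parquet path:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import spark.SedonaSparkSession;

public class GeoParquetSqlSketch {
  public static void main(String[] args) {
    // Reuse the Sedona-enabled session helper from this example project.
    SparkSession session = new SedonaSparkSession().getSession();
    // Placeholder path; in the example it comes from application.properties.
    Dataset<Row> features = session.read().format("geoparquet").load("/path/to/data.geoparquet");
    features.createOrReplaceTempView("features");

    // Same envelope as SedonaGeoParquetMain (xmin, ymin, xmax, ymax), written as a SQL spatial filter.
    Dataset<Row> filtered =
        session.sql(
            "SELECT * FROM features WHERE ST_Intersects(geometry, "
                + "ST_PolygonFromEnvelope(1155850, 4748100, 1252000, 4819840))");
    filtered.show(5);
  }
}
```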
diff --git a/examples/java-spark-sql/src/main/java/spark/SedonaGeoParquetMain.java b/examples/java-spark-sql/src/main/java/spark/SedonaGeoParquetMain.java
index 4a114372830..5dfe45a46e3 100644
--- a/examples/java-spark-sql/src/main/java/spark/SedonaGeoParquetMain.java
+++ b/examples/java-spark-sql/src/main/java/spark/SedonaGeoParquetMain.java
@@ -16,46 +16,45 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package spark;
-import org.locationtech.jts.geom.Coordinate;
-import org.locationtech.jts.geom.Geometry;
-
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Properties;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.Geometry;
public class SedonaGeoParquetMain {
- protected static Properties properties;
- protected static String parquetPath;
- protected static SedonaSparkSession session;
+ protected static Properties properties;
+ protected static String parquetPath;
+ protected static SedonaSparkSession session;
- public static void main(String args[]) {
+ public static void main(String args[]) {
- session = new SedonaSparkSession();
- //Get parquetPath and any other application.properties
- try {
- ClassLoader loader = Thread.currentThread().getContextClassLoader();
- Properties properties = new Properties();
- InputStream is = loader.getResourceAsStream("application.properties");
- properties.load(is);
- parquetPath = properties.getProperty("parquet.path");
- } catch (IOException e) {
- e.printStackTrace();
- parquetPath = "";
- }
- GeoParquetAccessor accessor = new GeoParquetAccessor(session.session, parquetPath);
- //Test parquet happens to be in New Zealand Transverse Mercator (EPSG:2193) (meters)
- List<Geometry> geoms = accessor.selectFeaturesByPolygon(1155850, 4819840, 1252000, 4748100, "geometry");
- System.out.println("Coordinates of convex hull of points in boundary:");
- for (Geometry geom : geoms) {
- Coordinate[] convexHullCoordinates = geom.convexHull().getCoordinates();
- for (Coordinate coord : convexHullCoordinates) {
- System.out.println(String.format("\t%s", coord.toString()));
- }
- }
+ session = new SedonaSparkSession();
+ // Get parquetPath and any other application.properties
+ try {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ Properties properties = new Properties();
+ InputStream is = loader.getResourceAsStream("application.properties");
+ properties.load(is);
+ parquetPath = properties.getProperty("parquet.path");
+ } catch (IOException e) {
+ e.printStackTrace();
+ parquetPath = "";
+ }
+ GeoParquetAccessor accessor = new GeoParquetAccessor(session.session, parquetPath);
+ // Test parquet happens to be in New Zealand Transverse Mercator (EPSG:2193) (meters)
+ List<Geometry> geoms =
+ accessor.selectFeaturesByPolygon(1155850, 4819840, 1252000, 4748100, "geometry");
+ System.out.println("Coordinates of convex hull of points in boundary:");
+ for (Geometry geom : geoms) {
+ Coordinate[] convexHullCoordinates = geom.convexHull().getCoordinates();
+ for (Coordinate coord : convexHullCoordinates) {
+ System.out.println(String.format("\t%s", coord.toString()));
+ }
}
+ }
}
diff --git a/examples/java-spark-sql/src/main/java/spark/SedonaSparkSession.java b/examples/java-spark-sql/src/main/java/spark/SedonaSparkSession.java
index 6be6c995855..aaf1c938fef 100644
--- a/examples/java-spark-sql/src/main/java/spark/SedonaSparkSession.java
+++ b/examples/java-spark-sql/src/main/java/spark/SedonaSparkSession.java
@@ -16,36 +16,35 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package spark;
import org.apache.sedona.spark.SedonaContext;
import org.apache.spark.sql.SparkSession;
-
public class SedonaSparkSession {
- public SparkSession session;
-
- public SedonaSparkSession() {
-
- //Set configuration for localhost spark cluster. Intended to be run from IDE or similar.
- //Use SedonaContext builder to create SparkSession with Sedona extensions
- SparkSession config = SedonaContext.builder()
- .appName(this.getClass().getSimpleName())
- .master("local[*]")
- .config("spark.ui.enabled", "false")
- .config("spark.driver.extraJavaOptions",
- "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED")
- .getOrCreate();
-
- //Create Sedona-enabled SparkSession
- this.session = SedonaContext.create(config);
- }
-
- public SparkSession getSession() {
- // Access SparkSession object
- return this.session;
- }
-
+ public SparkSession session;
+
+ public SedonaSparkSession() {
+
+ // Set configuration for localhost spark cluster. Intended to be run from IDE or similar.
+ // Use SedonaContext builder to create SparkSession with Sedona extensions
+ SparkSession config =
+ SedonaContext.builder()
+ .appName(this.getClass().getSimpleName())
+ .master("local[*]")
+ .config("spark.ui.enabled", "false")
+ .config(
+ "spark.driver.extraJavaOptions",
+ "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED")
+ .getOrCreate();
+
+ // Create Sedona-enabled SparkSession
+ this.session = SedonaContext.create(config);
+ }
+
+ public SparkSession getSession() {
+ // Access SparkSession object
+ return this.session;
+ }
}
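
Aside (not part of the diff): a minimal smoke-test sketch for the helper above. It assumes only that the class sits in the same spark package and that SedonaContext.create has registered the Sedona SQL functions; the class name is illustrative:

```java
package spark;

import org.apache.spark.sql.SparkSession;

public class SedonaSessionSmokeTest {
  public static void main(String[] args) {
    // Obtain the Sedona-enabled session from the helper above.
    SparkSession sedona = new SedonaSparkSession().getSession();
    // If the Sedona extensions are registered, this spatial SQL call succeeds.
    sedona.sql("SELECT ST_AsText(ST_Point(1.0, 2.0)) AS wkt").show();
    sedona.stop();
  }
}
```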
diff --git a/examples/java-spark-sql/src/test/java/spark/SedonaParquetTest.java b/examples/java-spark-sql/src/test/java/spark/SedonaParquetTest.java
index 036cdda9567..f965e5b5107 100644
--- a/examples/java-spark-sql/src/test/java/spark/SedonaParquetTest.java
+++ b/examples/java-spark-sql/src/test/java/spark/SedonaParquetTest.java
@@ -16,87 +16,71 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package spark;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
-
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
public class SedonaParquetTest {
-
- protected static Properties properties;
- protected static String parquetPath;
- protected static SedonaSparkSession session;
-
- public SedonaParquetTest() {
- }
-
- @BeforeAll
- public static void setUpClass() throws IOException {
-
- session = new SedonaSparkSession();
- //Get parquetPath and any other application.properties
- try {
- ClassLoader loader = Thread.currentThread().getContextClassLoader();
- Properties properties = new Properties();
- InputStream is = loader.getResourceAsStream("application.properties");
- properties.load(is);
- parquetPath = properties.getProperty("parquet.path");
- } catch (IOException e) {
- e.printStackTrace();
- parquetPath = "";
- }
-
+ protected static Properties properties;
+ protected static String parquetPath;
+ protected static SedonaSparkSession session;
+
+ public SedonaParquetTest() {}
+
+ @BeforeClass
+ public static void setUpClass() throws IOException {
+
+ session = new SedonaSparkSession();
+ // Get parquetPath and any other application.properties
+ try {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ Properties properties = new Properties();
+ InputStream is = loader.getResourceAsStream("application.properties");
+ properties.load(is);
+ parquetPath = properties.getProperty("parquet.path");
+ } catch (IOException e) {
+ e.printStackTrace();
+ parquetPath = "";
}
-
- @AfterAll
- public static void tearDownClass() {
- }
-
- @BeforeEach
- public void setUp() {
- }
-
- @AfterEach
- public void tearDown() {
- }
-
- @Test
- public void connects() {
- assertNotNull(session, "SparkSedonaSession not initialized correctly.");
- assertNotNull(session.session, "Spark session not initialized inside SparkSedonaSession.");
- }
-
- @Test
- public void parquetAccessible() {
- File file = new File(parquetPath);
- assertTrue(file.exists(), "Parquet file does not exist.");
- assertTrue(file.canRead(), "Can't read geoparquet file on record.");
- }
-
- @Test
- public void canLoadRDD() {
- assertNotNull(session, "Session is null.");
- Dataset<Row> insarDF = session.session.read()
- .format("geoparquet")
- .load(parquetPath);
- assertNotNull(insarDF, "Dataset was not created.");
- assertTrue(insarDF.count() > 0, "Dataset is empty.");
- }
-
+ }
+
+ @AfterClass
+ public static void tearDownClass() {}
+
+ @Before
+ public void setUp() {}
+
+ @Test
+ public void connects() {
+ assertNotNull("SparkSedonaSession not initialized correctly.", session);
+ assertNotNull("Spark session not initialized inside SparkSedonaSession.", session.session);
+ }
+
+ @Test
+ public void parquetAccessible() {
+ File file = new File(parquetPath);
+ assertTrue("Parquet file does not exist.", file.exists());
+ assertTrue("Can't read geoparquet file on record.", file.canRead());
+ }
+
+ @Test
+ public void canLoadRDD() {
+ assertNotNull("Session is null.", session);
+ Dataset<Row> insarDF = session.session.read().format("geoparquet").load(parquetPath);
+ assertNotNull("Dataset was not created.", insarDF);
+ assertTrue("Dataset is empty.", insarDF.count() > 0);
+ }
}
diff --git a/examples/spark-sql/pom.xml b/examples/spark-sql/pom.xml
index f2c647d6530..28066a64607 100644
--- a/examples/spark-sql/pom.xml
+++ b/examples/spark-sql/pom.xml
@@ -30,7 +30,7 @@
UTF-8
- compile
+ provided
compile
compile
@@ -66,13 +66,13 @@
org.apache.spark
spark-core_${scala.compat.version}
${spark.version}
- ${dependency.scope}
+ ${spark.scope}
org.apache.spark
spark-sql_${scala.compat.version}
${spark.version}
- ${dependency.scope}
+ ${spark.scope}
org.apache.sedona
@@ -185,7 +185,7 @@
org.apache.hadoop
hadoop-aws
${hadoop.version}
- ${dependency.scope}
+ ${spark.scope}
junit
@@ -193,6 +193,12 @@
4.13.1
test
+ <dependency>
+   <groupId>org.scalatest</groupId>
+   <artifactId>scalatest_${scala.compat.version}</artifactId>
+   <version>3.2.15</version>
+   <scope>test</scope>
+ </dependency>
@@ -355,6 +361,48 @@
${extraJavaArgs}
+ <plugin>
+   <groupId>org.scalatest</groupId>
+   <artifactId>scalatest-maven-plugin</artifactId>
+   <version>2.2.0</version>
+   <configuration>
+     <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+     <junitxml>.</junitxml>
+     <filereports>TestSuite.txt</filereports>
+     <argLine>--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false</argLine>
+   </configuration>
+   <executions>
+     <execution>
+       <id>test</id>
+       <goals>
+         <goal>test</goal>
+       </goals>
+     </execution>
+   </executions>
+ </plugin>
+ <plugin>
+   <groupId>com.diffplug.spotless</groupId>
+   <artifactId>spotless-maven-plugin</artifactId>
+   <version>2.35.0</version>
+   <configuration>
+     <java>
+       <googleJavaFormat>
+         <version>1.15.0</version>
+       </googleJavaFormat>
+       <licenseHeader>
+         <file>../../tools/maven/license-header.txt</file>
+       </licenseHeader>
+     </java>
+   </configuration>
+   <executions>
+     <execution>
+       <goals>
+         <goal>check</goal>
+       </goals>
+       <phase>compile</phase>
+     </execution>
+   </executions>
+ </plugin>
diff --git a/examples/spark-sql/src/main/scala/Main.scala b/examples/spark-sql/src/main/scala/Main.scala
index cd3e4c67a87..45efd851652 100644
--- a/examples/spark-sql/src/main/scala/Main.scala
+++ b/examples/spark-sql/src/main/scala/Main.scala
@@ -26,6 +26,15 @@ import org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
import org.apache.sedona.viz.sql.utils.SedonaVizRegistrator
+/**
+ * Main entry point for running Sedona SQL, RDD, and Visualization examples.
+ * Demonstrates various spatial operations including:
+ * - SQL-based spatial queries and joins
+ * - GeoParquet I/O operations
+ * - Shapefile and raster data handling
+ * - RDD-based spatial analysis
+ * - Spatial visualization techniques
+ */
object Main extends App {
Logger.getRootLogger().setLevel(Level.WARN)
@@ -39,20 +48,31 @@ object Main extends App {
val resourceFolder = System.getProperty("user.dir")+"/src/test/resources/"
+ // SQL-based spatial operations
+ println("=== Running SQL Examples ===")
testPredicatePushdownAndRangeJonQuery(sedona)
testDistanceJoinQuery(sedona)
testAggregateFunction(sedona)
testShapefileConstructor(sedona)
testRasterIOAndMapAlgebra(sedona)
+ // GeoParquet operations
+ println("\n=== Running GeoParquet Examples ===")
+ testGeoParquetWriter(sedona)
+ testGeoParquetReader(sedona)
+
+ // RDD-based spatial analysis
+ println("\n=== Running RDD Examples ===")
visualizeSpatialColocation(sedona)
calculateSpatialColocation(sedona)
+ // Visualization examples
+ println("\n=== Running Visualization Examples ===")
buildScatterPlot(sedona)
buildHeatMap(sedona)
buildChoroplethMap(sedona)
parallelFilterRenderNoStitch(sedona)
sqlApiVisualization(sedona)
- System.out.println("All SedonaSQL DEMOs passed!")
+ println("\n✅ All Sedona examples completed successfully!")
}
diff --git a/examples/spark-sql/src/main/scala/RddExample.scala b/examples/spark-sql/src/main/scala/RddExample.scala
index 7dc54860cf1..6f663051501 100644
--- a/examples/spark-sql/src/main/scala/RddExample.scala
+++ b/examples/spark-sql/src/main/scala/RddExample.scala
@@ -17,7 +17,6 @@
* under the License.
*/
-import Main.resourceFolder
import org.apache.sedona.core.enums.{GridType, IndexType}
import org.apache.sedona.core.formatMapper.shapefileParser.ShapefileReader
import org.apache.sedona.core.spatialOperator.JoinQuery
@@ -34,6 +33,8 @@ import java.awt.Color
object RddExample {
+ val resourceFolder = System.getProperty("user.dir")+"/src/test/resources/"
+
// Data link (in shapefile): https://geo.nyu.edu/catalog/nyu_2451_34514
val nycArealandmarkShapefileLocation = resourceFolder+"nyc-area-landmark-shapefile"
@@ -42,16 +43,25 @@ object RddExample {
val colocationMapLocation = System.getProperty("user.dir")+"/colocationMap"
+ /**
+ * Visualizes spatial co-location between NYC landmarks and taxi pickup points.
+ * Creates an overlay visualization with landmarks (scatter plot) and taxi trips (heat map).
+ *
+ * Note: This function uses RDD API to demonstrate low-level spatial operations.
+ * For DataFrame-based approach, see SqlExample.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
def visualizeSpatialColocation(sedona: SparkSession): Unit =
{
// Prepare NYC area landmarks which includes airports, museums, colleges, hospitals
- var arealmRDD = ShapefileReader.readToPolygonRDD(sedona.sparkContext, nycArealandmarkShapefileLocation)
+ val arealmRDD = ShapefileReader.readToPolygonRDD(sedona.sparkContext, nycArealandmarkShapefileLocation)
// Prepare NYC taxi trips. Only use the taxi trips' pickup points
- var tripDf = sedona.read.format("csv").option("delimiter",",").option("header","false").load(nyctripCSVLocation)
+ val tripDf = sedona.read.format("csv").option("delimiter",",").option("header","false").load(nyctripCSVLocation)
// Convert from DataFrame to RDD. This can also be done directly through Sedona RDD API.
tripDf.createOrReplaceTempView("tripdf")
- var tripRDD = Adapter.toSpatialRdd(sedona.sql("select ST_Point(cast(tripdf._c0 as Decimal(24, 14)), cast(tripdf._c1 as Decimal(24, 14))) as point from tripdf")
+ val tripRDD = Adapter.toSpatialRdd(sedona.sql("select ST_Point(cast(tripdf._c0 as Decimal(24, 14)), cast(tripdf._c1 as Decimal(24, 14))) as point from tripdf")
, "point")
// Convert the Coordinate Reference System from degree-based to meter-based. This returns the accurate distance calculate.
@@ -79,6 +89,16 @@ object RddExample {
imageGenerator.SaveRasterImageAsLocalFile(overlayOperator.backRasterImage, colocationMapLocation, ImageType.PNG)
}
+ /**
+ * Calculates spatial co-location using Ripley's K function.
+ * Analyzes whether taxi trips are clustered around NYC landmarks at various distance thresholds.
+ * Uses distance join queries to compute co-location statistics.
+ *
+ * The Ripley's K function tests for spatial clustering/dispersion by comparing
+ * observed vs expected point patterns at increasing distance bands.
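+ * In this example the estimate is computed as K(d) = A * |join pairs within d| / (n_landmarks * n_trips)
+ * and L(d) = sqrt(K(d) / Pi); the result is reported as "Co-located" when the observed L(d)
+ * exceeds the expected value d (see the loop below).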
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
def calculateSpatialColocation(sedona: SparkSession): Unit =
{
@@ -88,22 +108,22 @@ object RddExample {
// Use the center point of area landmarks to check co-location. This is required by Ripley's K function.
arealmRDD.rawSpatialRDD = arealmRDD.rawSpatialRDD.rdd.map[Geometry](f=>
{
- var geom = f.getCentroid
+ val geom = f.getCentroid
// Copy non-spatial attributes
geom.setUserData(f.getUserData)
geom
})
// The following two lines are optional. The purpose is to show the structure of the shapefile.
- var arealmDf = Adapter.toDf(arealmRDD, sedona)
+ val arealmDf = Adapter.toDf(arealmRDD, sedona)
arealmDf.show()
// Prepare NYC taxi trips. Only use the taxi trips' pickup points
- var tripDf = sedona.read.format("csv").option("delimiter",",").option("header","false").load(nyctripCSVLocation)
+ val tripDf = sedona.read.format("csv").option("delimiter",",").option("header","false").load(nyctripCSVLocation)
tripDf.show() // Optional
// Convert from DataFrame to RDD. This can also be done directly through Sedona RDD API.
tripDf.createOrReplaceTempView("tripdf")
- var tripRDD = Adapter.toSpatialRdd(sedona.sql("select ST_Point(cast(tripdf._c0 as Decimal(24, 14)), cast(tripdf._c1 as Decimal(24, 14))) as point, 'def' as trip_attr from tripdf")
+ val tripRDD = Adapter.toSpatialRdd(sedona.sql("select ST_Point(cast(tripdf._c0 as Decimal(24, 14)), cast(tripdf._c1 as Decimal(24, 14))) as point, 'def' as trip_attr from tripdf")
, "point")
// Convert the Coordinate Reference System from degree-based to meter-based. This returns the accurate distance calculate.
@@ -127,27 +147,27 @@ object RddExample {
val beginDistance = 0.0
var currentDistance = 0.0
- // Start the iteration
+ // Start the iteration - test multiple distance bands
println("distance(meter),observedL,difference,coLocationStatus")
for (i <- 1 to iterationTimes)
{
currentDistance = beginDistance + i*distanceIncrement
- var bufferedArealmRDD = new CircleRDD(arealmRDD,currentDistance)
+ val bufferedArealmRDD = new CircleRDD(arealmRDD,currentDistance)
bufferedArealmRDD.spatialPartitioning(tripRDD.getPartitioner)
// Run Sedona Distance Join Query
- var adjacentMatrix = JoinQuery.DistanceJoinQueryFlat(tripRDD, bufferedArealmRDD,true,true)
+ val adjacentMatrix = JoinQuery.DistanceJoinQueryFlat(tripRDD, bufferedArealmRDD,true,true)
// Uncomment the following two lines if you want to see what the join result looks like in SparkSQL
// import scala.collection.JavaConversions._
- // var adjacentMatrixDf = Adapter.toDf(adjacentMatrix, arealmRDD.fieldNames, tripRDD.fieldNames, sparkSession)
+ // val adjacentMatrixDf = Adapter.toDf(adjacentMatrix, arealmRDD.fieldNames, tripRDD.fieldNames, sparkSession)
// adjacentMatrixDf.show()
- var observedK = adjacentMatrix.count()*area*1.0/(arealmRDD.approximateTotalCount*tripRDD.approximateTotalCount)
- var observedL = Math.sqrt(observedK/Math.PI)
- var expectedL = currentDistance
- var colocationDifference = observedL - expectedL
- var colocationStatus = {if (colocationDifference>0) "Co-located" else "Dispersed"}
+ val observedK = adjacentMatrix.count()*area*1.0/(arealmRDD.approximateTotalCount*tripRDD.approximateTotalCount)
+ val observedL = Math.sqrt(observedK/Math.PI)
+ val expectedL = currentDistance
+ val colocationDifference = observedL - expectedL
+ val colocationStatus = {if (colocationDifference>0) "Co-located" else "Dispersed"}
println(s"""$currentDistance,$observedL,$colocationDifference,$colocationStatus""")
}
diff --git a/examples/spark-sql/src/main/scala/SqlExample.scala b/examples/spark-sql/src/main/scala/SqlExample.scala
index 367f06160f6..ed34c08d2f8 100644
--- a/examples/spark-sql/src/main/scala/SqlExample.scala
+++ b/examples/spark-sql/src/main/scala/SqlExample.scala
@@ -17,42 +17,46 @@
* under the License.
*/
-import Main.resourceFolder
-import org.apache.sedona.core.formatMapper.shapefileParser.ShapefileReader
-import org.apache.sedona.core.spatialRDD.SpatialRDD
import org.apache.sedona.core.utils.SedonaConf
-import org.apache.sedona.sql.utils.Adapter
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{SaveMode, SparkSession}
import org.locationtech.jts.geom.{Coordinate, Geometry, GeometryFactory}
object SqlExample {
+ val resourceFolder = System.getProperty("user.dir")+"/src/test/resources/"
+
val csvPolygonInputLocation = resourceFolder + "testenvelope.csv"
val csvPointInputLocation = resourceFolder + "testpoint.csv"
val shapefileInputLocation = resourceFolder + "shapefiles/dbf"
val rasterdatalocation = resourceFolder + "raster/"
+ /**
+ * Demonstrates predicate pushdown optimization and range join queries with spatial indexing.
+ * Tests ST_Contains predicate with polygon and point data, including spatial filter pushdown.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
def testPredicatePushdownAndRangeJonQuery(sedona: SparkSession):Unit =
{
val sedonaConf = new SedonaConf(sedona.conf)
println(sedonaConf)
- var polygonCsvDf = sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPolygonInputLocation)
+ val polygonCsvDf = sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPolygonInputLocation)
polygonCsvDf.createOrReplaceTempView("polygontable")
polygonCsvDf.show()
- var polygonDf = sedona.sql("select ST_PolygonFromEnvelope(cast(polygontable._c0 as Decimal(24,20)),cast(polygontable._c1 as Decimal(24,20)), cast(polygontable._c2 as Decimal(24,20)), cast(polygontable._c3 as Decimal(24,20))) as polygonshape from polygontable")
+ val polygonDf = sedona.sql("select ST_PolygonFromEnvelope(cast(polygontable._c0 as Decimal(24,20)),cast(polygontable._c1 as Decimal(24,20)), cast(polygontable._c2 as Decimal(24,20)), cast(polygontable._c3 as Decimal(24,20))) as polygonshape from polygontable")
polygonDf.createOrReplaceTempView("polygondf")
polygonDf.show()
- var pointCsvDF = sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
+ val pointCsvDF = sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
pointCsvDF.createOrReplaceTempView("pointtable")
pointCsvDF.show()
- var pointDf = sedona.sql("select ST_Point(cast(pointtable._c0 as Decimal(24,20)),cast(pointtable._c1 as Decimal(24,20))) as pointshape from pointtable")
+ val pointDf = sedona.sql("select ST_Point(cast(pointtable._c0 as Decimal(24,20)),cast(pointtable._c1 as Decimal(24,20))) as pointshape from pointtable")
pointDf.createOrReplaceTempView("pointdf")
pointDf.show()
- var rangeJoinDf = sedona.sql("select * from polygondf, pointdf where ST_Contains(polygondf.polygonshape,pointdf.pointshape) " +
+ val rangeJoinDf = sedona.sql("select * from polygondf, pointdf where ST_Contains(polygondf.polygonshape,pointdf.pointshape) " +
"and ST_Contains(ST_PolygonFromEnvelope(1.0,101.0,501.0,601.0), polygondf.polygonshape)")
// Write result to GeoParquet file
@@ -62,41 +66,53 @@ object SqlExample {
assert (rangeJoinDf.count()==500)
}
+ /**
+ * Demonstrates a distance join query that finds all point pairs within a specified distance.
+ * Uses ST_Distance predicate with distance-based join optimization.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
def testDistanceJoinQuery(sedona: SparkSession): Unit =
{
val sedonaConf = new SedonaConf(sedona.conf)
println(sedonaConf)
- var pointCsvDF1 = sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
+ val pointCsvDF1 = sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
pointCsvDF1.createOrReplaceTempView("pointtable")
pointCsvDF1.show()
- var pointDf1 = sedona.sql("select ST_Point(cast(pointtable._c0 as Decimal(24,20)),cast(pointtable._c1 as Decimal(24,20))) as pointshape1 from pointtable")
+ val pointDf1 = sedona.sql("select ST_Point(cast(pointtable._c0 as Decimal(24,20)),cast(pointtable._c1 as Decimal(24,20))) as pointshape1 from pointtable")
pointDf1.createOrReplaceTempView("pointdf1")
pointDf1.show()
- var pointCsvDF2 = sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
+ val pointCsvDF2 = sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
pointCsvDF2.createOrReplaceTempView("pointtable")
pointCsvDF2.show()
- var pointDf2 = sedona.sql("select ST_Point(cast(pointtable._c0 as Decimal(24,20)),cast(pointtable._c1 as Decimal(24,20))) as pointshape2 from pointtable")
+ val pointDf2 = sedona.sql("select ST_Point(cast(pointtable._c0 as Decimal(24,20)),cast(pointtable._c1 as Decimal(24,20))) as pointshape2 from pointtable")
pointDf2.createOrReplaceTempView("pointdf2")
pointDf2.show()
- var distanceJoinDf = sedona.sql("select * from pointdf1, pointdf2 where ST_Distance(pointdf1.pointshape1,pointdf2.pointshape2) < 2")
+ val distanceJoinDf = sedona.sql("select * from pointdf1, pointdf2 where ST_Distance(pointdf1.pointshape1,pointdf2.pointshape2) < 2")
distanceJoinDf.explain()
distanceJoinDf.show(10)
assert (distanceJoinDf.count()==2998)
}
+ /**
+ * Demonstrates the spatial aggregate function ST_Envelope_Aggr by computing the bounding box of a point set.
+ * Validates the computed envelope matches the expected boundary.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
def testAggregateFunction(sedona: SparkSession): Unit =
{
val sedonaConf = new SedonaConf(sedona.conf)
println(sedonaConf)
- var pointCsvDF = sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
+ val pointCsvDF = sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
pointCsvDF.createOrReplaceTempView("pointtable")
- var pointDf = sedona.sql("select ST_Point(cast(pointtable._c0 as Decimal(24,20)), cast(pointtable._c1 as Decimal(24,20))) as arealandmark from pointtable")
+ val pointDf = sedona.sql("select ST_Point(cast(pointtable._c0 as Decimal(24,20)), cast(pointtable._c1 as Decimal(24,20))) as arealandmark from pointtable")
pointDf.createOrReplaceTempView("pointdf")
- var boundary = sedona.sql("select ST_Envelope_Aggr(pointdf.arealandmark) from pointdf")
+ val boundary = sedona.sql("select ST_Envelope_Aggr(pointdf.arealandmark) from pointdf")
val coordinates:Array[Coordinate] = new Array[Coordinate](5)
coordinates(0) = new Coordinate(1.1,101.1)
coordinates(1) = new Coordinate(1.1,1100.1)
@@ -108,25 +124,92 @@ object SqlExample {
assert(boundary.take(1)(0).get(0)==geometryFactory.createPolygon(coordinates))
}
+ /**
+ * Demonstrates reading shapefiles using the modern DataFrame-based reader.
+ * Shows how to load shapefile data and query geometry and attribute fields.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
def testShapefileConstructor(sedona: SparkSession): Unit =
{
- var spatialRDD = new SpatialRDD[Geometry]
- spatialRDD = ShapefileReader.readToGeometryRDD(sedona.sparkContext, shapefileInputLocation)
- var rawSpatialDf = Adapter.toDf(spatialRDD,sedona)
- rawSpatialDf.createOrReplaceTempView("rawSpatialDf")
- var spatialDf = sedona.sql("""
+ // Read shapefile using the DataFrame-based reader
+ val spatialDf = sedona.read.format("shapefile").load(shapefileInputLocation)
+ spatialDf.createOrReplaceTempView("rawSpatialDf")
+
+ // Select specific columns
+ val resultDf = sedona.sql("""
| SELECT geometry, STATEFP, COUNTYFP
| FROM rawSpatialDf
""".stripMargin)
- spatialDf.show()
- spatialDf.printSchema()
+ resultDf.show()
+ resultDf.printSchema()
}
+ /**
+ * Demonstrates raster data I/O and map algebra operations.
+ * Loads GeoTIFF raster data and performs various raster operations.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
def testRasterIOAndMapAlgebra(sedona: SparkSession): Unit = {
- var df = sedona.read.format("binaryFile").option("dropInvalid", true).load(rasterdatalocation).selectExpr("RS_FromGeoTiff(content) as raster", "path")
+ val df = sedona.read.format("binaryFile").option("dropInvalid", true).load(rasterdatalocation).selectExpr("RS_FromGeoTiff(content) as raster", "path")
df.printSchema()
df.show()
df.selectExpr("RS_Metadata(raster) as metadata", "RS_GeoReference(raster) as georef", "RS_NumBands(raster) as numBands").show(false)
df.selectExpr("RS_AddBand(raster, raster, 1) as raster_extraband").show()
}
+
+ /**
+ * Demonstrates writing spatial DataFrame to GeoParquet format.
+ * GeoParquet is a cloud-native geospatial data format based on Apache Parquet.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
+ def testGeoParquetWriter(sedona: SparkSession): Unit = {
+ // Create a sample DataFrame with geometries
+ val pointCsvDF = sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
+ pointCsvDF.createOrReplaceTempView("pointtable")
+ val pointDf = sedona.sql("select ST_Point(cast(pointtable._c0 as Decimal(24,20)),cast(pointtable._c1 as Decimal(24,20))) as geometry from pointtable")
+
+ // Write to GeoParquet format
+ val geoParquetOutputPath = "target/test-classes/output/points.geoparquet"
+ pointDf.write
+ .format("geoparquet")
+ .mode(SaveMode.Overwrite)
+ .save(geoParquetOutputPath)
+
+ println(s"GeoParquet file written to: $geoParquetOutputPath")
+ pointDf.show(5)
+ }
+
+ /**
+ * Demonstrates reading GeoParquet files and performing spatial operations.
+ * Shows how to load GeoParquet data and apply spatial transformations.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
+ def testGeoParquetReader(sedona: SparkSession): Unit = {
+ // First, ensure we have a GeoParquet file by writing one
+ testGeoParquetWriter(sedona)
+
+ // Read GeoParquet file
+ val geoParquetInputPath = "target/test-classes/output/points.geoparquet"
+ val geoParquetDf = sedona.read
+ .format("geoparquet")
+ .load(geoParquetInputPath)
+
+ println(s"GeoParquet file read from: $geoParquetInputPath")
+ geoParquetDf.printSchema()
+ geoParquetDf.show(5)
+
+ // Perform spatial operations on the loaded data
+ geoParquetDf.createOrReplaceTempView("geoparquet_points")
+ val bufferedDf = sedona.sql("""
+ | SELECT ST_Buffer(geometry, 0.1) as buffered_geometry
+ | FROM geoparquet_points
+ """.stripMargin)
+
+ println("Applied spatial operations on GeoParquet data:")
+ bufferedDf.show(5)
+ }
}
diff --git a/examples/spark-sql/src/main/scala/VizExample.scala b/examples/spark-sql/src/main/scala/VizExample.scala
index b7b333a1568..1108f4a09da 100644
--- a/examples/spark-sql/src/main/scala/VizExample.scala
+++ b/examples/spark-sql/src/main/scala/VizExample.scala
@@ -17,7 +17,6 @@
* under the License.
*/
-import Main.resourceFolder
import org.apache.sedona.common.enums.FileDataSplitter
import org.apache.sedona.core.enums.{GridType, IndexType}
import org.apache.sedona.core.spatialOperator.JoinQuery
@@ -33,6 +32,8 @@ import java.awt.Color
object VizExample {
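+ // Test resources are resolved relative to the module's working directory (replaces the former Main.resourceFolder import)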
+ val resourceFolder = System.getProperty("user.dir") + "/src/test/resources/"
+
val demoOutputPath = "target/demo"
val scatterPlotOutputPath = System.getProperty("user.dir") + "/" + demoOutputPath + "/scatterplot"
@@ -55,27 +56,42 @@ object VizExample {
val PolygonNumPartitions = 5
val USMainLandBoundary = new Envelope(-126.790180, -64.630926, 24.863836, 50.000)
- def buildScatterPlot(sedona: SparkSession): Boolean = {
+ /**
+ * Creates a scatter plot visualization of polygon data.
+ * Generates a PNG image showing the spatial distribution of polygons.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
+ def buildScatterPlot(sedona: SparkSession): Unit = {
val spatialRDD = new PolygonRDD(sedona.sparkContext, PolygonInputLocation, PolygonSplitter, false, PolygonNumPartitions)
- var visualizationOperator = new ScatterPlot(1000, 600, USMainLandBoundary, false)
+ val visualizationOperator = new ScatterPlot(1000, 600, USMainLandBoundary, false)
visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true)
visualizationOperator.Visualize(sedona.sparkContext, spatialRDD)
- var imageGenerator = new ImageGenerator
+ val imageGenerator = new ImageGenerator
imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, scatterPlotOutputPath, ImageType.PNG)
- true
}
- def buildHeatMap(sedona: SparkSession): Boolean = {
+ /**
+ * Creates a heat map visualization showing the density of rectangle geometries.
+ * Generates a PNG image with heat intensity based on spatial clustering.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
+ def buildHeatMap(sedona: SparkSession): Unit = {
val spatialRDD = new RectangleRDD(sedona.sparkContext, RectangleInputLocation, RectangleSplitter, false, RectangleNumPartitions)
val visualizationOperator = new HeatMap(1000, 600, USMainLandBoundary, false, 2)
visualizationOperator.Visualize(sedona.sparkContext, spatialRDD)
val imageGenerator = new ImageGenerator
imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, heatMapOutputPath, ImageType.PNG)
- true
}
-
- def buildChoroplethMap(sedona: SparkSession): Boolean = {
+ /**
+ * Creates a choropleth map by performing a spatial join and visualizing join counts.
+ * Combines heat map with polygon overlay to show spatial relationships.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
+ def buildChoroplethMap(sedona: SparkSession): Unit = {
val spatialRDD = new PointRDD(sedona.sparkContext, PointInputLocation, PointOffset, PointSplitter, false, PointNumPartitions)
val queryRDD = new PolygonRDD(sedona.sparkContext, PolygonInputLocation, PolygonSplitter, false, PolygonNumPartitions)
spatialRDD.spatialPartitioning(GridType.KDBTREE)
@@ -92,20 +108,30 @@ object VizExample {
overlayOperator.JoinImage(frontImage.rasterImage)
val imageGenerator = new ImageGenerator
imageGenerator.SaveRasterImageAsLocalFile(overlayOperator.backRasterImage, choroplethMapOutputPath, ImageType.PNG)
- true
}
- def parallelFilterRenderNoStitch(sedona: SparkSession): Boolean = {
+ /**
+ * Demonstrates parallel rendering without image stitching.
+ * Creates tiled heat map images for distributed rendering.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
+ def parallelFilterRenderNoStitch(sedona: SparkSession): Unit = {
val spatialRDD = new RectangleRDD(sedona.sparkContext, RectangleInputLocation, RectangleSplitter, false, RectangleNumPartitions)
val visualizationOperator = new HeatMap(1000, 600, USMainLandBoundary, false, 2, 4, 4, true, true)
visualizationOperator.Visualize(sedona.sparkContext, spatialRDD)
val imageGenerator = new ImageGenerator
imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.distributedRasterImage, parallelFilterRenderOutputPath, ImageType.PNG)
- true
}
- def sqlApiVisualization(sedona: SparkSession): Boolean = {
- var pointDf = sedona.read.format("csv").option("delimiter", ",").option("header", "false").load(PointInputLocation)
+ /**
+ * Demonstrates visualization using the Sedona SQL API with pixelization and rendering.
+ * Creates a heat map using SQL functions for rasterization and colorization.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
+ def sqlApiVisualization(sedona: SparkSession): Unit = {
+ val pointDf = sedona.read.format("csv").option("delimiter", ",").option("header", "false").load(PointInputLocation)
pointDf.selectExpr("ST_Point(cast(_c0 as Decimal(24,20)),cast(_c1 as Decimal(24,20))) as shape")
.filter("ST_Contains(ST_PolygonFromEnvelope(-126.790180,24.863836,-64.630926,50.000),shape)").createOrReplaceTempView("pointtable")
sedona.sql(
@@ -127,8 +153,8 @@ object VizExample {
|SELECT ST_Render(pixel, ST_Colorize(weight, (SELECT max(weight) FROM pixelaggregates), 'red')) AS image
|FROM pixelaggregates
""".stripMargin)
- var image = sedona.table("images").take(1)(0)(0).asInstanceOf[ImageSerializableWrapper].getImage
- var imageGenerator = new ImageGenerator
+ val image = sedona.table("images").take(1)(0)(0).asInstanceOf[ImageSerializableWrapper].getImage
+ val imageGenerator = new ImageGenerator
imageGenerator.SaveRasterImageAsLocalFile(image, sqlApiOutputPath, ImageType.PNG)
sedona.sql(
"""
@@ -137,7 +163,6 @@ object VizExample {
|FROM images
""".stripMargin)
sedona.table("imagestring").show()
- true
}
}
diff --git a/examples/spark-sql/src/test/scala/testFunctions.scala b/examples/spark-sql/src/test/scala/testFunctions.scala
new file mode 100644
index 00000000000..ac6d19a3aa8
--- /dev/null
+++ b/examples/spark-sql/src/test/scala/testFunctions.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.log4j.{Level, Logger}
+import org.apache.sedona.spark.SedonaContext
+import org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
+import org.apache.sedona.viz.sql.utils.SedonaVizRegistrator
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.funsuite.AnyFunSuite
+import org.apache.spark.sql.SparkSession
+
+class testFunctions extends AnyFunSuite with BeforeAndAfterAll {
+
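+ // Shared SparkSession, created once in beforeAll and reused by every test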
+ var sedona: SparkSession = _
+
+ override def beforeAll(): Unit = {
+ Logger.getRootLogger().setLevel(Level.WARN)
+
+ // Touching Main.resourceFolder forces the Main object to initialize its resource paths before any test runs
+ println(s"Resource folder: ${Main.resourceFolder}")
+
+ // Create Spark session with driver JVM options for Java module access
+ val config = SedonaContext.builder().appName("SedonaSQL-test")
+ .master("local[*]")
+ .config("spark.driver.extraJavaOptions",
+ "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED " +
+ "--add-opens=java.base/java.nio=ALL-UNNAMED " +
+ "--add-opens=java.base/java.lang=ALL-UNNAMED " +
+ "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED " +
+ "--add-opens=java.base/java.util=ALL-UNNAMED " +
+ "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED " +
+ "--add-opens=java.base/java.io=ALL-UNNAMED " +
+ "--add-opens=java.base/java.net=ALL-UNNAMED " +
+ "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED " +
+ "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED " +
+ "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED " +
+ "--add-opens=java.base/sun.security.action=ALL-UNNAMED " +
+ "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED")
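+ // Kryo registrator for SedonaViz types (takes effect when Kryo serialization is enabled)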
+ .config("spark.kryo.registrator", classOf[SedonaVizKryoRegistrator].getName)
+ .getOrCreate()
+ sedona = SedonaContext.create(config)
+
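+ // Register SedonaViz SQL functions (e.g., ST_Pixelize, ST_Colorize, ST_Render) used by the visualization tests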
+ SedonaVizRegistrator.registerAll(sedona)
+ }
+
+ override def afterAll(): Unit = {
+ if (sedona != null) {
+ sedona.stop()
+ }
+ }
+
+ test("SqlExample - testPredicatePushdownAndRangeJonQuery") {
+ SqlExample.testPredicatePushdownAndRangeJonQuery(sedona)
+ }
+
+ test("SqlExample - testDistanceJoinQuery") {
+ SqlExample.testDistanceJoinQuery(sedona)
+ }
+
+ test("SqlExample - testAggregateFunction") {
+ SqlExample.testAggregateFunction(sedona)
+ }
+
+ test("SqlExample - testShapefileConstructor") {
+ SqlExample.testShapefileConstructor(sedona)
+ }
+
+ test("SqlExample - testRasterIOAndMapAlgebra") {
+ SqlExample.testRasterIOAndMapAlgebra(sedona)
+ }
+
+ test("RddExample - visualizeSpatialColocation") {
+ RddExample.visualizeSpatialColocation(sedona)
+ }
+
+ test("RddExample - calculateSpatialColocation") {
+ RddExample.calculateSpatialColocation(sedona)
+ }
+
+ test("VizExample - buildScatterPlot") {
+ VizExample.buildScatterPlot(sedona)
+ succeed // Test passes if function completes without exception
+ }
+
+ test("VizExample - buildHeatMap") {
+ VizExample.buildHeatMap(sedona)
+ succeed // Test passes if function completes without exception
+ }
+
+ test("VizExample - buildChoroplethMap") {
+ VizExample.buildChoroplethMap(sedona)
+ succeed // Test passes if function completes without exception
+ }
+
+ test("VizExample - parallelFilterRenderNoStitch") {
+ VizExample.parallelFilterRenderNoStitch(sedona)
+ succeed // Test passes if function completes without exception
+ }
+
+ test("VizExample - sqlApiVisualization") {
+ VizExample.sqlApiVisualization(sedona)
+ succeed // Test passes if function completes without exception
+ }
+}