Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions .github/workflows/python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ jobs:
scala: '2.12.8'
java: '11'
python: '3.9'
- spark: '3.5.0'
scala: '2.12.8'
java: '11'
python: '3.8'
# - spark: '3.5.0'
# scala: '2.12.8'
# java: '11'
# python: '3.8'
- spark: '3.4.0'
scala: '2.12.8'
java: '11'
Expand All @@ -101,15 +101,15 @@ jobs:
scala: '2.12.8'
java: '11'
python: '3.9'
- spark: '3.4.0'
scala: '2.12.8'
java: '11'
python: '3.8'
- spark: '3.4.0'
scala: '2.12.8'
java: '11'
python: '3.8'
shapely: '1'
# - spark: '3.4.0'
# scala: '2.12.8'
# java: '11'
# python: '3.8'
# - spark: '3.4.0'
# scala: '2.12.8'
# java: '11'
# python: '3.8'
# shapely: '1'

steps:
- uses: actions/checkout@v6
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<!-- <version>3.12.0</version>-->
<version>2.10.4</version>
<executions>
<execution>
Expand Down
8 changes: 6 additions & 2 deletions python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.

[build-system]
requires = ["setuptools>=69", "wheel"]
requires = ["setuptools>=80.9.0", "wheel", "numpy"]
build-backend = "setuptools.build_meta"

[project]
Expand All @@ -26,13 +26,17 @@ description = "Apache Sedona is a cluster computing system for processing large-
readme = "README.md"
license = { text = "Apache-2.0" }
authors = [ { name = "Apache Sedona", email = "dev@sedona.apache.org" } ]
requires-python = ">=3.8"
requires-python = ">=3.9"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: Apache Software License",
]
dependencies = [
"attrs",
"pyarrow>=16.1.0",
"pyspark==3.5.4",
"sedonadb",
"setuptools==80.9.0",
"shapely>=1.7.0",
]

Expand Down
83 changes: 80 additions & 3 deletions python/sedona/spark/sql/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,23 @@

import pandas as pd

from sedona.spark.sql.types import GeometryType
from sedona.spark.utils import geometry_serde
from pyspark.sql.udf import UserDefinedFunction
from pyspark.sql.types import DataType
from shapely.geometry.base import BaseGeometry
from pyspark.sql.udf import UserDefinedFunction
import pyarrow as pa
import geoarrow.pyarrow as ga
from sedonadb import udf as sedona_udf_module
from sedona.spark.sql.types import GeometryType
from pyspark.sql.types import (
DataType,
FloatType,
DoubleType,
IntegerType,
StringType,
ByteType,
)

from sedona.spark.utils.udf import has_sedona_serializer_speedup

SEDONA_SCALAR_EVAL_TYPE = 5200
SEDONA_PANDAS_ARROW_NAME = "SedonaPandasArrowUDF"
Expand Down Expand Up @@ -142,3 +153,69 @@ def serialize_to_geometry_if_geom(data, return_type: DataType):
return geometry_serde.serialize(data)

return data


def infer_pa_type(spark_type: DataType):
    """Map a Spark SQL type to the pyarrow type used for the UDF output.

    Geometry columns are carried as GeoArrow WKB; numeric and string types
    map to their natural pyarrow equivalents.

    Raises:
        NotImplementedError: if ``spark_type`` has no known mapping.
    """
    # Ordered (Spark type class, pyarrow type factory) pairs; first
    # isinstance match wins, mirroring the original if/elif chain.
    type_table = (
        (GeometryType, ga.wkb),
        (FloatType, pa.float32),
        (DoubleType, pa.float64),
        (IntegerType, pa.int32),
        (StringType, pa.string),
    )
    for spark_cls, pa_factory in type_table:
        if isinstance(spark_type, spark_cls):
            return pa_factory()
    raise NotImplementedError(f"Type {spark_type} is not supported yet.")


def infer_input_type(spark_type: DataType):
    """Map a Spark SQL type to the matching SedonaDB UDF input-type tag.

    Float/Double/Integer all collapse into the single NUMERIC tag.

    Raises:
        NotImplementedError: if ``spark_type`` has no known mapping.
    """
    if isinstance(spark_type, GeometryType):
        return sedona_udf_module.GEOMETRY
    # isinstance accepts a tuple of classes — clearer than an `or` chain.
    elif isinstance(spark_type, (FloatType, DoubleType, IntegerType)):
        return sedona_udf_module.NUMERIC
    elif isinstance(spark_type, StringType):
        return sedona_udf_module.STRING
    elif isinstance(spark_type, ByteType):
        # NOTE(review): ByteType is Spark's 8-bit signed integer; if binary
        # blobs were intended here, BinaryType is likely the right check —
        # confirm against the SedonaDB BINARY contract before changing.
        return sedona_udf_module.BINARY
    else:
        raise NotImplementedError(f"Type {spark_type} is not supported yet.")


def infer_input_types(spark_types: list[DataType]):
    """Map each Spark SQL input type to its SedonaDB UDF type tag.

    Returns a list of tags in the same order as ``spark_types``.

    Raises:
        NotImplementedError: if any element has no known mapping.
    """
    # Comprehension instead of a manual append loop; the old local name
    # `pa_type` was also misleading — these are SedonaDB tags, not pyarrow
    # types.
    return [infer_input_type(spark_type) for spark_type in spark_types]


def sedona_db_vectorized_udf(
    return_type: DataType,
    input_types: list[DataType],
):
    """Decorator factory producing a SedonaDB arrow-vectorized Spark UDF.

    Usage::

        @sedona_db_vectorized_udf(return_type, input_types)
        def my_fn(...): ...

    The wrapped function is registered as a SedonaDB ``arrow_udf`` with
    pyarrow/SedonaDB types inferred from the given Spark types, then exposed
    to Spark as a ``UserDefinedFunction``.
    """
    # Eval-type codes understood by the Sedona-patched worker: 6201 selects
    # the path that uses the compiled geomserde speedup serializer when the
    # native extension is importable, 6200 the pure-Python fallback.
    eval_type = 6200
    if has_sedona_serializer_speedup():
        eval_type = 6201

    def apply_fn(fn):
        out_type = infer_pa_type(return_type)
        input_types_sedona_db = infer_input_types(input_types)

        @sedona_udf_module.arrow_udf(out_type, input_types=input_types_sedona_db)
        def shapely_udf(*args, **kwargs):
            return fn(*args, **kwargs)

        udf = UserDefinedFunction(
            # presumably UserDefinedFunction treats the first argument as a
            # zero-arg factory for the callable — TODO confirm against the
            # worker's deserialization path.
            lambda: shapely_udf,
            return_type,
            # Reuse the module constant instead of repeating the literal
            # "SedonaPandasArrowUDF" (keeps the name consistent with
            # SEDONA_PANDAS_ARROW_NAME defined at module level).
            SEDONA_PANDAS_ARROW_NAME,
            evalType=eval_type,
        )

        return udf

    return apply_fn
44 changes: 44 additions & 0 deletions python/sedona/spark/utils/udf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import shapely


def has_sedona_serializer_speedup():
    """Return True if the compiled ``geomserde_speedup`` extension imports."""
    try:
        from . import geomserde_speedup  # noqa: F401

        return True
    except ImportError:
        return False


def to_sedona(arr):
    """Serialize geometries in ``arr`` to Sedona's binary representation.

    Uses the compiled ``geomserde_speedup`` extension when importable,
    otherwise falls back to plain WKB via shapely.
    """
    try:
        from . import geomserde_speedup as speedup
    except ImportError:
        speedup = None
    if speedup is None:
        return shapely.to_wkb(arr)
    return speedup.to_sedona_func(arr)


def from_sedona(arr):
    """Deserialize ``arr`` from Sedona's binary representation to geometries.

    Uses the compiled ``geomserde_speedup`` extension when importable,
    otherwise falls back to reading plain WKB via shapely.
    """
    try:
        from . import geomserde_speedup as speedup
    except ImportError:
        speedup = None
    if speedup is None:
        return shapely.from_wkb(arr)
    return speedup.from_sedona_func(arr)
16 changes: 16 additions & 0 deletions python/sedona/spark/worker/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Loading
Loading