diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index 05782bbae9e..9f1159e57cf 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -85,10 +85,10 @@ jobs: scala: '2.12.8' java: '11' python: '3.9' - - spark: '3.5.0' - scala: '2.12.8' - java: '11' - python: '3.8' +# - spark: '3.5.0' +# scala: '2.12.8' +# java: '11' +# python: '3.8' - spark: '3.4.0' scala: '2.12.8' java: '11' @@ -101,15 +101,15 @@ jobs: scala: '2.12.8' java: '11' python: '3.9' - - spark: '3.4.0' - scala: '2.12.8' - java: '11' - python: '3.8' - - spark: '3.4.0' - scala: '2.12.8' - java: '11' - python: '3.8' - shapely: '1' +# - spark: '3.4.0' +# scala: '2.12.8' +# java: '11' +# python: '3.8' +# - spark: '3.4.0' +# scala: '2.12.8' +# java: '11' +# python: '3.8' +# shapely: '1' steps: - uses: actions/checkout@v6 diff --git a/pom.xml b/pom.xml index d6e4e81319d..071f5233d85 100644 --- a/pom.xml +++ b/pom.xml @@ -631,6 +631,7 @@ org.apache.maven.plugins maven-javadoc-plugin + 2.10.4 diff --git a/python/pyproject.toml b/python/pyproject.toml index b988966c4fc..7795f73962d 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -16,7 +16,7 @@ # under the License. [build-system] -requires = ["setuptools>=69", "wheel"] +requires = ["setuptools>=80.9.0", "wheel", "numpy"] build-backend = "setuptools.build_meta" [project] @@ -26,13 +26,17 @@ description = "Apache Sedona is a cluster computing system for processing large- readme = "README.md" license = { text = "Apache-2.0" } authors = [ { name = "Apache Sedona", email = "dev@sedona.apache.org" } ] -requires-python = ">=3.8" +requires-python = ">=3.9" classifiers = [ "Programming Language :: Python :: 3", "License :: OSI Approved :: Apache Software License", ] dependencies = [ "attrs", + "pyarrow>=16.1.0", + "pyspark==3.5.4", + "sedonadb", + "setuptools==80.9.0", "shapely>=1.7.0", ] diff --git a/python/sedona/spark/sql/functions.py b/python/sedona/spark/sql/functions.py index 2420301d52d..d8bf73c1522 100644 --- a/python/sedona/spark/sql/functions.py +++ b/python/sedona/spark/sql/functions.py @@ -21,12 +21,23 @@ import pandas as pd -from sedona.spark.sql.types import GeometryType from sedona.spark.utils import geometry_serde -from pyspark.sql.udf import UserDefinedFunction -from pyspark.sql.types import DataType from shapely.geometry.base import BaseGeometry +from pyspark.sql.udf import UserDefinedFunction +import pyarrow as pa +import geoarrow.pyarrow as ga +from sedonadb import udf as sedona_udf_module +from sedona.spark.sql.types import GeometryType +from pyspark.sql.types import ( + DataType, + FloatType, + DoubleType, + IntegerType, + StringType, + ByteType, +) +from sedona.spark.utils.udf import has_sedona_serializer_speedup SEDONA_SCALAR_EVAL_TYPE = 5200 SEDONA_PANDAS_ARROW_NAME = "SedonaPandasArrowUDF" @@ -142,3 +153,69 @@ def serialize_to_geometry_if_geom(data, return_type: DataType): return geometry_serde.serialize(data) return data + + +def infer_pa_type(spark_type: DataType): + if isinstance(spark_type, GeometryType): + return ga.wkb() + elif isinstance(spark_type, FloatType): + return pa.float32() + elif isinstance(spark_type, DoubleType): + return pa.float64() + elif isinstance(spark_type, IntegerType): + return pa.int32() + elif isinstance(spark_type, StringType): + return pa.string() + else: + raise NotImplementedError(f"Type {spark_type} is not supported yet.") + + +def infer_input_type(spark_type: DataType): + if isinstance(spark_type, GeometryType): + return sedona_udf_module.GEOMETRY + elif ( + 
isinstance(spark_type, FloatType) + or isinstance(spark_type, DoubleType) + or isinstance(spark_type, IntegerType) + ): + return sedona_udf_module.NUMERIC + elif isinstance(spark_type, StringType): + return sedona_udf_module.STRING + elif isinstance(spark_type, ByteType): + return sedona_udf_module.BINARY + else: + raise NotImplementedError(f"Type {spark_type} is not supported yet.") + + +def infer_input_types(spark_types: list[DataType]): + pa_types = [] + for spark_type in spark_types: + pa_type = infer_input_type(spark_type) + pa_types.append(pa_type) + + return pa_types + + +def sedona_db_vectorized_udf( + return_type: DataType, + input_types: list[DataType], +): + eval_type = 6200 + if has_sedona_serializer_speedup(): + eval_type = 6201 + + def apply_fn(fn): + out_type = infer_pa_type(return_type) + input_types_sedona_db = infer_input_types(input_types) + + @sedona_udf_module.arrow_udf(out_type, input_types=input_types_sedona_db) + def shapely_udf(*args, **kwargs): + return fn(*args, **kwargs) + + udf = UserDefinedFunction( + lambda: shapely_udf, return_type, "SedonaPandasArrowUDF", evalType=eval_type + ) + + return udf + + return apply_fn diff --git a/python/sedona/spark/utils/udf.py b/python/sedona/spark/utils/udf.py new file mode 100644 index 00000000000..0f88ef07f29 --- /dev/null +++ b/python/sedona/spark/utils/udf.py @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import shapely + + +def has_sedona_serializer_speedup(): + try: + from . import geomserde_speedup + except ImportError: + return False + return True + + +def to_sedona(arr): + try: + from . import geomserde_speedup + except ImportError: + return shapely.to_wkb(arr) + + return geomserde_speedup.to_sedona_func(arr) + + +def from_sedona(arr): + try: + from . import geomserde_speedup + except ImportError: + return shapely.from_wkb(arr) + + return geomserde_speedup.from_sedona_func(arr) diff --git a/python/sedona/spark/worker/__init__.py b/python/sedona/spark/worker/__init__.py new file mode 100644 index 00000000000..13a83393a91 --- /dev/null +++ b/python/sedona/spark/worker/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/python/sedona/spark/worker/daemon.py b/python/sedona/spark/worker/daemon.py new file mode 100644 index 00000000000..0c03dde5b8a --- /dev/null +++ b/python/sedona/spark/worker/daemon.py @@ -0,0 +1,218 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import numbers +import os +import signal +import select +import socket +import sys +import traceback +import time +import gc +from errno import EINTR, EAGAIN +from socket import AF_INET, AF_INET6, SOCK_STREAM, SOMAXCONN +from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT + +from sedona.spark.worker.worker import main as worker_main +from pyspark.serializers import read_int, write_int, write_with_length, UTF8Deserializer + + +def compute_real_exit_code(exit_code): + # SystemExit's code can be integer or string, but os._exit only accepts integers + if isinstance(exit_code, numbers.Integral): + return exit_code + else: + return 1 + + +def worker(sock, authenticated): + """ + Called by a worker process after the fork(). + """ + signal.signal(SIGHUP, SIG_DFL) + signal.signal(SIGCHLD, SIG_DFL) + signal.signal(SIGTERM, SIG_DFL) + # restore the handler for SIGINT, + # it's useful for debugging (show the stacktrace before exit) + signal.signal(SIGINT, signal.default_int_handler) + + # Read the socket using fdopen instead of socket.makefile() because the latter + # seems to be very slow; note that we need to dup() the file descriptor because + # otherwise writes also cause a seek that makes us miss data on the read side. 
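+    # SPARK_BUFFER_SIZE is set on the JVM side by SedonaBasePythonRunner, so both ends
+    # of the socket use the same buffer size.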
+ buffer_size = int(os.environ.get("SPARK_BUFFER_SIZE", 65536)) + infile = os.fdopen(os.dup(sock.fileno()), "rb", buffer_size) + outfile = os.fdopen(os.dup(sock.fileno()), "wb", buffer_size) + + if not authenticated: + client_secret = UTF8Deserializer().loads(infile) + if os.environ["PYTHON_WORKER_FACTORY_SECRET"] == client_secret: + write_with_length(b"ok", outfile) + outfile.flush() + else: + write_with_length(b"err", outfile) + outfile.flush() + sock.close() + return 1 + + exit_code = 0 + try: + worker_main(infile, outfile) + except SystemExit as exc: + exit_code = compute_real_exit_code(exc.code) + finally: + try: + outfile.flush() + except Exception: # nosec + pass + return exit_code + + +def manager(): + # Create a new process group to corral our children + os.setpgid(0, 0) + + # Create a listening socket on the loopback interface + if os.environ.get("SPARK_PREFER_IPV6", "false").lower() == "true": + listen_sock = socket.socket(AF_INET6, SOCK_STREAM) + listen_sock.bind(("::1", 0, 0, 0)) + listen_sock.listen(max(1024, SOMAXCONN)) + listen_host, listen_port, _, _ = listen_sock.getsockname() + else: + listen_sock = socket.socket(AF_INET, SOCK_STREAM) + listen_sock.bind(("127.0.0.1", 0)) + listen_sock.listen(max(1024, SOMAXCONN)) + listen_host, listen_port = listen_sock.getsockname() + + # re-open stdin/stdout in 'wb' mode + stdin_bin = os.fdopen(sys.stdin.fileno(), "rb", 4) + stdout_bin = os.fdopen(sys.stdout.fileno(), "wb", 4) + write_int(listen_port, stdout_bin) + stdout_bin.flush() + + def shutdown(code): + signal.signal(SIGTERM, SIG_DFL) + # Send SIGHUP to notify workers of shutdown + os.kill(0, SIGHUP) + sys.exit(code) + + def handle_sigterm(*args): + shutdown(1) + + signal.signal(SIGTERM, handle_sigterm) # Gracefully exit on SIGTERM + signal.signal(SIGHUP, SIG_IGN) # Don't die on SIGHUP + signal.signal(SIGCHLD, SIG_IGN) + + reuse = os.environ.get("SPARK_REUSE_WORKER") + + # Initialization complete + try: + while True: + try: + ready_fds = select.select([0, listen_sock], [], [], 1)[0] + except OSError as ex: + if ex[0] == EINTR: + continue + else: + raise + + if 0 in ready_fds: + try: + worker_pid = read_int(stdin_bin) + except EOFError: + # Spark told us to exit by closing stdin + shutdown(0) + try: + os.kill(worker_pid, signal.SIGKILL) + except OSError: + pass # process already died + + if listen_sock in ready_fds: + try: + sock, _ = listen_sock.accept() + except OSError as e: + if e.errno == EINTR: + continue + raise + + # Launch a worker process + try: + pid = os.fork() + except OSError as e: + if e.errno in (EAGAIN, EINTR): + time.sleep(1) + pid = os.fork() # error here will shutdown daemon + else: + outfile = sock.makefile(mode="wb") + write_int(e.errno, outfile) # Signal that the fork failed + outfile.flush() + outfile.close() + sock.close() + continue + + if pid == 0: + # in child process + listen_sock.close() + + # It should close the standard input in the child process so that + # Python native function executions stay intact. + # + # Note that if we just close the standard input (file descriptor 0), + # the lowest file descriptor (file descriptor 0) will be allocated, + # later when other file descriptors should happen to open. + # + # Therefore, here we redirects it to '/dev/null' by duplicating + # another file descriptor for '/dev/null' to the standard input (0). + # See SPARK-26175. 
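+                # After acknowledging the fork below, the child keeps serving worker
+                # invocations over this socket for as long as SPARK_REUSE_WORKER is set
+                # and each invocation exits cleanly (exit code 0).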
+ devnull = open(os.devnull) + os.dup2(devnull.fileno(), 0) + devnull.close() + + try: + # Acknowledge that the fork was successful + outfile = sock.makefile(mode="wb") + write_int(os.getpid(), outfile) + outfile.flush() + outfile.close() + authenticated = False + while True: + code = worker(sock, authenticated) + if code == 0: + authenticated = True + if not reuse or code: + # wait for closing + try: + while sock.recv(1024): + pass + except Exception: # nosec + pass + break + gc.collect() + except BaseException: + traceback.print_exc() + os._exit(1) + else: + os._exit(0) + else: + sock.close() + + finally: + shutdown(1) + + +if __name__ == "__main__": + manager() diff --git a/python/sedona/spark/worker/serde.py b/python/sedona/spark/worker/serde.py new file mode 100644 index 00000000000..52e7b663a58 --- /dev/null +++ b/python/sedona/spark/worker/serde.py @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from pyspark.serializers import write_int, SpecialLengths +from pyspark.sql.pandas.serializers import ArrowStreamPandasSerializer + +from sedona.spark.worker.udf_info import UDFInfo + + +class SedonaDBSerializer(ArrowStreamPandasSerializer): + def __init__(self, timezone, safecheck, db, udf_info: UDFInfo, cast_to_wkb=False): + super().__init__(timezone, safecheck) + self.db = db + self.udf_info = udf_info + self.cast_to_wkb = cast_to_wkb + + def load_stream(self, stream): + import pyarrow as pa + + batches = super(ArrowStreamPandasSerializer, self).load_stream(stream) + index = 0 + for batch in batches: + table = pa.Table.from_batches(batches=[batch]) + import pyarrow as pa + + df = self.db.create_data_frame(table) + table_name = f"my_table_{index}" + + df.to_view(table_name) + + sql_expression = self.udf_info.sedona_db_transformation_expr( + table_name, self.cast_to_wkb + ) + + index += 1 + + yield self.db.sql(sql_expression) + + def arrow_dump_stream(self, iterator, stream): + import pyarrow as pa + + writer = None + try: + for batch in iterator: + if writer is None: + writer = pa.RecordBatchStreamWriter(stream, batch.schema) + writer.write_batch(batch) + finally: + if writer is not None: + writer.close() + + def dump_stream(self, iterator, stream): + def init_stream_yield_batches(): + should_write_start_length = True + for batch in iterator: + if should_write_start_length: + write_int(SpecialLengths.START_ARROW_STREAM, stream) + should_write_start_length = False + + yield batch + + return self.arrow_dump_stream(init_stream_yield_batches(), stream) diff --git a/python/sedona/spark/worker/udf_info.py b/python/sedona/spark/worker/udf_info.py new file mode 100644 index 00000000000..32a0833f51f --- /dev/null +++ b/python/sedona/spark/worker/udf_info.py @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more 
contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from dataclasses import dataclass + +from sedona.spark import GeometryType + + +@dataclass +class UDFInfo: + arg_offsets: list + geom_offsets: dict + function: object + return_type: object + name: str + + def get_function_call_sql(self, table_name: str, cast_to_wkb: bool = False) -> str: + arg_offset_str = ", ".join([f"_{el}" for el in self.arg_offsets]) + function_expr = f"{self.name}({arg_offset_str})" + if isinstance(self.return_type, GeometryType) and cast_to_wkb: + return f"SELECT ST_GeomToSedonaSpark({function_expr}) AS _0 FROM {table_name}" # nosec + + return f"SELECT {function_expr} AS _0 FROM {table_name}" # nosec + + def sedona_db_transformation_expr( + self, table_name: str, cast_to_wkb: bool = False + ) -> str: + fields = [] + for arg in self.arg_offsets: + if arg in self.geom_offsets and cast_to_wkb: + crs = self.geom_offsets[arg] + fields.append( + f"ST_GeomFromSedonaSpark(_{arg}, 'EPSG:{crs}') AS _{arg}" + ) # nosec + continue + + fields.append(f"_{arg}") + + fields_expr = ", ".join(fields) + return f"SELECT {fields_expr} FROM {table_name}" # nosec diff --git a/python/sedona/spark/worker/worker.py b/python/sedona/spark/worker/worker.py new file mode 100644 index 00000000000..e31d8e76d1f --- /dev/null +++ b/python/sedona/spark/worker/worker.py @@ -0,0 +1,304 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
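+
+# PySpark-compatible worker entry point for the SedonaDB vectorized UDF eval types
+# (6200 and 6201). It replays the usual PySpark worker handshake (Python version,
+# barrier flag, task context, python path, broadcast check, runner conf), registers
+# the pickled UDF with a SedonaDB connection, and streams Arrow record batches
+# through SedonaDB instead of pandas. Each batch is processed by a generated SQL
+# statement of the form produced by UDFInfo.get_function_call_sql, e.g. (for a
+# hypothetical two-argument UDF named my_udf):
+#   SELECT my_udf(_0, _1) AS _0 FROM output_table_1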
+ +import importlib +import os +import sys +import time + +import sedonadb +from pyspark import TaskContext, shuffle, SparkFiles +from pyspark.errors import PySparkRuntimeError +from pyspark.java_gateway import local_connect_and_auth +from pyspark.resource import ResourceInformation +from pyspark.serializers import ( + read_int, + UTF8Deserializer, + read_bool, + read_long, + CPickleSerializer, + write_int, + write_long, + SpecialLengths, +) + +from sedona.spark.worker.serde import SedonaDBSerializer +from sedona.spark.worker.udf_info import UDFInfo + + +def apply_iterator(db, iterator, udf_info: UDFInfo, cast_to_wkb: bool = False): + i = 0 + for df in iterator: + i += 1 + table_name = f"output_table_{i}" + df.to_view(table_name) + + function_call_sql = udf_info.get_function_call_sql( + table_name, cast_to_wkb=cast_to_wkb + ) + + df_out = db.sql(function_call_sql) + + df_out.to_view(f"view_{i}") + at = df_out.to_arrow_table() + batches = at.combine_chunks().to_batches() + + yield from batches + + +def check_python_version(utf_serde: UTF8Deserializer, infile) -> str: + version = utf_serde.loads(infile) + + python_major, python_minor = sys.version_info[:2] + + if version != f"{python_major}.{python_minor}": + raise PySparkRuntimeError( + error_class="PYTHON_VERSION_MISMATCH", + message_parameters={ + "worker_version": str(sys.version_info[:2]), + "driver_version": str(version), + }, + ) + + return version + + +def check_barrier_flag(infile): + is_barrier = read_bool(infile) + bound_port = read_int(infile) + secret = UTF8Deserializer().loads(infile) + + if is_barrier: + raise PySparkRuntimeError( + error_class="BARRIER_MODE_NOT_SUPPORTED", + message_parameters={ + "worker_version": str(sys.version_info[:2]), + "message": "Barrier mode is not supported by SedonaDB vectorized functions.", + }, + ) + + return is_barrier + + +def assign_task_context(utf_serde: UTF8Deserializer, infile): + stage_id = read_int(infile) + partition_id = read_int(infile) + attempt_number = read_long(infile) + task_attempt_id = read_int(infile) + cpus = read_int(infile) + + task_context = TaskContext._getOrCreate() + task_context._stage_id = stage_id + task_context._partition_id = partition_id + task_context._attempt_number = attempt_number + task_context._task_attempt_id = task_attempt_id + task_context._cpus = cpus + + for r in range(read_int(infile)): + key = utf_serde.loads(infile) + name = utf_serde.loads(infile) + addresses = [] + task_context._resources = {} + for a in range(read_int(infile)): + addresses.append(utf_serde.loads(infile)) + task_context._resources[key] = ResourceInformation(name, addresses) + + task_context._localProperties = {} + for i in range(read_int(infile)): + k = utf_serde.loads(infile) + v = utf_serde.loads(infile) + task_context._localProperties[k] = v + + return task_context + + +def resolve_python_path(utf_serde: UTF8Deserializer, infile): + def add_path(path: str): + # worker can be used, so do not add path multiple times + if path not in sys.path: + # overwrite system packages + sys.path.insert(1, path) + + spark_files_dir = utf_serde.loads(infile) + + SparkFiles._root_directory = spark_files_dir + SparkFiles._is_running_on_worker = True + + add_path(spark_files_dir) # *.py files that were added will be copied here + num_python_includes = read_int(infile) + for _ in range(num_python_includes): + filename = utf_serde.loads(infile) + add_path(os.path.join(spark_files_dir, filename)) + + importlib.invalidate_caches() + + +def check_broadcast_variables(infile): + 
needs_broadcast_decryption_server = read_bool(infile) + num_broadcast_variables = read_int(infile) + + if needs_broadcast_decryption_server or num_broadcast_variables > 0: + raise PySparkRuntimeError( + error_class="BROADCAST_VARS_NOT_SUPPORTED", + message_parameters={ + "worker_version": str(sys.version_info[:2]), + "message": "Broadcast variables are not supported by SedonaDB vectorized functions.", + }, + ) + + +def get_runner_conf(utf_serde: UTF8Deserializer, infile): + runner_conf = {} + num_conf = read_int(infile) + for i in range(num_conf): + k = utf_serde.loads(infile) + v = utf_serde.loads(infile) + runner_conf[k] = v + return runner_conf + + +def read_command(serializer, infile): + command = serializer._read_with_length(infile) + return command + + +def read_udf(infile, pickle_ser) -> UDFInfo: + num_arg = read_int(infile) + arg_offsets = [read_int(infile) for i in range(num_arg)] + + function = None + return_type = None + + for i in range(read_int(infile)): + function, return_type = read_command(pickle_ser, infile) + + sedona_db_udf_expression = function() + + return UDFInfo( + arg_offsets=arg_offsets, + function=sedona_db_udf_expression, + return_type=return_type, + name=sedona_db_udf_expression._name, + geom_offsets=[0], + ) + + +def register_sedona_db_udf(infile, pickle_ser) -> UDFInfo: + num_udfs = read_int(infile) + + udf = None + for _ in range(num_udfs): + udf = read_udf(infile, pickle_ser) + + return udf + + +def report_times(outfile, boot, init, finish): + write_int(SpecialLengths.TIMING_DATA, outfile) + write_long(int(1000 * boot), outfile) + write_long(int(1000 * init), outfile) + write_long(int(1000 * finish), outfile) + + +def write_statistics(infile, outfile, boot_time, init_time) -> None: + TaskContext._setTaskContext(None) + finish_time = time.time() + report_times(outfile, boot_time, init_time, finish_time) + write_long(shuffle.MemoryBytesSpilled, outfile) + write_long(shuffle.DiskBytesSpilled, outfile) + + write_int(SpecialLengths.END_OF_DATA_SECTION, outfile) + + if read_int(infile) == SpecialLengths.END_OF_STREAM: + write_int(SpecialLengths.END_OF_STREAM, outfile) + outfile.flush() + else: + write_int(SpecialLengths.END_OF_DATA_SECTION, outfile) + outfile.flush() + sys.exit(-1) + + +def main(infile, outfile): + boot_time = time.time() + sedona_db = sedonadb.connect() + # + utf8_deserializer = UTF8Deserializer() + pickle_ser = CPickleSerializer() + + split_index = read_int(infile) + + check_python_version(utf8_deserializer, infile) + + check_barrier_flag(infile) + + task_context = assign_task_context(utf_serde=utf8_deserializer, infile=infile) + shuffle.MemoryBytesSpilled = 0 + shuffle.DiskBytesSpilled = 0 + + resolve_python_path(utf8_deserializer, infile) + + check_broadcast_variables(infile) + + eval_type = read_int(infile) + + runner_conf = get_runner_conf(utf8_deserializer, infile) + + udf = register_sedona_db_udf(infile, pickle_ser) + + sedona_db.register_udf(udf.function) + init_time = time.time() + + cast_to_wkb = read_bool(infile) + + serde = SedonaDBSerializer( + timezone=runner_conf.get("spark.sql.session.timeZone", "UTC"), + safecheck=False, + db=sedona_db, + udf_info=udf, + cast_to_wkb=cast_to_wkb, + ) + + number_of_geometries = read_int(infile) + geom_offsets = {} + for i in range(number_of_geometries): + geom_index = read_int(infile) + geom_srid = read_int(infile) + + geom_offsets[geom_index] = geom_srid + + udf.geom_offsets = geom_offsets + + iterator = serde.load_stream(infile) + out_iterator = apply_iterator( + db=sedona_db, iterator=iterator, 
udf_info=udf, cast_to_wkb=cast_to_wkb + ) + + serde.dump_stream(out_iterator, outfile) + + write_statistics(infile, outfile, boot_time=boot_time, init_time=init_time) + + +if __name__ == "__main__": + # add file handler + auth_secret = os.environ["PYTHON_WORKER_FACTORY_SECRET"] + java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"]) + (sock_file, sc) = local_connect_and_auth(java_port, auth_secret) + + write_int(os.getpid(), sock_file) + sock_file.flush() + + main(sock_file, sock_file) diff --git a/python/setup.py b/python/setup.py new file mode 100644 index 00000000000..ae5e7bf1746 --- /dev/null +++ b/python/setup.py @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from setuptools import setup +import numpy + +setup( + include_dirs=[numpy.get_include()], +) diff --git a/python/src/geomserde_speedup_module.c b/python/src/geomserde_speedup_module.c index a95ced29e57..99358d6fd76 100644 --- a/python/src/geomserde_speedup_module.c +++ b/python/src/geomserde_speedup_module.c @@ -19,6 +19,9 @@ #define PY_SSIZE_T_CLEAN #include +#include +#include +#include #include #include "geomserde.h" @@ -262,7 +265,110 @@ static PyObject *deserialize_1(PyObject *self, PyObject *args) { return Py_BuildValue("(Kibi)", geom, geom_type_id, has_z, length); } +static PyObject *to_sedona_func(PyObject *self, PyObject *args) { + import_array(); + PyObject *input_obj = NULL; + if (!PyArg_ParseTuple(args, "O", &input_obj)) { + return NULL; + }; + + PyArrayObject *array = (PyArrayObject *)input_obj; + PyObject **objs = (PyObject **)PyArray_DATA(array); + + GEOSContextHandle_t handle = get_geos_context_handle(); + if (handle == NULL) { + return NULL; + } + + npy_intp n = PyArray_SIZE(input_obj); + npy_intp dims[1] = {n}; + PyArrayObject *out = (PyArrayObject *)PyArray_SimpleNew(1, dims, NPY_OBJECT); + for (npy_intp i = 0; i < PyArray_SIZE(array); i++) { + PyObject *obj = objs[i]; + GEOSGeometry *geos_geom = NULL; + char success = PyGEOS_GetGEOSGeometry(obj, &geos_geom); + if (!success || geos_geom == NULL) { + PyErr_SetString(PyExc_TypeError, "Invalid GEOS geometry"); + Py_DECREF(out); + return NULL; + } + + PyObject *serialized = do_serialize(geos_geom); + if (!serialized) { + Py_DECREF(out); + return NULL; + } + + if (PyArray_SETITEM(out, PyArray_GETPTR1(out, i), serialized) < 0) { + Py_DECREF(serialized); + Py_DECREF(out); + return NULL; + } + Py_DECREF(serialized); + } + + return out; +} /* Module definition for Shapely 2.x */ +static PyObject *from_sedona_func(PyObject *self, PyObject *args) { + import_array(); + PyObject *input_obj = NULL; + if (!PyArg_ParseTuple(args, "O", &input_obj)) { + return NULL; + }; + + GEOSContextHandle_t handle = get_geos_context_handle(); + + PyArrayObject *array = (PyArrayObject *)input_obj; + PyObject **objs = (PyObject 
**)PyArray_DATA(array); + + int p_bytes_read = 0; + + npy_intp n = PyArray_SIZE(input_obj); + + npy_intp dims[1] = {n}; + PyArrayObject *out = (PyArrayObject *)PyArray_SimpleNew(1, dims, NPY_OBJECT); + + for (npy_intp i = 0; i < PyArray_SIZE(array); i++) { + PyObject *obj = objs[i]; + if (!PyBytes_Check(obj)) { + PyErr_SetString(PyExc_TypeError, "Expected bytes"); + Py_DECREF(out); + + return NULL; + } + + char *buf = PyBytes_AS_STRING(obj); + + Py_ssize_t len = PyBytes_GET_SIZE(obj); + + GEOSGeometry *geom = NULL; + + SedonaErrorCode err = + sedona_deserialize_geom(handle, buf, len, &geom, &p_bytes_read); + if (err != SEDONA_SUCCESS) { + handle_geomserde_error(err); + Py_DECREF(out); + return NULL; + } + + PyObject *pygeom = PyGEOS_CreateGeometry(geom, handle); + if (!pygeom) { + Py_DECREF(out); + return NULL; + } + + if (PyArray_SETITEM(out, PyArray_GETPTR1(out, i), pygeom) < 0) { + Py_DECREF(pygeom); + Py_DECREF(out); + return NULL; + } + + Py_DECREF(pygeom); + } + + return out; +} static PyMethodDef geomserde_methods_shapely_2[] = { {"load_libgeos_c", load_libgeos_c, METH_VARARGS, "Load libgeos_c."}, @@ -270,6 +376,10 @@ static PyMethodDef geomserde_methods_shapely_2[] = { "Serialize geometry object as bytearray."}, {"deserialize", deserialize, METH_VARARGS, "Deserialize bytes-like object to geometry object."}, + {"from_sedona_func", from_sedona_func, METH_VARARGS, + "Deserialize bytes-like object to geometry object."}, + {"to_sedona_func", to_sedona_func, METH_VARARGS, + "Deserialize bytes-like object to geometry object."}, {NULL, NULL, 0, NULL}, /* Sentinel */ }; diff --git a/python/tests/test_base.py b/python/tests/test_base.py index cc2b09e422a..39749302072 100644 --- a/python/tests/test_base.py +++ b/python/tests/test_base.py @@ -22,7 +22,7 @@ import pyspark from pyspark.sql import DataFrame -from sedona.spark import * +from sedona.spark import SedonaContext from sedona.spark.utils.decorators import classproperty SPARK_REMOTE = os.getenv("SPARK_REMOTE") @@ -70,7 +70,15 @@ def spark(self): "spark.sedona.stac.load.itemsLimitMax", "20", ) - # Pandas on PySpark doesn't work with ANSI mode, which is enabled by default + .config("spark.executor.memory", "10G") + .config("spark.driver.memory", "10G") + .config( + "sedona.python.worker.udf.daemon.module", + "sedona.spark.worker.daemon", + ) + .config( + "sedona.python.worker.daemon.enabled", "true" + ) # Pandas on PySpark doesn't work with ANSI mode, which is enabled by default # in Spark 4 .config("spark.sql.ansi.enabled", "false") ) diff --git a/python/tests/utils/test_sedona_db_vectorized_udf.py b/python/tests/utils/test_sedona_db_vectorized_udf.py new file mode 100644 index 00000000000..eea84eec91f --- /dev/null +++ b/python/tests/utils/test_sedona_db_vectorized_udf.py @@ -0,0 +1,124 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +import numpy as np + +from sedona.spark.sql.functions import sedona_db_vectorized_udf +from sedona.spark.utils.udf import to_sedona, from_sedona +from tests.test_base import TestBase +import pyarrow as pa +import shapely +from sedona.sql import GeometryType +from pyspark.sql.functions import expr, lit +from pyspark.sql.types import DoubleType, IntegerType, ByteType +from sedona.spark.sql import ST_X + + +class TestSedonaDBArrowFunction(TestBase): + def test_vectorized_udf(self): + @sedona_db_vectorized_udf( + return_type=GeometryType(), input_types=[ByteType(), IntegerType()] + ) + def my_own_function(geom, distance): + geom_wkb = pa.array(geom.storage.to_array()) + geometry_array = np.asarray(geom_wkb, dtype=object) + distance = pa.array(distance.to_array()) + geom = from_sedona(geometry_array) + result_shapely = shapely.centroid(geom) + + return pa.array(to_sedona(result_shapely)) + + df = self.spark.createDataFrame( + [ + (1, "POINT (1 1)"), + (2, "POINT (2 2)"), + (3, "POINT (3 3)"), + ], + ["id", "wkt"], + ).withColumn("wkt", expr("ST_GeomFromWKT(wkt)")) + + df.select(ST_X(my_own_function(df.wkt, lit(100)).alias("geom"))).show() + + def test_geometry_to_double(self): + @sedona_db_vectorized_udf(return_type=DoubleType(), input_types=[ByteType()]) + def geometry_to_non_geometry_udf(geom): + geom_wkb = pa.array(geom.storage.to_array()) + geometry_array = np.asarray(geom_wkb, dtype=object) + geom = from_sedona(geometry_array) + + result_shapely = shapely.get_x(shapely.centroid(geom)) + + return pa.array(result_shapely) + + df = self.spark.createDataFrame( + [(1, "POINT (1 1)"), (2, "POINT (2 2)"), (3, "POINT (3 3)")], + ["id", "wkt"], + ).withColumn("wkt", expr("ST_GeomFromWKT(wkt)")) + + values = df.select( + geometry_to_non_geometry_udf(df.wkt).alias("x_coord") + ).collect() + + values_list = [row["x_coord"] for row in values] + + assert values_list == [1.0, 2.0, 3.0] + + def test_geometry_to_int(self): + @sedona_db_vectorized_udf(return_type=IntegerType(), input_types=[ByteType()]) + def geometry_to_int(geom): + geom_wkb = pa.array(geom.storage.to_array()) + geometry_array = np.asarray(geom_wkb, dtype=object) + + geom = from_sedona(geometry_array) + + result_shapely = shapely.get_num_points(geom) + + return pa.array(result_shapely) + + df = self.spark.createDataFrame( + [(1, "POINT (1 1)"), (2, "POINT (2 2)"), (3, "POINT (3 3)")], + ["id", "wkt"], + ).withColumn("wkt", expr("ST_GeomFromWKT(wkt)")) + + values = df.select(geometry_to_int(df.wkt)).collect() + + values_list = [row[0] for row in values] + + assert values_list == [0, 0, 0] + + def test_geometry_crs_preservation(self): + @sedona_db_vectorized_udf(return_type=GeometryType(), input_types=[ByteType()]) + def return_same_geometry(geom): + geom_wkb = pa.array(geom.storage.to_array()) + geometry_array = np.asarray(geom_wkb, dtype=object) + + geom = from_sedona(geometry_array) + + return pa.array(to_sedona(geom)) + + df = self.spark.createDataFrame( + [(1, "POINT (1 1)"), (2, "POINT (2 2)"), (3, "POINT (3 3)")], + ["id", "wkt"], + ).withColumn("wkt", expr("ST_SetSRID(ST_GeomFromWKT(wkt), 3857)")) + + result_df = df.select(return_same_geometry(df.wkt).alias("geom")) + + crs_list = ( + result_df.selectExpr("ST_SRID(geom)").rdd.flatMap(lambda x: x).collect() + ) + + assert crs_list == [3857, 3857, 3857] diff --git a/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala 
b/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala index b0e46cf6e9e..c9e8497f7ed 100644 --- a/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala +++ b/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala @@ -72,7 +72,7 @@ object SedonaContext { val sedonaArrowStrategy = Try( Class - .forName("org.apache.spark.sql.udf.SedonaArrowStrategy") + .forName("org.apache.spark.sql.execution.python.SedonaArrowStrategy") .getDeclaredConstructor() .newInstance() .asInstanceOf[SparkStrategy]) diff --git a/spark/common/src/main/scala/org/apache/sedona/sql/UDF/PythonEvalType.scala b/spark/common/src/main/scala/org/apache/sedona/sql/UDF/PythonEvalType.scala index aece26267d9..0f1a5fe0a01 100644 --- a/spark/common/src/main/scala/org/apache/sedona/sql/UDF/PythonEvalType.scala +++ b/spark/common/src/main/scala/org/apache/sedona/sql/UDF/PythonEvalType.scala @@ -23,7 +23,16 @@ object PythonEvalType { val SQL_SCALAR_SEDONA_UDF = 5200 val SEDONA_UDF_TYPE_CONSTANT = 5000 + // sedona db eval types + val SQL_SCALAR_SEDONA_DB_UDF = 6200 + val SQL_SCALAR_SEDONA_DB_NO_SPEEDUP_UDF = 6201 + val SEDONA_DB_UDF_TYPE_CONSTANT = 6000 + def toString(pythonEvalType: Int): String = pythonEvalType match { case SQL_SCALAR_SEDONA_UDF => "SQL_SCALAR_GEO_UDF" + case SQL_SCALAR_SEDONA_DB_UDF => "SQL_SCALAR_SEDONA_DB_UDF" } + + def evals(): Set[Int] = + Set(SQL_SCALAR_SEDONA_UDF, SQL_SCALAR_SEDONA_DB_UDF, SQL_SCALAR_SEDONA_DB_NO_SPEEDUP_UDF) } diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowPythonRunner.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowPythonRunner.scala new file mode 100644 index 00000000000..3055e768b98 --- /dev/null +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowPythonRunner.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.spark.sql.execution.python + +import org.apache.spark.api.python._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized.ColumnarBatch + +/** + * Similar to `PythonUDFRunner`, but exchange data with Python worker via Arrow stream. 
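+ *
+ * On top of that, it forwards the geometry column indices together with their SRIDs
+ * and a flag telling the worker whether geometry values need to be cast on the way
+ * in and out (see `writeCommand` in `SedonaPythonArrowInput`).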
+ */ +class SedonaArrowPythonRunner( + funcs: Seq[ChainedPythonFunctions], + evalType: Int, + argOffsets: Array[Array[Int]], + protected override val schema: StructType, + protected override val timeZoneId: String, + protected override val largeVarTypes: Boolean, + protected override val workerConf: Map[String, String], + val pythonMetrics: Map[String, SQLMetric], + jobArtifactUUID: Option[String], + geometryFields: Seq[(Int, Int)], + castGeometryToWKB: Boolean = false) + extends SedonaBasePythonRunner[Iterator[InternalRow], ColumnarBatch]( + funcs, + evalType, + argOffsets, + jobArtifactUUID, + geometryFields, + castGeometryToWKB) + with SedonaBasicPythonArrowInput + with SedonaBasicPythonArrowOutput { + + override val errorOnDuplicatedFieldNames: Boolean = true + + override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize + require( + bufferSize >= 4, + "Pandas execution requires more than 4 bytes. Please set higher buffer. " + + s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.") +} diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala new file mode 100644 index 00000000000..228ddc2cbca --- /dev/null +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.spark.sql.execution.python + +import org.apache.sedona.sql.UDF.PythonEvalType +import org.apache.sedona.sql.UDF.PythonEvalType.{SQL_SCALAR_SEDONA_DB_NO_SPEEDUP_UDF, SQL_SCALAR_SEDONA_DB_UDF, SQL_SCALAR_SEDONA_UDF} +import org.apache.spark.api.python.ChainedPythonFunctions +import org.apache.spark.sql.Strategy +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow, PythonUDF} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.udf.SedonaArrowEvalPython +import org.apache.spark.{JobArtifactSet, TaskContext} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT + +import scala.collection.JavaConverters.asScalaIteratorConverter + +// We use custom Strategy to avoid Apache Spark assert on types, we +// can consider extending this to support other engines working with +// arrow data +class SedonaArrowStrategy extends Strategy { + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case SedonaArrowEvalPython(udfs, output, child, evalType) => + SedonaArrowEvalPythonExec(udfs, output, planLater(child), evalType) :: Nil + case _ => Nil + } +} + +// It's modification og Apache Spark's ArrowEvalPythonExec, we remove the check on the types to allow geometry types +// here, it's initial version to allow the vectorized udf for Sedona geometry types. We can consider extending this +// to support other engines working with arrow data +case class SedonaArrowEvalPythonExec( + udfs: Seq[PythonUDF], + resultAttrs: Seq[Attribute], + child: SparkPlan, + evalType: Int) + extends EvalPythonExec + with PythonSQLMetrics { + + private val batchSize = conf.arrowMaxRecordsPerBatch + private val sessionLocalTimeZone = conf.sessionLocalTimeZone + private val largeVarTypes = conf.arrowUseLargeVarTypes + private val pythonRunnerConf = + Map[String, String](SQLConf.SESSION_LOCAL_TIMEZONE.key -> conf.sessionLocalTimeZone) + private[this] val jobArtifactUUID = JobArtifactSet.getCurrentJobArtifactState.map(_.uuid) + + private def inferCRS(iterator: Iterator[InternalRow], schema: StructType): Seq[(Int, Int)] = { + // this triggers the iterator + if (!iterator.hasNext) { + return Seq.empty + } + + val row = iterator.next() + + val rowMatched = row match { + case generic: GenericInternalRow => + Some(generic) + case _ => None + } + + schema + .filter { field => + field.dataType == GeometryUDT + } + .zipWithIndex + .map { case (_, index) => + if (rowMatched.isEmpty || rowMatched.get.values(index) == null) (index, 0) + else { + val geom = rowMatched.get.get(index, GeometryUDT).asInstanceOf[Array[Byte]] + val preambleByte = geom(0) & 0xff + val hasSrid = (preambleByte & 0x01) != 0 + + var srid = 0 + if (hasSrid) { + val srid2 = (geom(1) & 0xff) << 16 + val srid1 = (geom(2) & 0xff) << 8 + val srid0 = geom(3) & 0xff + srid = srid2 | srid1 | srid0 + } + + (index, srid) + } + } + } + + protected override def evaluate( + funcs: Seq[ChainedPythonFunctions], + argOffsets: Array[Array[Int]], + iter: Iterator[InternalRow], + schema: StructType, + context: TaskContext): Iterator[InternalRow] = { + val (probe, full) = iter.duplicate + + val geometryFields = inferCRS(probe, schema) + + val batchIter = if (batchSize > 0) new BatchIterator(full, batchSize) else Iterator(full) + + evalType match { + case SQL_SCALAR_SEDONA_DB_UDF | 
SQL_SCALAR_SEDONA_DB_NO_SPEEDUP_UDF => + val columnarBatchIter = new SedonaArrowPythonRunner( + funcs, + 200, + argOffsets, + schema, + sessionLocalTimeZone, + largeVarTypes, + pythonRunnerConf, + pythonMetrics, + jobArtifactUUID, + geometryFields, + evalType == SQL_SCALAR_SEDONA_DB_UDF) + .compute(batchIter, context.partitionId(), context) + + val result = columnarBatchIter.flatMap { batch => + batch.rowIterator.asScala + } + + result + + case SQL_SCALAR_SEDONA_UDF => + val columnarBatchIter = new ArrowPythonRunner( + funcs, + evalType - PythonEvalType.SEDONA_UDF_TYPE_CONSTANT, + argOffsets, + schema, + sessionLocalTimeZone, + largeVarTypes, + pythonRunnerConf, + pythonMetrics, + jobArtifactUUID).compute(batchIter, context.partitionId(), context) + + val iter = columnarBatchIter.flatMap { batch => + batch.rowIterator.asScala + } + + iter + } + } + + override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = + copy(child = newChild) +} diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala new file mode 100644 index 00000000000..055d5db15f1 --- /dev/null +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.spark.sql.execution.python + +import java.net._ +import java.util.concurrent.atomic.AtomicBoolean +import scala.collection.JavaConverters._ +import org.apache.spark._ +import org.apache.spark.api.python.{BasePythonRunner, ChainedPythonFunctions} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.EXECUTOR_CORES +import org.apache.spark.internal.config.Python._ +import org.apache.spark.resource.ResourceProfile.{EXECUTOR_CORES_LOCAL_PROPERTY, PYSPARK_MEMORY_LOCAL_PROPERTY} +import org.apache.spark.util._ + +private object SedonaBasePythonRunner { + + private lazy val faultHandlerLogDir = Utils.createTempDir(namePrefix = "faulthandler") +} + +private[spark] abstract class SedonaBasePythonRunner[IN, OUT]( + funcs: Seq[ChainedPythonFunctions], + evalType: Int, + argOffsets: Array[Array[Int]], + jobArtifactUUID: Option[String], + val geometryFields: Seq[(Int, Int)] = Seq.empty, + val castGeometryToWKB: Boolean = false) + extends BasePythonRunner[IN, OUT](funcs, evalType, argOffsets, jobArtifactUUID) + with Logging { + + require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs") + + private val conf = SparkEnv.get.conf + private val reuseWorker = + conf.getBoolean(PYTHON_WORKER_REUSE.key, PYTHON_WORKER_REUSE.defaultValue.get) + private val faultHandlerEnabled = conf.get(PYTHON_WORKER_FAULTHANLDER_ENABLED) + + private def getWorkerMemoryMb(mem: Option[Long], cores: Int): Option[Long] = { + mem.map(_ / cores) + } + + import java.io._ + + override def compute( + inputIterator: Iterator[IN], + partitionIndex: Int, + context: TaskContext): Iterator[OUT] = { + val startTime = System.currentTimeMillis + val env = SparkEnv.get + + val execCoresProp = Option(context.getLocalProperty(EXECUTOR_CORES_LOCAL_PROPERTY)) + val memoryMb = Option(context.getLocalProperty(PYSPARK_MEMORY_LOCAL_PROPERTY)).map(_.toLong) + + if (simplifiedTraceback) { + envVars.put("SPARK_SIMPLIFIED_TRACEBACK", "1") + } + // SPARK-30299 this could be wrong with standalone mode when executor + // cores might not be correct because it defaults to all cores on the box. 
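+    // In that case the per-worker memory derived below is only an approximation.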
+ val execCores = execCoresProp.map(_.toInt).getOrElse(conf.get(EXECUTOR_CORES)) + val workerMemoryMb = getWorkerMemoryMb(memoryMb, execCores) + if (workerMemoryMb.isDefined) { + envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", workerMemoryMb.get.toString) + } + envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString) + envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString) + if (faultHandlerEnabled) { + envVars.put("PYTHON_FAULTHANDLER_DIR", SedonaBasePythonRunner.faultHandlerLogDir.toString) + } + + if (reuseWorker) { + envVars.put("SPARK_REUSE_WORKER", "1") + } + + envVars.put("SPARK_JOB_ARTIFACT_UUID", jobArtifactUUID.getOrElse("default")) + + val (worker: Socket, pid: Option[Int]) = { + WorkerContext.createPythonWorker(pythonExec, envVars.asScala.toMap) + } + + val releasedOrClosed = new AtomicBoolean(false) + + // Start a thread to feed the process input from our parent's iterator + val writerThread = newWriterThread(env, worker, inputIterator, partitionIndex, context) + + context.addTaskCompletionListener[Unit] { _ => + writerThread.shutdownOnTaskCompletion() + + if (!reuseWorker || releasedOrClosed.compareAndSet(false, true)) { + try { + worker.close() + } catch { + case e: Exception => + logWarning("Failed to close worker socket", e) + } + } + } + + writerThread.start() + + val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize)) + + val stdoutIterator = newReaderIterator( + stream, + writerThread, + startTime, + env, + worker, + pid, + releasedOrClosed, + context) + new InterruptibleIterator(context, stdoutIterator) + } +} diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala new file mode 100644 index 00000000000..459388856b0 --- /dev/null +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.spark.sql.execution.python + +import org.apache.spark.{SparkException, SparkFiles} +import org.apache.spark.api.python.PythonUtils +import org.apache.spark.util.Utils + +import java.io.{DataInputStream, DataOutputStream, EOFException, File, InputStream} +import java.net.{InetAddress, ServerSocket, Socket, SocketException} +import java.util.Arrays +import scala.collection.JavaConverters._ +import scala.collection.mutable +import org.apache.spark._ +import org.apache.spark.errors.SparkCoreErrors +import org.apache.spark.internal.Logging +import org.apache.spark.security.SocketAuthHelper +import org.apache.spark.sql.execution.python.SedonaPythonWorkerFactory.PROCESS_WAIT_TIMEOUT_MS +import org.apache.spark.util.RedirectThread + +import javax.annotation.concurrent.GuardedBy + +class SedonaDBWorkerFactory(pythonExec: String, envVars: Map[String, String]) extends Logging { + self => + + private val simpleWorkers = new mutable.WeakHashMap[Socket, Process]() + private val authHelper = new SocketAuthHelper(SparkEnv.get.conf) + @GuardedBy("self") + private var daemon: Process = null + val daemonHost = InetAddress.getLoopbackAddress() + @GuardedBy("self") + private var daemonPort: Int = 0 + @GuardedBy("self") + private val daemonWorkers = new mutable.WeakHashMap[Socket, Int]() + @GuardedBy("self") + private val idleWorkers = new mutable.Queue[Socket]() + @GuardedBy("self") + private var lastActivityNs = 0L + + private val useDaemon: Boolean = + SparkEnv.get.conf.getBoolean("sedona.python.worker.daemon.enabled", false) + + private val sedonaUDFWorkerModule = + SparkEnv.get.conf.get("sedona.python.worker.udf.module", "sedona.spark.worker.worker") + + private val sedonaDaemonModule = + SparkEnv.get.conf.get("sedona.python.worker.udf.daemon.module", "sedona.spark.worker.daemon") + + private val pythonPath = PythonUtils.mergePythonPaths( + PythonUtils.sparkPythonPath, + envVars.getOrElse("PYTHONPATH", ""), + sys.env.getOrElse("PYTHONPATH", "")) + + def create(): (Socket, Option[Int]) = { + if (useDaemon) { + self.synchronized { + if (idleWorkers.nonEmpty) { + val worker = idleWorkers.dequeue() + return (worker, daemonWorkers.get(worker)) + } + } + + createThroughDaemon() + } else { + createSimpleWorker(sedonaUDFWorkerModule) + } + } + + private def createSimpleWorker(workerModule: String): (Socket, Option[Int]) = { + var serverSocket: ServerSocket = null + try { + serverSocket = new ServerSocket(0, 1, InetAddress.getLoopbackAddress()) + + // Create and start the worker + val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", workerModule)) + val jobArtifactUUID = envVars.getOrElse("SPARK_JOB_ARTIFACT_UUID", "default") + if (jobArtifactUUID != "default") { + val f = new File(SparkFiles.getRootDirectory(), jobArtifactUUID) + f.mkdir() + pb.directory(f) + } + val workerEnv = pb.environment() + workerEnv.putAll(envVars.asJava) + workerEnv.put("PYTHONPATH", pythonPath) + // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u: + workerEnv.put("PYTHONUNBUFFERED", "YES") + workerEnv.put("PYTHON_WORKER_FACTORY_PORT", serverSocket.getLocalPort.toString) + workerEnv.put("PYTHON_WORKER_FACTORY_SECRET", authHelper.secret) + if (Utils.preferIPv6) { + workerEnv.put("SPARK_PREFER_IPV6", "True") + } + val worker = pb.start() + + // Redirect worker stdout and stderr + redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream) + + // Wait for it to connect to our socket, and validate the auth secret. 
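+      // Give the worker 10 seconds to call back; a timeout here is rethrown below as
+      // "Python worker failed to connect back."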
+ serverSocket.setSoTimeout(10000) + + try { + val socket = serverSocket.accept() + authHelper.authClient(socket) + // TODO: When we drop JDK 8, we can just use worker.pid() + val pid = new DataInputStream(socket.getInputStream).readInt() + if (pid < 0) { + throw new IllegalStateException("Python failed to launch worker with code " + pid) + } + self.synchronized { + simpleWorkers.put(socket, worker) + } + + (socket, Some(pid)) + } catch { + case e: Exception => + throw new SparkException("Python worker failed to connect back.", e) + } + } finally { + if (serverSocket != null) { + serverSocket.close() + } + } + } + + private def redirectStreamsToStderr(stdout: InputStream, stderr: InputStream): Unit = { + try { + new RedirectThread(stdout, System.err, "stdout reader for " + pythonExec).start() + new RedirectThread(stderr, System.err, "stderr reader for " + pythonExec).start() + } catch { + case e: Exception => + logError("Exception in redirecting streams", e) + } + } + + private def createThroughDaemon(): (Socket, Option[Int]) = { + + def createSocket(): (Socket, Option[Int]) = { + val socket = new Socket(daemonHost, daemonPort) + val pid = new DataInputStream(socket.getInputStream).readInt() + if (pid < 0) { + throw new IllegalStateException("Python daemon failed to launch worker with code " + pid) + } + + authHelper.authToServer(socket) + daemonWorkers.put(socket, pid) + (socket, Some(pid)) + } + + self.synchronized { + // Start the daemon if it hasn't been started + startDaemon() + + // Attempt to connect, restart and retry once if it fails + try { + createSocket() + } catch { + case exc: SocketException => + logWarning("Failed to open socket to Python daemon:", exc) + logWarning("Assuming that daemon unexpectedly quit, attempting to restart") + stopDaemon() + startDaemon() + createSocket() + } + } + } + + private def stopDaemon(): Unit = { + self.synchronized { + if (useDaemon) { + cleanupIdleWorkers() + + // Request shutdown of existing daemon by sending SIGTERM + if (daemon != null) { + daemon.destroy() + } + + daemon = null + daemonPort = 0 + } else { + simpleWorkers.mapValues(_.destroy()) + } + } + } + + private def startDaemon(): Unit = { + self.synchronized { + // Is it already running? + if (daemon != null) { + return + } + + try { + // Create and start the daemon + val command = Arrays.asList(pythonExec, "-m", sedonaDaemonModule) + val pb = new ProcessBuilder(command) + val jobArtifactUUID = envVars.getOrElse("SPARK_JOB_ARTIFACT_UUID", "default") + if (jobArtifactUUID != "default") { + val f = new File(SparkFiles.getRootDirectory(), jobArtifactUUID) + f.mkdir() + pb.directory(f) + } + val workerEnv = pb.environment() + workerEnv.putAll(envVars.asJava) + workerEnv.put("PYTHONPATH", pythonPath) + workerEnv.put("PYTHON_WORKER_FACTORY_SECRET", authHelper.secret) + if (Utils.preferIPv6) { + workerEnv.put("SPARK_PREFER_IPV6", "True") + } + // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u: + workerEnv.put("PYTHONUNBUFFERED", "YES") + daemon = pb.start() + + val in = new DataInputStream(daemon.getInputStream) + try { + daemonPort = in.readInt() + } catch { + case _: EOFException if daemon.isAlive => + throw SparkCoreErrors.eofExceptionWhileReadPortNumberError(sedonaDaemonModule) + case _: EOFException => + throw SparkCoreErrors.eofExceptionWhileReadPortNumberError( + sedonaDaemonModule, + Some(daemon.exitValue)) + } + + // test that the returned port number is within a valid range. 
+ // note: this does not cover the case where the port number + // is arbitrary data but is also coincidentally within range + if (daemonPort < 1 || daemonPort > 0xffff) { + val exceptionMessage = f""" + |Bad data in $sedonaDaemonModule's standard output. Invalid port number: + | $daemonPort (0x$daemonPort%08x) + |Python command to execute the daemon was: + | ${command.asScala.mkString(" ")} + |Check that you don't have any unexpected modules or libraries in + |your PYTHONPATH: + | $pythonPath + |Also, check if you have a sitecustomize.py module in your python path, + |or in your python installation, that is printing to standard output""" + throw new SparkException(exceptionMessage.stripMargin) + } + + // Redirect daemon stdout and stderr + redirectStreamsToStderr(in, daemon.getErrorStream) + } catch { + case e: Exception => + // If the daemon exists, wait for it to finish and get its stderr + val stderr = Option(daemon) + .flatMap { d => Utils.getStderr(d, PROCESS_WAIT_TIMEOUT_MS) } + .getOrElse("") + + stopDaemon() + + if (stderr != "") { + val formattedStderr = stderr.replace("\n", "\n ") + val errorMessage = s""" + |Error from python worker: + | $formattedStderr + |PYTHONPATH was: + | $pythonPath + |$e""" + + // Append error message from python daemon, but keep original stack trace + val wrappedException = new SparkException(errorMessage.stripMargin) + wrappedException.setStackTrace(e.getStackTrace) + throw wrappedException + } else { + throw e + } + } + + // Important: don't close daemon's stdin (daemon.getOutputStream) so it can correctly + // detect our disappearance. + } + } + + private def cleanupIdleWorkers(): Unit = { + while (idleWorkers.nonEmpty) { + val worker = idleWorkers.dequeue() + try { + // the worker will exit after closing the socket + worker.close() + } catch { + case e: Exception => + logWarning("Failed to close worker socket", e) + } + } + } + + def releaseWorker(worker: Socket): Unit = { + if (useDaemon) { + self.synchronized { + lastActivityNs = System.nanoTime() + idleWorkers.enqueue(worker) + } + } else { + // Cleanup the worker socket. This will also cause the Python worker to exit. + try { + worker.close() + } catch { + case e: Exception => + logWarning("Failed to close worker socket", e) + } + } + } + + def stopWorker(worker: Socket): Unit = { + self.synchronized { + if (useDaemon) { + if (daemon != null) { + daemonWorkers.get(worker).foreach { pid => + // tell daemon to kill worker by pid + val output = new DataOutputStream(daemon.getOutputStream) + output.writeInt(pid) + output.flush() + daemon.getOutputStream.flush() + } + } + } else { + simpleWorkers.get(worker).foreach(_.destroy()) + } + } + worker.close() + } +} + +private object SedonaPythonWorkerFactory { + val PROCESS_WAIT_TIMEOUT_MS = 10000 +} diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala new file mode 100644 index 00000000000..66029673519 --- /dev/null +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.spark.sql.execution.python + +import org.apache.arrow.vector.VectorSchemaRoot +import org.apache.arrow.vector.ipc.ArrowStreamWriter +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.arrow.ArrowWriter +import org.apache.spark.sql.util.ArrowUtils +import org.apache.spark.util.Utils +import org.apache.spark.{SparkEnv, TaskContext} + +import java.io.DataOutputStream +import java.net.Socket + +private[python] trait SedonaPythonArrowInput[IN] extends PythonArrowInput[IN] { + self: SedonaBasePythonRunner[IN, _] => + protected override def newWriterThread( + env: SparkEnv, + worker: Socket, + inputIterator: Iterator[IN], + partitionIndex: Int, + context: TaskContext): WriterThread = { + new WriterThread(env, worker, inputIterator, partitionIndex, context) { + + protected override def writeCommand(dataOut: DataOutputStream): Unit = { + handleMetadataBeforeExec(dataOut) + writeUDF(dataOut, funcs, argOffsets) + + // if speedup is not available and we need to use casting + dataOut.writeBoolean(self.castGeometryToWKB) + + // write + dataOut.writeInt(self.geometryFields.length) + // write geometry field indices and their SRIDs + geometryFields.foreach { case (index, srid) => + dataOut.writeInt(index) + dataOut.writeInt(srid) + } + } + + protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = { + val arrowSchema = + ArrowUtils.toArrowSchema(schema, timeZoneId, errorOnDuplicatedFieldNames, largeVarTypes) + val allocator = ArrowUtils.rootAllocator.newChildAllocator( + s"stdout writer for $pythonExec", + 0, + Long.MaxValue) + val root = VectorSchemaRoot.create(arrowSchema, allocator) + + Utils.tryWithSafeFinally { + val writer = new ArrowStreamWriter(root, null, dataOut) + writer.start() + + writeIteratorToArrowStream(root, writer, dataOut, inputIterator) + + writer.end() + } { + root.close() + allocator.close() + } + } + } + } +} + +private[python] trait SedonaBasicPythonArrowInput + extends SedonaPythonArrowInput[Iterator[InternalRow]] { + self: SedonaBasePythonRunner[Iterator[InternalRow], _] => + + protected def writeIteratorToArrowStream( + root: VectorSchemaRoot, + writer: ArrowStreamWriter, + dataOut: DataOutputStream, + inputIterator: Iterator[Iterator[InternalRow]]): Unit = { + val arrowWriter = ArrowWriter.create(root) + while (inputIterator.hasNext) { + val startData = dataOut.size() + val nextBatch = inputIterator.next() + + while (nextBatch.hasNext) { + arrowWriter.write(nextBatch.next()) + } + + arrowWriter.finish() + writer.writeBatch() + arrowWriter.reset() + val deltaData = dataOut.size() - startData + pythonMetrics("pythonDataSent") += deltaData + } + } +} diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala new file mode 100644 index 00000000000..d0316052601 --- 
/dev/null +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.spark.sql.execution.python + +import java.io.DataInputStream +import java.net.Socket +import java.util.concurrent.atomic.AtomicBoolean +import scala.collection.JavaConverters._ +import org.apache.arrow.vector.VectorSchemaRoot +import org.apache.arrow.vector.ipc.ArrowStreamReader +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.api.python.{BasePythonRunner, SpecialLengths} +import org.apache.spark.internal.config.Python.PYTHON_WORKER_REUSE +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.ArrowUtils +import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnVector, ColumnarBatch} + +private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonRunner[_, OUT] => + + private val reuseWorker = + SparkEnv.get.conf.getBoolean(PYTHON_WORKER_REUSE.key, PYTHON_WORKER_REUSE.defaultValue.get) + + protected def pythonMetrics: Map[String, SQLMetric] + + protected def deserializeColumnarBatch(batch: ColumnarBatch, schema: StructType): OUT + + protected def newReaderIterator( + stream: DataInputStream, + writerThread: WriterThread, + startTime: Long, + env: SparkEnv, + worker: Socket, + pid: Option[Int], + releasedOrClosed: AtomicBoolean, + context: TaskContext): Iterator[OUT] = { + + new ReaderIterator( + stream, + writerThread, + startTime, + env, + worker, + pid, + releasedOrClosed, + context) { + + private val allocator = ArrowUtils.rootAllocator.newChildAllocator( + s"stdin reader for $pythonExec", + 0, + Long.MaxValue) + + private var reader: ArrowStreamReader = _ + private var root: VectorSchemaRoot = _ + private var schema: StructType = _ + private var vectors: Array[ColumnVector] = _ + private var eos = false + private var nextObj: OUT = _ + + context.addTaskCompletionListener[Unit] { _ => + if (reader != null) { + reader.close(false) + } + allocator.close() + } + + private var batchLoaded = true + + protected def handleEndOfDataSectionSedona(): Unit = { + // Check whether the worker is ready to be reused. 
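+        // After the END_OF_DATA_SECTION marker the Python side writes
+        // END_OF_STREAM; if worker reuse is enabled and this runner has not
+        // already released or closed the socket, hand it back to
+        // WorkerContext's idle pool so a later task can reuse it. In either
+        // case set eos so read() is not called again.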
+ if (stream.readInt() == SpecialLengths.END_OF_STREAM) { + if (reuseWorker && releasedOrClosed.compareAndSet(false, true)) { + WorkerContext.releasePythonWorker(pythonExec, envVars.asScala.toMap, worker) + } + } + eos = true + } + + protected override def handleEndOfDataSection(): Unit = { + handleEndOfDataSectionSedona() + } + + override def hasNext: Boolean = nextObj != null || { + if (!eos) { + nextObj = read() + hasNext + } else { + false + } + } + + override def next(): OUT = { + if (hasNext) { + val obj = nextObj + nextObj = null.asInstanceOf[OUT] + obj + } else { + Iterator.empty.next() + } + } + + protected override def read(): OUT = { + if (writerThread.exception.isDefined) { + throw writerThread.exception.get + } + try { + if (reader != null && batchLoaded) { + val bytesReadStart = reader.bytesRead() + batchLoaded = reader.loadNextBatch() + if (batchLoaded) { + val batch = new ColumnarBatch(vectors) + val rowCount = root.getRowCount + batch.setNumRows(root.getRowCount) + val bytesReadEnd = reader.bytesRead() + pythonMetrics("pythonNumRowsReceived") += rowCount + pythonMetrics("pythonDataReceived") += bytesReadEnd - bytesReadStart + deserializeColumnarBatch(batch, schema) + } else { + reader.close(false) + allocator.close() + read() + } + } else { + val specialSign = stream.readInt() + + specialSign match { + case SpecialLengths.START_ARROW_STREAM => + reader = new ArrowStreamReader(stream, allocator) + root = reader.getVectorSchemaRoot() + schema = ArrowUtils.fromArrowSchema(root.getSchema()) + vectors = root + .getFieldVectors() + .asScala + .map { vector => + new ArrowColumnVector(vector) + } + .toArray[ColumnVector] + + read() + case SpecialLengths.TIMING_DATA => + handleTimingData() + read() + case SpecialLengths.PYTHON_EXCEPTION_THROWN => + throw handlePythonException() + case SpecialLengths.END_OF_DATA_SECTION => + handleEndOfDataSection() + null.asInstanceOf[OUT] + } + } + } catch handleException + } + } + } +} + +private[python] trait SedonaBasicPythonArrowOutput + extends SedonaPythonArrowOutput[ColumnarBatch] { + self: BasePythonRunner[_, ColumnarBatch] => + + protected def deserializeColumnarBatch( + batch: ColumnarBatch, + schema: StructType): ColumnarBatch = batch +} diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/WorkerContext.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/WorkerContext.scala new file mode 100644 index 00000000000..6411bec97e4 --- /dev/null +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/WorkerContext.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.spark.sql.execution.python + +import java.net.Socket +import scala.collection.mutable + +object WorkerContext { + + def createPythonWorker( + pythonExec: String, + envVars: Map[String, String]): (java.net.Socket, Option[Int]) = { + synchronized { + val key = (pythonExec, envVars) + pythonWorkers.getOrElseUpdate(key, new SedonaDBWorkerFactory(pythonExec, envVars)).create() + } + } + + def destroyPythonWorker( + pythonExec: String, + envVars: Map[String, String], + worker: Socket): Unit = { + synchronized { + val key = (pythonExec, envVars) + pythonWorkers.get(key).foreach(_.stopWorker(worker)) + } + } + + def releasePythonWorker( + pythonExec: String, + envVars: Map[String, String], + worker: Socket): Unit = { + synchronized { + val key = (pythonExec, envVars) + pythonWorkers.get(key).foreach(_.releaseWorker(worker)) + } + } + + private val pythonWorkers = + mutable.HashMap[(String, Map[String, String]), SedonaDBWorkerFactory]() + +} diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala index 3d3301580cc..3584cb01bd7 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala @@ -44,9 +44,8 @@ class ExtractSedonaUDFRule extends Rule[LogicalPlan] with Logging { } def isScalarPythonUDF(e: Expression): Boolean = { - e.isInstanceOf[PythonUDF] && e - .asInstanceOf[PythonUDF] - .evalType == PythonEvalType.SQL_SCALAR_SEDONA_UDF + e.isInstanceOf[PythonUDF] && + PythonEvalType.evals.contains(e.asInstanceOf[PythonUDF].evalType) } private def collectEvaluableUDFsFromExpressions( @@ -168,13 +167,12 @@ class ExtractSedonaUDFRule extends Rule[LogicalPlan] with Logging { evalTypes.mkString(",")) } val evalType = evalTypes.head - val evaluation = evalType match { - case PythonEvalType.SQL_SCALAR_SEDONA_UDF => - SedonaArrowEvalPython(validUdfs, resultAttrs, child, evalType) - case _ => - throw new IllegalStateException("Unexpected UDF evalType") + if (!PythonEvalType.evals().contains(evalType)) { + throw new IllegalStateException(s"Unexpected UDF evalType: $evalType") } + val evaluation = SedonaArrowEvalPython(validUdfs, resultAttrs, child, evalType) + attributeMap ++= validUdfs.map(canonicalizeDeterministic).zip(resultAttrs) evaluation } else { diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/SedonaArrowStrategy.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/SedonaArrowStrategy.scala deleted file mode 100644 index a403fa6b9eb..00000000000 --- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/SedonaArrowStrategy.scala +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.spark.sql.udf - -import org.apache.sedona.sql.UDF.PythonEvalType -import org.apache.spark.api.python.ChainedPythonFunctions -import org.apache.spark.{JobArtifactSet, TaskContext} -import org.apache.spark.sql.Strategy -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, PythonUDF} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.python.{ArrowPythonRunner, BatchIterator, EvalPythonExec, PythonSQLMetrics} -import org.apache.spark.sql.types.StructType - -import scala.collection.JavaConverters.asScalaIteratorConverter - -// We use custom Strategy to avoid Apache Spark assert on types, we -// can consider extending this to support other engines working with -// arrow data -class SedonaArrowStrategy extends Strategy { - override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case SedonaArrowEvalPython(udfs, output, child, evalType) => - SedonaArrowEvalPythonExec(udfs, output, planLater(child), evalType) :: Nil - case _ => Nil - } -} - -// It's modification og Apache Spark's ArrowEvalPythonExec, we remove the check on the types to allow geometry types -// here, it's initial version to allow the vectorized udf for Sedona geometry types. We can consider extending this -// to support other engines working with arrow data -case class SedonaArrowEvalPythonExec( - udfs: Seq[PythonUDF], - resultAttrs: Seq[Attribute], - child: SparkPlan, - evalType: Int) - extends EvalPythonExec - with PythonSQLMetrics { - - private val batchSize = conf.arrowMaxRecordsPerBatch - private val sessionLocalTimeZone = conf.sessionLocalTimeZone - private val largeVarTypes = conf.arrowUseLargeVarTypes - private val pythonRunnerConf = ArrowPythonRunner.getPythonRunnerConfMap(conf) - private[this] val jobArtifactUUID = JobArtifactSet.getCurrentJobArtifactState.map(_.uuid) - - protected override def evaluate( - funcs: Seq[ChainedPythonFunctions], - argOffsets: Array[Array[Int]], - iter: Iterator[InternalRow], - schema: StructType, - context: TaskContext): Iterator[InternalRow] = { - - val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) else Iterator(iter) - - val columnarBatchIter = new ArrowPythonRunner( - funcs, - evalType - PythonEvalType.SEDONA_UDF_TYPE_CONSTANT, - argOffsets, - schema, - sessionLocalTimeZone, - largeVarTypes, - pythonRunnerConf, - pythonMetrics, - jobArtifactUUID).compute(batchIter, context.partitionId(), context) - - columnarBatchIter.flatMap { batch => - batch.rowIterator.asScala - } - } - - override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = - copy(child = newChild) -} diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala index 28943ff11da..50d751f4842 100644 --- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala +++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala @@ -46,6 +46,9 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll { // We need to be explicit about broadcasting in tests. 
.config("sedona.join.autoBroadcastJoinThreshold", "-1") .config("spark.sql.extensions", "org.apache.sedona.sql.SedonaSqlExtensions") + .config("sedona.python.worker.udf.module", "sedona.spark.worker.worker") + .config("sedona.python.worker.udf.daemon.module", "sedonaworker.daemon") + .config("sedona.python.worker.daemon.enabled", "false") .config(keyParserExtension, ThreadLocalRandom.current().nextBoolean()) .getOrCreate() diff --git a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala index 8d41848de98..94ce194c654 100644 --- a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala +++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala @@ -20,8 +20,8 @@ package org.apache.spark.sql.udf import org.apache.sedona.sql.TestBaseScala import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.functions.col -import org.apache.spark.sql.udf.ScalarUDF.geoPandasScalaFunction +import org.apache.spark.sql.functions.{col, expr, lit} +import org.apache.spark.sql.udf.ScalarUDF.{geoPandasScalaFunction, sedonaDBGeometryToGeometryFunction} import org.locationtech.jts.io.WKTReader import org.scalatest.matchers.should.Matchers @@ -35,7 +35,7 @@ class StrategySuite extends TestBaseScala with Matchers { import spark.implicits._ - it("sedona geospatial UDF") { + it("sedona geospatial UDF - geopandas") { val df = Seq( (1, "value", wktReader.read("POINT(21 52)")), (2, "value1", wktReader.read("POINT(20 50)")), @@ -43,11 +43,14 @@ class StrategySuite extends TestBaseScala with Matchers { (4, "value3", wktReader.read("POINT(20 48)")), (5, "value4", wktReader.read("POINT(20 47)"))) .toDF("id", "value", "geom") + + val geopandasUDFDF = df .withColumn("geom_buffer", geoPandasScalaFunction(col("geom"))) - df.count shouldEqual 5 + geopandasUDFDF.count shouldEqual 5 - df.selectExpr("ST_AsText(ST_ReducePrecision(geom_buffer, 2))") + geopandasUDFDF + .selectExpr("ST_AsText(ST_ReducePrecision(geom_buffer, 2))") .as[String] .collect() should contain theSameElementsAs Seq( "POLYGON ((20 51, 20 53, 22 53, 22 51, 20 51))", @@ -56,4 +59,25 @@ class StrategySuite extends TestBaseScala with Matchers { "POLYGON ((19 47, 19 49, 21 49, 21 47, 19 47))", "POLYGON ((19 46, 19 48, 21 48, 21 46, 19 46))") } + + it("sedona geospatial UDF - sedona db") { + val df = Seq( + (1, "value", wktReader.read("POINT(21 52)")), + (2, "value1", wktReader.read("POINT(20 50)")), + (3, "value2", wktReader.read("POINT(20 49)")), + (4, "value3", wktReader.read("POINT(20 48)")), + (5, "value4", wktReader.read("POINT(20 47)"))) + .toDF("id", "value", "geom") + + val dfVectorized = df + .withColumn("geometry", expr("ST_SetSRID(geom, '4326')")) + .select(sedonaDBGeometryToGeometryFunction(col("geometry"), lit(100)).alias("geom")) + + dfVectorized + .selectExpr("ST_X(ST_Centroid(geom)) AS x") + .selectExpr("sum(x)") + .as[Double] + .collect() + .head shouldEqual 101 + } } diff --git a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala index c0a2d8f260d..d2c0d71c703 100644 --- a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala +++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala @@ -19,11 +19,13 @@ package org.apache.spark.sql.udf import org.apache.sedona.sql.UDF -import org.apache.spark.TestUtils +import 
org.apache.spark.{SparkEnv, TestUtils} import org.apache.spark.api.python._ import org.apache.spark.broadcast.Broadcast +import org.apache.spark.internal.config.Python.{PYTHON_USE_DAEMON, PYTHON_WORKER_MODULE} import org.apache.spark.sql.execution.python.UserDefinedPythonFunction import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT +import org.apache.spark.sql.types.FloatType import org.apache.spark.util.Utils import java.io.File @@ -54,7 +56,7 @@ object ScalarUDF { private lazy val isPythonAvailable: Boolean = TestUtils.testCommandAvailable(pythonExec) - lazy val pythonVer: String = if (isPythonAvailable) { + val pythonVer: String = if (isPythonAvailable) { Process( Seq(pythonExec, "-c", "import sys; print('%d.%d' % sys.version_info[:2])"), None, @@ -70,31 +72,85 @@ object ScalarUDF { finally Utils.deleteRecursively(path) } - val pandasFunc: Array[Byte] = { + val additionalModule = "spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf" + + val vectorizedFunction: Array[Byte] = { + var binaryPandasFunc: Array[Byte] = null + withTempPath { path => + Process( + Seq( + pythonExec, + "-c", + f""" + |from pyspark.sql.types import FloatType + |from pyspark.serializers import CloudPickleSerializer + |f = open('$path', 'wb'); + | + |def apply_function_on_number(x): + | return x + 1.0 + |f.write(CloudPickleSerializer().dumps((apply_function_on_number, FloatType()))) + |""".stripMargin), + None, + "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!! + binaryPandasFunc = Files.readAllBytes(path.toPath) + } + assert(binaryPandasFunc != null) + binaryPandasFunc + } + + val sedonaDBGeometryToGeometryFunctionBytes: Array[Byte] = { var binaryPandasFunc: Array[Byte] = null withTempPath { path => - println(path) Process( Seq( pythonExec, "-c", f""" - |from pyspark.sql.types import IntegerType - |from shapely.geometry import Point - |from sedona.sql.types import GeometryType - |from pyspark.serializers import CloudPickleSerializer - |from sedona.utils import geometry_serde - |from shapely import box - |f = open('$path', 'wb'); - |def w(x): - | def apply_function(w): - | geom, offset = geometry_serde.deserialize(w) - | bounds = geom.buffer(1).bounds - | x = box(*bounds) - | return geometry_serde.serialize(x) - | return x.apply(apply_function) - |f.write(CloudPickleSerializer().dumps((w, GeometryType()))) - |""".stripMargin), + |import pyarrow as pa + |import shapely + |import geoarrow.pyarrow as ga + |from sedonadb import udf + |from sedona.sql.types import GeometryType + |from pyspark.serializers import CloudPickleSerializer + |from pyspark.sql.types import DoubleType, IntegerType + |from sedonadb import udf as sedona_udf_module + | + |@sedona_udf_module.arrow_udf(ga.wkb(), [udf.GEOMETRY, udf.NUMERIC]) + |def geometry_udf(geom, distance): + | geom_wkb = pa.array(geom.storage.to_array()) + | distance = pa.array(distance.to_array()) + | geom = shapely.from_wkb(geom_wkb) + | result_shapely = shapely.buffer(geom, distance) + | + | return pa.array(shapely.to_wkb(result_shapely)) + | + |f = open('$path', 'wb'); + |f.write(CloudPickleSerializer().dumps((lambda: geometry_udf, GeometryType()))) + |""".stripMargin), + None, + "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!! 
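+      // The inline Python above cloudpickles a zero-argument factory that
+      // returns geometry_udf (a sedonadb arrow_udf buffering each geometry by
+      // the given numeric distance) together with GeometryType(); the bytes it
+      // writes to `path` are read back below and used as the pickled command
+      // of sedonaDBGeometryToGeometryFunction.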
+ binaryPandasFunc = Files.readAllBytes(path.toPath) + } + assert(binaryPandasFunc != null) + binaryPandasFunc + } + + val geopandasNonGeometryToGeometryFunction: Array[Byte] = { + var binaryPandasFunc: Array[Byte] = null + withTempPath { path => + Process( + Seq( + pythonExec, + "-c", + f""" + |from sedona.sql.types import GeometryType + |from shapely.wkt import loads + |from pyspark.serializers import CloudPickleSerializer + |f = open('$path', 'wb'); + |def apply_geopandas(x): + | return x.apply(lambda wkt: loads(wkt).buffer(1)) + |f.write(CloudPickleSerializer().dumps((apply_geopandas, GeometryType()))) + |""".stripMargin), None, "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!! binaryPandasFunc = Files.readAllBytes(path.toPath) @@ -104,7 +160,39 @@ object ScalarUDF { } private val workerEnv = new java.util.HashMap[String, String]() - workerEnv.put("PYTHONPATH", s"$pysparkPythonPath:$pythonPath") + + val pandasFunc: Array[Byte] = { + var binaryPandasFunc: Array[Byte] = null + withTempPath { path => + println(path) + Process( + Seq( + pythonExec, + "-c", + f""" + |from pyspark.sql.types import IntegerType + |from shapely.geometry import Point + |from sedona.sql.types import GeometryType + |from pyspark.serializers import CloudPickleSerializer + |from sedona.utils import geometry_serde + |from shapely import box + |f = open('$path', 'wb'); + |def w(x): + | def apply_function(w): + | geom, offset = geometry_serde.deserialize(w) + | bounds = geom.buffer(1).bounds + | x = box(*bounds) + | return geometry_serde.serialize(x) + | return x.apply(apply_function) + |f.write(CloudPickleSerializer().dumps((w, GeometryType()))) + |""".stripMargin), + None, + "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!! + binaryPandasFunc = Files.readAllBytes(path.toPath) + } + assert(binaryPandasFunc != null) + binaryPandasFunc + } val geoPandasScalaFunction: UserDefinedPythonFunction = UserDefinedPythonFunction( name = "geospatial_udf", @@ -119,4 +207,33 @@ object ScalarUDF { dataType = GeometryUDT, pythonEvalType = UDF.PythonEvalType.SQL_SCALAR_SEDONA_UDF, udfDeterministic = true) + + val nonGeometryVectorizedUDF: UserDefinedPythonFunction = UserDefinedPythonFunction( + name = "vectorized_udf", + func = SimplePythonFunction( + command = vectorizedFunction, + envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, String]], + pythonIncludes = List.empty[String].asJava, + pythonExec = pythonExec, + pythonVer = pythonVer, + broadcastVars = List.empty[Broadcast[PythonBroadcast]].asJava, + accumulator = null), + dataType = FloatType, + pythonEvalType = PythonEvalType.SQL_SCALAR_PANDAS_UDF, + udfDeterministic = false) + + val sedonaDBGeometryToGeometryFunction: UserDefinedPythonFunction = UserDefinedPythonFunction( + name = "geospatial_udf", + func = SimplePythonFunction( + command = sedonaDBGeometryToGeometryFunctionBytes, + envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, String]], + pythonIncludes = List.empty[String].asJava, + pythonExec = pythonExec, + pythonVer = pythonVer, + broadcastVars = List.empty[Broadcast[PythonBroadcast]].asJava, + accumulator = null), + dataType = GeometryUDT, + pythonEvalType = UDF.PythonEvalType.SQL_SCALAR_SEDONA_DB_UDF, + udfDeterministic = true) + }
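The nonGeometryVectorizedUDF defined above uses the stock SQL_SCALAR_PANDAS_UDF eval type but is not exercised by the specs shown here. A minimal sketch of how a StrategySuite-style test could drive it, assuming ScalarUDF.nonGeometryVectorizedUDF is imported into the suite and that the pickled apply_function_on_number simply adds 1.0 to each float value:

  it("pandas vectorized UDF - non-geometry (sketch)") {
    // Hypothetical test data; relies on spark.implicits._ and Matchers
    // already mixed into the suite.
    val df = Seq(1.0f, 2.0f, 3.0f).toDF("x")

    df.select(nonGeometryVectorizedUDF(col("x")).alias("y"))
      .as[Float]
      .collect() should contain theSameElementsAs Seq(2.0f, 3.0f, 4.0f)
  }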