diff --git a/src/orcapod/core/pods.py b/src/orcapod/core/pods.py index 7587881..0379bb5 100644 --- a/src/orcapod/core/pods.py +++ b/src/orcapod/core/pods.py @@ -432,7 +432,7 @@ def call( values = execution_engine.submit_sync( self.function, fn_kwargs=input_dict, - engine_opts=execution_engine_opts, + **(execution_engine_opts or {}), ) else: values = self.function(**input_dict) @@ -490,7 +490,9 @@ async def async_call( if execution_engine is not None: # use the provided execution engine to run the function values = await execution_engine.submit_async( - self.function, fn_kwargs=input_dict, engine_opts=execution_engine_opts + self.function, + fn_kwargs=input_dict, + **(execution_engine_opts or {}) ) else: values = self.function(**input_dict) diff --git a/src/orcapod/core/streams/pod_node_stream.py b/src/orcapod/core/streams/pod_node_stream.py index d0e624c..496bbc6 100644 --- a/src/orcapod/core/streams/pod_node_stream.py +++ b/src/orcapod/core/streams/pod_node_stream.py @@ -95,11 +95,18 @@ async def run_async( .drop_columns([constants.INPUT_PACKET_HASH]) ) - existing = ( - all_results.filter(pc.is_valid(pc.field("_exists"))) - .drop_columns(target_entries.column_names) - .drop_columns(["_exists"]) + existing = all_results.filter( + pc.is_valid(pc.field("_exists")) + ).drop_columns( + [ + "_exists", + constants.INPUT_PACKET_HASH, + constants.PACKET_RECORD_ID, + *self.input_stream.keys()[1], # remove the input packet keys + ] + # TODO: look into NOT fetching back the record ID ) + renamed = [ c.removesuffix("_right") if c.endswith("_right") else c for c in existing.column_names