Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
894 changes: 502 additions & 392 deletions notebooks/tutorials/01_introduction_to_orcapod.ipynb

Large diffs are not rendered by default.

46 changes: 40 additions & 6 deletions src/orcapod/core/pods.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ def call(
packet: cp.Packet,
record_id: str | None = None,
execution_engine: cp.ExecutionEngine | None = None,
execution_engine_opts: dict[str, Any] | None = None,
) -> tuple[cp.Tag, cp.Packet | None]: ...

@abstractmethod
Expand All @@ -216,6 +217,7 @@ async def async_call(
packet: cp.Packet,
record_id: str | None = None,
execution_engine: cp.ExecutionEngine | None = None,
execution_engine_opts: dict[str, Any] | None = None,
) -> tuple[cp.Tag, cp.Packet | None]: ...

def track_invocation(self, *streams: cp.Stream, label: str | None = None) -> None:
Expand Down Expand Up @@ -408,6 +410,7 @@ def call(
packet: cp.Packet,
record_id: str | None = None,
execution_engine: cp.ExecutionEngine | None = None,
execution_engine_opts: dict[str, Any] | None = None,
) -> tuple[cp.Tag, DictPacket | None]:
if not self.is_active():
logger.info(
Expand All @@ -426,7 +429,11 @@ def call(
with self._tracker_manager.no_tracking():
if execution_engine is not None:
# use the provided execution engine to run the function
values = execution_engine.submit_sync(self.function, **input_dict)
values = execution_engine.submit_sync(
self.function,
fn_kwargs=input_dict,
engine_opts=execution_engine_opts,
)
else:
values = self.function(**input_dict)

Expand Down Expand Up @@ -458,6 +465,7 @@ async def async_call(
packet: cp.Packet,
record_id: str | None = None,
execution_engine: cp.ExecutionEngine | None = None,
execution_engine_opts: dict[str, Any] | None = None,
) -> tuple[cp.Tag, cp.Packet | None]:
"""
Asynchronous call to the function pod. This is a placeholder for future implementation.
Expand All @@ -481,7 +489,9 @@ async def async_call(
input_dict = packet
if execution_engine is not None:
# use the provided execution engine to run the function
values = await execution_engine.submit_async(self.function, **input_dict)
values = await execution_engine.submit_async(
self.function, fn_kwargs=input_dict, engine_opts=execution_engine_opts
)
else:
values = self.function(**input_dict)

Expand Down Expand Up @@ -607,9 +617,14 @@ def call(
packet: cp.Packet,
record_id: str | None = None,
execution_engine: cp.ExecutionEngine | None = None,
execution_engine_opts: dict[str, Any] | None = None,
) -> tuple[cp.Tag, cp.Packet | None]:
return self.pod.call(
tag, packet, record_id=record_id, execution_engine=execution_engine
tag,
packet,
record_id=record_id,
execution_engine=execution_engine,
execution_engine_opts=execution_engine_opts,
)

async def async_call(
Expand All @@ -618,9 +633,14 @@ async def async_call(
packet: cp.Packet,
record_id: str | None = None,
execution_engine: cp.ExecutionEngine | None = None,
execution_engine_opts: dict[str, Any] | None = None,
) -> tuple[cp.Tag, cp.Packet | None]:
return await self.pod.async_call(
tag, packet, record_id=record_id, execution_engine=execution_engine
tag,
packet,
record_id=record_id,
execution_engine=execution_engine,
execution_engine_opts=execution_engine_opts,
)

def kernel_identity_structure(
Expand Down Expand Up @@ -683,6 +703,7 @@ def call(
packet: cp.Packet,
record_id: str | None = None,
execution_engine: cp.ExecutionEngine | None = None,
execution_engine_opts: dict[str, Any] | None = None,
skip_cache_lookup: bool = False,
skip_cache_insert: bool = False,
) -> tuple[cp.Tag, cp.Packet | None]:
Expand All @@ -700,7 +721,11 @@ def call(
print(f"Cache hit for {packet}!")
if output_packet is None:
tag, output_packet = super().call(
tag, packet, record_id=record_id, execution_engine=execution_engine
tag,
packet,
record_id=record_id,
execution_engine=execution_engine,
execution_engine_opts=execution_engine_opts,
)
if (
output_packet is not None
Expand All @@ -717,6 +742,7 @@ async def async_call(
packet: cp.Packet,
record_id: str | None = None,
execution_engine: cp.ExecutionEngine | None = None,
execution_engine_opts: dict[str, Any] | None = None,
skip_cache_lookup: bool = False,
skip_cache_insert: bool = False,
) -> tuple[cp.Tag, cp.Packet | None]:
Expand All @@ -732,14 +758,19 @@ async def async_call(
output_packet = self.get_cached_output_for_packet(packet)
if output_packet is None:
tag, output_packet = await super().async_call(
tag, packet, record_id=record_id, execution_engine=execution_engine
tag,
packet,
record_id=record_id,
execution_engine=execution_engine,
execution_engine_opts=execution_engine_opts,
)
if output_packet is not None and not skip_cache_insert:
self.record_packet(
packet,
output_packet,
record_id=record_id,
execution_engine=execution_engine,
execution_engine_opts=execution_engine_opts,
)

return tag, output_packet
Expand All @@ -754,11 +785,14 @@ def record_packet(
output_packet: cp.Packet,
record_id: str | None = None,
execution_engine: cp.ExecutionEngine | None = None,
execution_engine_opts: dict[str, Any] | None = None,
skip_duplicates: bool = False,
) -> cp.Packet:
"""
Record the output packet against the input packet in the result store.
"""

# TODO: consider incorporating execution_engine_opts into the record
data_table = output_packet.as_table(include_context=True, include_source=True)

for i, (k, v) in enumerate(self.tiered_pod_id.items()):
Expand Down
66 changes: 56 additions & 10 deletions src/orcapod/core/sources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,13 @@ def __iter__(self) -> Iterator[tuple[cp.Tag, cp.Packet]]:
def iter_packets(
self,
execution_engine: cp.ExecutionEngine | None = None,
execution_engine_opts: dict[str, Any] | None = None,
) -> Iterator[tuple[cp.Tag, cp.Packet]]:
"""Delegate to the cached KernelStream."""
return self().iter_packets(execution_engine=execution_engine)
return self().iter_packets(
execution_engine=execution_engine,
execution_engine_opts=execution_engine_opts,
)

def as_table(
self,
Expand All @@ -131,6 +135,7 @@ def as_table(
include_content_hash: bool | str = False,
sort_by_tags: bool = True,
execution_engine: cp.ExecutionEngine | None = None,
execution_engine_opts: dict[str, Any] | None = None,
) -> "pa.Table":
"""Delegate to the cached KernelStream."""
return self().as_table(
Expand All @@ -140,39 +145,57 @@ def as_table(
include_content_hash=include_content_hash,
sort_by_tags=sort_by_tags,
execution_engine=execution_engine,
execution_engine_opts=execution_engine_opts,
)

def flow(
self, execution_engine: cp.ExecutionEngine | None = None
self,
execution_engine: cp.ExecutionEngine | None = None,
execution_engine_opts: dict[str, Any] | None = None,
) -> Collection[tuple[cp.Tag, cp.Packet]]:
"""Delegate to the cached KernelStream."""
return self().flow(execution_engine=execution_engine)
return self().flow(
execution_engine=execution_engine,
execution_engine_opts=execution_engine_opts,
)

def run(
self,
*args: Any,
execution_engine: cp.ExecutionEngine | None = None,
execution_engine_opts: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
"""
Run the source node, executing the contained source.

This is a no-op for sources since they are not executed like pods.
"""
self().run(*args, execution_engine=execution_engine, **kwargs)
self().run(
*args,
execution_engine=execution_engine,
execution_engine_opts=execution_engine_opts,
**kwargs,
)

async def run_async(
self,
*args: Any,
execution_engine: cp.ExecutionEngine | None = None,
execution_engine_opts: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
"""
Run the source node asynchronously, executing the contained source.

This is a no-op for sources since they are not executed like pods.
"""
await self().run_async(*args, execution_engine=execution_engine, **kwargs)
await self().run_async(
*args,
execution_engine=execution_engine,
execution_engine_opts=execution_engine_opts,
**kwargs,
)

# ==================== LiveStream Protocol (Delegation) ====================

Expand Down Expand Up @@ -339,9 +362,13 @@ def __iter__(self) -> Iterator[tuple[cp.Tag, cp.Packet]]:
def iter_packets(
self,
execution_engine: cp.ExecutionEngine | None = None,
execution_engine_opts: dict[str, Any] | None = None,
) -> Iterator[tuple[cp.Tag, cp.Packet]]:
"""Delegate to the cached KernelStream."""
return self().iter_packets(execution_engine=execution_engine)
return self().iter_packets(
execution_engine=execution_engine,
execution_engine_opts=execution_engine_opts,
)

def as_table(
self,
Expand All @@ -351,6 +378,7 @@ def as_table(
include_content_hash: bool | str = False,
sort_by_tags: bool = True,
execution_engine: cp.ExecutionEngine | None = None,
execution_engine_opts: dict[str, Any] | None = None,
) -> "pa.Table":
"""Delegate to the cached KernelStream."""
return self().as_table(
Expand All @@ -360,39 +388,57 @@ def as_table(
include_content_hash=include_content_hash,
sort_by_tags=sort_by_tags,
execution_engine=execution_engine,
execution_engine_opts=execution_engine_opts,
)

def flow(
self, execution_engine: cp.ExecutionEngine | None = None
self,
execution_engine: cp.ExecutionEngine | None = None,
execution_engine_opts: dict[str, Any] | None = None,
) -> Collection[tuple[cp.Tag, cp.Packet]]:
"""Delegate to the cached KernelStream."""
return self().flow(execution_engine=execution_engine)
return self().flow(
execution_engine=execution_engine,
execution_engine_opts=execution_engine_opts,
)

def run(
self,
*args: Any,
execution_engine: cp.ExecutionEngine | None = None,
execution_engine_opts: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
"""
Run the source node, executing the contained source.

This is a no-op for sources since they are not executed like pods.
"""
self().run(*args, execution_engine=execution_engine, **kwargs)
self().run(
*args,
execution_engine=execution_engine,
execution_engine_opts=execution_engine_opts,
**kwargs,
)

async def run_async(
self,
*args: Any,
execution_engine: cp.ExecutionEngine | None = None,
execution_engine_opts: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
"""
Run the source node asynchronously, executing the contained source.

This is a no-op for sources since they are not executed like pods.
"""
await self().run_async(*args, execution_engine=execution_engine, **kwargs)
await self().run_async(
*args,
execution_engine=execution_engine,
execution_engine_opts=execution_engine_opts,
**kwargs,
)

# ==================== LiveStream Protocol (Delegation) ====================

Expand Down
Loading
Loading