Skip to content

Add support for tagless streams? #64

@brian-arnold

Description

@brian-arnold

After preparing a dataframe called 'df', the following doesn't work:

pipeline_in = op.DataFrameSource(df) # this fails when running pipeline_in to view contents
pipeline_in

The failure produces an error message that doesn't directly refer to the fact that there are no tags:

---------------------------------------------------------------------------
ArrowInvalid                              Traceback (most recent call last)
File ~/enigma_repos/orcapod_pipeline/.venv/lib/python3.12/site-packages/IPython/core/formatters.py:406, in BaseFormatter.__call__(self, obj)
    404     method = get_real_method(obj, self.print_method)
    405     if method is not None:
--> 406         return method()
    407     return None
    408 else:

File ~/enigma_repos/orcapod_pipeline/.venv/lib/python3.12/site-packages/orcapod/core/streams/base.py:477, in StatefulStreamBase._repr_html_(self)
    476 def _repr_html_(self) -> str:
--> 477     df = self.as_polars_df()
    478     tag_map = {t: f"*{t}" for t in self.tag_keys()}
    479     # TODO: construct repr html better

File ~/enigma_repos/orcapod_pipeline/.venv/lib/python3.12/site-packages/orcapod/core/streams/base.py:375, in StatefulStreamBase.as_polars_df(self, include_data_context, include_source, include_system_tags, include_content_hash, sort_by_tags, execution_engine, execution_engine_opts)
    361 def as_polars_df(
    362     self,
    363     include_data_context: bool = False,
   (...)    369     execution_engine_opts: dict[str, Any] | None = None,
    370 ) -> "pl.DataFrame":
    371     """
    372     Convert the entire stream to a Polars DataFrame.
    373     """
    374     return pl.DataFrame(
--> 375         self.as_table(
    376             include_data_context=include_data_context,
    377             include_source=include_source,
    378             include_system_tags=include_system_tags,
    379             include_content_hash=include_content_hash,
    380             sort_by_tags=sort_by_tags,
    381             execution_engine=execution_engine,
    382             execution_engine_opts=execution_engine_opts,
    383         )
    384     )

File ~/enigma_repos/orcapod_pipeline/.venv/lib/python3.12/site-packages/orcapod/core/sources/base.py:384, in SourceBase.as_table(self, include_data_context, include_source, include_system_tags, include_content_hash, sort_by_tags, execution_engine, execution_engine_opts)
    373 def as_table(
    374     self,
    375     include_data_context: bool = False,
   (...)    381     execution_engine_opts: dict[str, Any] | None = None,
    382 ) -> "pa.Table":
    383     """Delegate to the cached KernelStream."""
--> 384     return self().as_table(
    385         include_data_context=include_data_context,
    386         include_source=include_source,
    387         include_system_tags=include_system_tags,
    388         include_content_hash=include_content_hash,
    389         sort_by_tags=sort_by_tags,
    390         execution_engine=execution_engine,
    391         execution_engine_opts=execution_engine_opts,
    392     )

File ~/enigma_repos/orcapod_pipeline/.venv/lib/python3.12/site-packages/orcapod/core/streams/kernel_stream.py:190, in KernelStream.as_table(self, include_data_context, include_source, include_system_tags, include_content_hash, sort_by_tags, execution_engine, execution_engine_opts)
    186 self.refresh()
    187 assert self._cached_stream is not None, (
    188     "Stream has not been updated or is empty."
    189 )
--> 190 return self._cached_stream.as_table(
    191     include_data_context=include_data_context,
    192     include_source=include_source,
    193     include_system_tags=include_system_tags,
    194     include_content_hash=include_content_hash,
    195     sort_by_tags=sort_by_tags,
    196     execution_engine=execution_engine,
    197     execution_engine_opts=execution_engine_opts,
    198 )

File ~/enigma_repos/orcapod_pipeline/.venv/lib/python3.12/site-packages/orcapod/core/streams/table_stream.py:239, in TableStream.as_table(self, include_data_context, include_source, include_system_tags, include_content_hash, sort_by_tags, execution_engine, execution_engine_opts)
    235 try:
    236     target_tags = (
    237         self._all_tag_columns if include_system_tags else self._tag_columns
    238     )
--> 239     return table.sort_by([(column, "ascending") for column in target_tags])
    240 except pa.ArrowTypeError:
    241     # If sorting fails, fall back to unsorted table
    242     return table

File ~/enigma_repos/orcapod_pipeline/.venv/lib/python3.12/site-packages/pyarrow/table.pxi:2157, in pyarrow.lib._Tabular.sort_by()

File ~/enigma_repos/orcapod_pipeline/.venv/lib/python3.12/site-packages/pyarrow/compute.py:269, in _make_generic_wrapper.<locals>.wrapper(memory_pool, options, *args, **kwargs)
    267 if args and isinstance(args[0], Expression):
    268     return Expression._call(func_name, list(args), options)
--> 269 return func.call(args, options, memory_pool)

File ~/enigma_repos/orcapod_pipeline/.venv/lib/python3.12/site-packages/pyarrow/_compute.pyx:399, in pyarrow._compute.Function.call()

File ~/enigma_repos/orcapod_pipeline/.venv/lib/python3.12/site-packages/pyarrow/error.pxi:155, in pyarrow.lib.pyarrow_internal_check_status()

File ~/enigma_repos/orcapod_pipeline/.venv/lib/python3.12/site-packages/pyarrow/error.pxi:92, in pyarrow.lib.check_status()

ArrowInvalid: Must specify one or more sort keys

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions