Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ For usage within a JavaScript project:
npm install @mat3ra/wode
```

For usage within a Python project:

```bash
pip install mat3ra-wode
```

For development:

```bash
Expand All @@ -49,6 +55,8 @@ See [ESSE](https://github.com/Exabyte-io/esse) for additional context regarding

Useful commands for development:

**JavaScript:**

```bash
# run linter without persistence
npm run lint
Expand All @@ -63,6 +71,18 @@ npm run transpile
npm run test
```

**Python:**

```bash
# run tests
pytest tests/py

# run linter
black src/py tests/py
ruff check src/py tests/py
isort src/py tests/py
```

### Using Linter

Linter setup will prevent committing files that don't adhere to the code standard. It will
Expand Down
6 changes: 5 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ classifiers = [
"Topic :: Software Development",
]
dependencies = [
"numpy",
"mat3ra-esse>=2025.10.8",
"mat3ra-mode>=2025.11.6",
"mat3ra-ade>=2025.10.2",
"mat3ra-utils>=2025.9.20",
"pydantic>=2.0.0",
]

[project.optional-dependencies]
Expand Down
1 change: 0 additions & 1 deletion src/py/mat3ra/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
"""mat3ra namespace package."""

55 changes: 52 additions & 3 deletions src/py/mat3ra/wode/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,55 @@
import numpy as np
"""
WOrkflow DEfinitions - Python implementation.

This package provides Python classes for workflow definitions, including:
- Workflow: A complete computational workflow
- Subworkflow: A logical collection of units
- Units: Individual workflow steps (Execution, Assignment, I/O, etc.)
"""

def get_length(vec: np.ndarray) -> float:
return float(np.linalg.norm(vec))
from .enums import (
IO_ID_COLUMN,
UNIT_NAME_INVALID_CHARS,
UnitStatus,
UnitTag,
UnitType,
WorkflowStatus,
)
from .subworkflows import Subworkflow
from .units import (
AssertionUnit,
AssignmentUnit,
BaseUnit,
ConditionUnit,
ExecutionUnit,
IOUnit,
MapUnit,
ProcessingUnit,
ReduceUnit,
SubworkflowUnit,
)
from .workflows import Workflow

__all__ = [
# Enums and constants
"UnitType",
"UnitStatus",
"UnitTag",
"WorkflowStatus",
"IO_ID_COLUMN",
"UNIT_NAME_INVALID_CHARS",
# Main classes
"Workflow",
"Subworkflow",
# Unit classes
"BaseUnit",
"ExecutionUnit",
"AssignmentUnit",
"ConditionUnit",
"IOUnit",
"MapUnit",
"ReduceUnit",
"AssertionUnit",
"ProcessingUnit",
"SubworkflowUnit",
]
51 changes: 51 additions & 0 deletions src/py/mat3ra/wode/enums.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""
Enums and constants for workflow definitions.
Shared across the codebase and tests.
"""

from enum import Enum


class UnitType(str, Enum):
"""Types of workflow units."""

CONVERGENCE = "convergence"
EXIT = "exit"
EXECUTION = "execution"
MAP = "map"
REDUCE = "reduce"
ASSIGNMENT = "assignment"
CONDITION = "condition"
SUBWORKFLOW = "subworkflow"
PROCESSING = "processing"
IO = "io"
ASSERTION = "assertion"


class UnitStatus(str, Enum):
"""Status of workflow units."""

IDLE = "idle"
ACTIVE = "active"
FINISHED = "finished"
ERROR = "error"
WARNING = "warning"


class UnitTag(str, Enum):
"""Tags for workflow units."""

HAS_CONVERGENCE_PARAM = "hasConvergenceParam"
HAS_CONVERGENCE_RESULT = "hasConvergenceResult"


class WorkflowStatus(str, Enum):
"""Status of workflows."""

UP_TO_DATE = "up-to-date"
OUTDATED = "outdated"


# Constants
IO_ID_COLUMN = "exabyteId"
UNIT_NAME_INVALID_CHARS = "/"
7 changes: 7 additions & 0 deletions src/py/mat3ra/wode/subworkflows/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""Subworkflows module."""

from .subworkflow import Subworkflow

__all__ = [
"Subworkflow",
]
38 changes: 38 additions & 0 deletions src/py/mat3ra/wode/subworkflows/subworkflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Subworkflow class."""

from typing import Any, Dict, List, Optional
from uuid import uuid4

from pydantic import BaseModel, ConfigDict, Field


class Subworkflow(BaseModel):
"""
Subworkflow class representing a logical collection of units.

A subworkflow groups units together with a specific application and model.
"""

model_config = ConfigDict(extra="allow", populate_by_name=True)

id: str = Field(default_factory=lambda: str(uuid4()), alias="_id", description="Unique ID for the subworkflow")
name: str = Field(..., description="Name of the subworkflow")
application: Dict[str, Any] = Field(..., description="Application configuration")
model: Dict[str, Any] = Field(..., description="Model configuration")
method: Optional[Dict[str, Any]] = Field(default=None, description="Method configuration")
units: List[Dict[str, Any]] = Field(default_factory=list, description="List of units in the subworkflow")
properties: List[str] = Field(default_factory=list, description="Properties of the subworkflow")
repetition: int = Field(default=0, description="Repetition number")

def model_dump(self, **kwargs) -> Dict[str, Any]:
"""Export subworkflow to dictionary format."""
# Use by_alias to maintain _id field name in output
if "by_alias" not in kwargs:
kwargs["by_alias"] = True
return super().model_dump(**kwargs)

def model_dump_json(self, **kwargs) -> str:
"""Export subworkflow to JSON format."""
if "by_alias" not in kwargs:
kwargs["by_alias"] = True
return super().model_dump_json(**kwargs)
25 changes: 25 additions & 0 deletions src/py/mat3ra/wode/units/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""Workflow units module."""

from .assertion import AssertionUnit
from .assignment import AssignmentUnit
from .base import BaseUnit
from .condition import ConditionUnit
from .execution import ExecutionUnit
from .io import IOUnit
from .map import MapUnit
from .processing import ProcessingUnit
from .reduce import ReduceUnit
from .subworkflow import SubworkflowUnit

__all__ = [
"BaseUnit",
"ExecutionUnit",
"AssignmentUnit",
"ConditionUnit",
"IOUnit",
"MapUnit",
"ReduceUnit",
"AssertionUnit",
"ProcessingUnit",
"SubworkflowUnit",
]
19 changes: 19 additions & 0 deletions src/py/mat3ra/wode/units/assertion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Assertion unit class."""

from typing import Literal, Optional

from pydantic import Field

from ..enums import UnitType
from .base import BaseUnit


class AssertionUnit(BaseUnit):
"""
Assertion unit for validating expressions.

This unit type asserts that an expression evaluates to true.
"""

type: Literal[UnitType.ASSERTION] = Field(default=UnitType.ASSERTION)
expression: Optional[str] = Field(default=None, description="Expression to assert")
20 changes: 20 additions & 0 deletions src/py/mat3ra/wode/units/assignment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""Assignment unit class."""

from typing import Any, Literal, Optional

from pydantic import Field

from ..enums import UnitType
from .base import BaseUnit


class AssignmentUnit(BaseUnit):
"""
Assignment unit for setting values.

This unit type assigns values to variables in the workflow context.
"""

type: Literal[UnitType.ASSIGNMENT] = Field(default=UnitType.ASSIGNMENT)
value: Optional[Any] = Field(default=None, description="Value to assign")
operand: Optional[str] = Field(default=None, description="Target operand")
50 changes: 50 additions & 0 deletions src/py/mat3ra/wode/units/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Base unit class for workflow units."""

from typing import Any, Dict, List, Optional
from uuid import uuid4

from pydantic import BaseModel, ConfigDict, Field

from ..enums import UnitStatus


class BaseUnit(BaseModel):
"""
Base class for workflow units.

A unit is the fundamental building block of a workflow, representing
a single step or operation in a computational workflow.
"""

model_config = ConfigDict(use_enum_values=True, extra="allow")

name: str = Field(..., description="Human-readable name of the unit")
type: str = Field(..., description="Type of the unit")
flowchartId: str = Field(default_factory=lambda: str(uuid4()), description="Unique ID for flowchart visualization")
head: bool = Field(default=False, description="Whether this is the head unit")
next: Optional[str] = Field(default=None, description="ID of the next unit in the workflow")
status: UnitStatus = Field(default=UnitStatus.IDLE, description="Current status of the unit")
statusTrack: List[Dict[str, Any]] = Field(default_factory=list, description="History of status changes")
tags: List[str] = Field(default_factory=list, description="Tags associated with the unit")
isDraft: bool = Field(default=False, description="Whether this is a draft unit")
repetition: int = Field(default=0, description="Repetition number for the unit")

def is_in_status(self, status: UnitStatus) -> bool:
"""
Check whether a unit is currently in a given status.

Args:
status: Status to check

Returns:
True if unit is in the given status
"""
return self.status == status

def model_dump(self, **kwargs) -> Dict[str, Any]:
"""Export unit to dictionary format."""
return super().model_dump(**kwargs)

def model_dump_json(self, **kwargs) -> str:
"""Export unit to JSON format."""
return super().model_dump_json(**kwargs)
19 changes: 19 additions & 0 deletions src/py/mat3ra/wode/units/condition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Condition unit class."""

from typing import Literal, Optional

from pydantic import Field

from ..enums import UnitType
from .base import BaseUnit


class ConditionUnit(BaseUnit):
"""
Condition unit for evaluating conditions.

This unit type evaluates conditions to control workflow flow.
"""

type: Literal[UnitType.CONDITION] = Field(default=UnitType.CONDITION)
condition: Optional[str] = Field(default=None, description="Condition to evaluate")
25 changes: 25 additions & 0 deletions src/py/mat3ra/wode/units/execution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""Execution unit class."""

from typing import Any, Dict, Literal, Optional

from pydantic import Field

from ..enums import UnitType
from .base import BaseUnit


class ExecutionUnit(BaseUnit):
"""
Execution unit for running applications.

This unit type executes an application with specific parameters.
"""

type: Literal[UnitType.EXECUTION] = Field(default=UnitType.EXECUTION)
application: Optional[Dict[str, Any]] = Field(default=None, description="Application to execute")
executable: Optional[Dict[str, Any]] = Field(default=None, description="Executable configuration")
flavor: Optional[Dict[str, Any]] = Field(default=None, description="Flavor configuration")
preProcessors: list = Field(default_factory=list, description="Pre-processors to run")
postProcessors: list = Field(default_factory=list, description="Post-processors to run")
monitors: list = Field(default_factory=list, description="Monitors to use")
results: list = Field(default_factory=list, description="Results produced by the unit")
Loading