diff --git a/amber/src/main/python/core/architecture/managers/executor_manager.py b/amber/src/main/python/core/architecture/managers/executor_manager.py
index 53e5a8903da..eb1363d0a68 100644
--- a/amber/src/main/python/core/architecture/managers/executor_manager.py
+++ b/amber/src/main/python/core/architecture/managers/executor_manager.py
@@ -132,13 +132,26 @@ class declaration.
         :param language: The language of the operator code.
         :return:
         """
-        assert language not in [
-            "r-tuple",
-            "r-table",
-        ], "R language is not supported by default. Please consult third party plugin."
-        executor: type(Operator) = self.load_executor_definition(code)
-        self.executor = executor()
-        self.executor.is_source = is_source
+        if language in ("r-tuple", "r-table"):
+            # R support is provided by an optional plugin (texera-rudf)
+            executor_type = "Tuple" if language == "r-tuple" else "Table"
+            try:
+                import texera_r
+
+                class_suffix = "SourceExecutor" if is_source else "Executor"
+                executor_class = getattr(texera_r, f"R{executor_type}{class_suffix}")
+            except ImportError as e:
+                # Chain explicitly so the original import failure is kept as __cause__
+                raise ImportError(
+                    "R operators require the texera-rudf package.\n"
+                    "Install with: pip install git+https://github.com/Texera/texera-rudf.git\n"
+                    f"Import error: {e}"
+                ) from e
+            self.executor = executor_class(code)
+        else:
+            executor: type(Operator) = self.load_executor_definition(code)
+            self.executor = executor()
+            self.executor.is_source = is_source
         assert isinstance(self.executor, SourceOperator) == self.executor.is_source, (
             "Please use SourceOperator API for source operators."
         )
diff --git a/amber/src/main/python/core/architecture/managers/test_executor_manager.py b/amber/src/main/python/core/architecture/managers/test_executor_manager.py
index 7728c509b4d..901f768a216 100644
--- a/amber/src/main/python/core/architecture/managers/test_executor_manager.py
+++ b/amber/src/main/python/core/architecture/managers/test_executor_manager.py
@@ -15,7 +15,9 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import sys
 import pytest
+from unittest.mock import MagicMock
 
 from core.architecture.managers.executor_manager import ExecutorManager
 
@@ -39,7 +41,7 @@ def produce(self) -> Iterator[Union[TupleLike, TableLike, None]]:
 
 
 class TestExecutorManager:
-    """Test suite for ExecutorManager, focusing on R UDF support removal."""
+    """Test suite for ExecutorManager, focusing on R UDF plugin support."""
 
     @pytest.fixture
     def executor_manager(self):
@@ -50,6 +52,33 @@ def executor_manager(self):
         if hasattr(manager, "_fs"):
             manager.close()
 
+    def _mock_r_plugin(self, monkeypatch, executor_class_name, is_source):
+        """
+        Helper to mock the texera_r plugin module.
+
+        :param monkeypatch: pytest's monkeypatch fixture, used to install the mock
+            in sys.modules and undo that when the test finishes
+        :param executor_class_name: Name of the executor class (e.g., 'RTupleExecutor')
+        :param is_source: Whether the executor is a source operator
+        :return: Tuple of (mock_texera_r, mock_executor_instance)
+        """
+        from core.models import SourceOperator, Operator
+
+        mock_texera_r = MagicMock()
+        mock_executor_class = MagicMock()
+        setattr(mock_texera_r, executor_class_name, mock_executor_class)
+
+        # Use appropriate spec based on operator type
+        spec_class = SourceOperator if is_source else Operator
+        mock_executor_instance = MagicMock(spec=spec_class)
+        mock_executor_instance.is_source = is_source
+        mock_executor_class.return_value = mock_executor_instance
+
+        # setitem (rather than a bare assignment) lets pytest restore whatever
+        # entry, if any, was there before, so a real plugin is never clobbered
+        monkeypatch.setitem(sys.modules, "texera_r", mock_texera_r)
+        return mock_texera_r, mock_executor_instance
+
     def test_initialization(self, executor_manager):
         """Test that ExecutorManager initializes correctly."""
         assert executor_manager.executor is None
@@ -57,29 +86,79 @@ def test_initialization(self, executor_manager):
         assert executor_manager.executor_version == 0
 
-    def test_reject_r_tuple_language(self, executor_manager):
-        """Test that 'r-tuple' language is rejected with AssertionError."""
-        with pytest.raises(AssertionError) as exc_info:
+    def test_reject_r_tuple_language(self, executor_manager, monkeypatch):
+        """Test that 'r-tuple' language is rejected with ImportError when plugin is not available."""
+        # A None entry in sys.modules makes `import texera_r` raise ImportError,
+        # so the test holds even on machines where the plugin is installed
+        monkeypatch.setitem(sys.modules, "texera_r", None)
+        with pytest.raises(ImportError) as exc_info:
             executor_manager.initialize_executor(
                 code=SAMPLE_OPERATOR_CODE, is_source=False, language="r-tuple"
             )
 
-        # Verify the error message mentions R UDF support has been dropped
-        assert "not supported" in str(exc_info.value) or "dropped" in str(
+        # Verify the error message mentions R operators require the texera-rudf package
+        assert "texera-rudf" in str(exc_info.value) or "R operators require" in str(
             exc_info.value
         )
 
-    def test_reject_r_table_language(self, executor_manager):
-        """Test that 'r-table' language is rejected with AssertionError."""
-        with pytest.raises(AssertionError) as exc_info:
+    def test_reject_r_table_language(self, executor_manager, monkeypatch):
+        """Test that 'r-table' language is rejected with ImportError when plugin is not available."""
+        # A None entry in sys.modules makes `import texera_r` raise ImportError,
+        # so the test holds even on machines where the plugin is installed
+        monkeypatch.setitem(sys.modules, "texera_r", None)
+        with pytest.raises(ImportError) as exc_info:
             executor_manager.initialize_executor(
                 code=SAMPLE_OPERATOR_CODE, is_source=False, language="r-table"
             )
 
-        # Verify the error message mentions R UDF support has been dropped
-        assert "not supported" in str(exc_info.value) or "dropped" in str(
+        # Verify the error message mentions R operators require the texera-rudf package
+        assert "texera-rudf" in str(exc_info.value) or "R operators require" in str(
             exc_info.value
         )
 
+    def test_accept_r_tuple_language_with_plugin(self, executor_manager, monkeypatch):
+        """Test that 'r-tuple' language is accepted when plugin is available."""
+        mock_plugin, mock_executor = self._mock_r_plugin(
+            monkeypatch, "RTupleExecutor", is_source=False
+        )
+        executor_manager.initialize_executor(
+            code="# R code", is_source=False, language="r-tuple"
+        )
+        assert executor_manager.executor == mock_executor
+        mock_plugin.RTupleExecutor.assert_called_once_with("# R code")
+
+    def test_accept_r_table_language_with_plugin(self, executor_manager, monkeypatch):
+        """Test that 'r-table' language is accepted when plugin is available."""
+        mock_plugin, mock_executor = self._mock_r_plugin(
+            monkeypatch, "RTableExecutor", is_source=False
+        )
+        executor_manager.initialize_executor(
+            code="# R code", is_source=False, language="r-table"
+        )
+        assert executor_manager.executor == mock_executor
+        mock_plugin.RTableExecutor.assert_called_once_with("# R code")
+
+    def test_accept_r_tuple_source_with_plugin(self, executor_manager, monkeypatch):
+        """Test that 'r-tuple' source operators work when plugin is available."""
+        mock_plugin, mock_executor = self._mock_r_plugin(
+            monkeypatch, "RTupleSourceExecutor", is_source=True
+        )
+        executor_manager.initialize_executor(
+            code="# R code", is_source=True, language="r-tuple"
+        )
+        assert executor_manager.executor == mock_executor
+        mock_plugin.RTupleSourceExecutor.assert_called_once_with("# R code")
+
+    def test_accept_r_table_source_with_plugin(self, executor_manager, monkeypatch):
+        """Test that 'r-table' source operators work when plugin is available."""
+        mock_plugin, mock_executor = self._mock_r_plugin(
+            monkeypatch, "RTableSourceExecutor", is_source=True
+        )
+        executor_manager.initialize_executor(
+            code="# R code", is_source=True, language="r-table"
+        )
+        assert executor_manager.executor == mock_executor
+        mock_plugin.RTableSourceExecutor.assert_called_once_with("# R code")
+
     def test_accept_python_language_regular_operator(self, executor_manager):
         """Test that 'python' language is accepted for regular operators."""
         # This should not raise any assertion error