diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 8e75f096..f63a0551 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -1,7 +1,7 @@ # This workflow installs required Python dependencies and then runs the available tests. # For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python -name: Run Acceptance and Unit tests +name: Run Acceptance and Unit tests (with and without visualisation dependencies) on: pull_request: @@ -21,10 +21,18 @@ jobs: uses: actions/setup-python@v3 with: python-version: "3.10" # Only the oldest supported Python version is included here - - name: Install dependencies - run: | - python -m pip install --upgrade pip # upgrade pip to latest version - pip install . # install pyproject.toml dependencies - excludes optional dependencies (such as visualisation) - - name: Run tests + + - name: Install dependencies (without extra visualisation dependencies) run: | - python run_tests.py + python -m pip install --upgrade pip # upgrade pip to latest version + pip install . # no additional [visualization] dependencies + + - name: Run tests (without visualisation dependencies) + run: python run_tests.py + + - name: Install extra visualisation dependencies + run: pip install ".[visualization]" # extra [visualization] dependencies in pyproject.toml + + - name: Run Tests (with visualisation dependencies) + run: python run_tests.py + diff --git a/.gitignore b/.gitignore index 727c718a..ce6f945a 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,10 @@ __pycache__/ # C extensions *.so +# VPP (Visual Paradigm) files +*.vpp.bak_* +*.vpp.lck + # Distribution / packaging .Python build/ @@ -35,6 +39,9 @@ results/ *.manifest *.spec +# Ignore pyenv Pipfile +Pipfile + # Installer logs pip-log.txt pip-delete-this-directory.txt @@ -152,4 +159,7 @@ cython_debug/ # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ +.idea/ + +# json documents for graphs +json/ \ No newline at end of file diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 8c7559fe..1da21949 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -33,7 +33,6 @@ When reporting a defect, be precise and concise in your description. Write in wa Note that all information in the issue tracker is public. *Do not include any confidential information there*. Be sure to add information about: - - The applicable version(s) of RobotMBT (use `pip list` and check for `robotframework-mbt`) - Your Robot Framework version (use `pip list` and check for `robotframework`) - Your Python version (check using `python --version`) @@ -114,3 +113,117 @@ Researchers have suggested that longer lines are better suited for cases when th - Information that is useful for analysing failed tests is logged at debug-level. - Be careful not to make assumptions in what you log: Recheck log statements if your changes affect the context in which the code is run, and only report about what you know to be true. + +### Creating new graphs + +Extending the functionality of the visualizer with new graph types can result in better insights into created tests. The visualizer makes use of an abstract graph class that makes it easy to create new graph types. + +To create a new graph type, create an instance of `robotmbt/visualise/graphs/AbstractGraph`, instantiating the abstract methods. Please place the graph under `robotmbt/visualise/graphs/`. + +**NOTE**: when manually altering the `networkx` field, ensure its IDs remain as a serializable and hashable type when the constructor finishes. + +As an example, we show the implementation of the scenario graph below. In this graph type, nodes represent scenarios encountered in exploration, and edges show the flow between these scenarios. +It does not enable tooltips. + +```python +class ScenarioGraph(AbstractGraph[ScenarioInfo, None]): + @staticmethod + def select_node_info(trace: list[tuple[ScenarioInfo, StateInfo]], index: int) -> ScenarioInfo: + return trace[index][0] + + @staticmethod + def select_edge_info(pair: tuple[ScenarioInfo, StateInfo]) -> None: + return None + + @staticmethod + def create_node_description(trace: list[tuple[ScenarioInfo, StateInfo]], index: int) -> str: + return '' + + @staticmethod + def create_node_label(info: ScenarioInfo) -> str: + return info.name + + @staticmethod + def create_edge_label(info: None) -> str: + return '' + + @staticmethod + def get_legend_info_final_trace_node() -> str: + return "Executed Scenario (in final trace)" + + @staticmethod + def get_legend_info_other_node() -> str: + return "Executed Scenario (backtracked)" + + @staticmethod + def get_legend_info_final_trace_edge() -> str: + return "Execution Flow (final trace)" + + @staticmethod + def get_legend_info_other_edge() -> str: + return "Execution Flow (backtracked)" + + @staticmethod + def get_tooltip_name() -> str: + return "" +``` + +Once you have created a new Graph class, you can direct the visualizer to use it when your type is selected. +Simply add your class to the `GRAPHS` dictionary in `robotmbt/visualise/visualiser.py` like the others. For our example: + +```python +GRAPHS = { + ... + 'scenario': ScenarioGraph, + ... +} +``` + +Now, when selecting your graph type (in our example `Treat this test suite Model-based graph=scenario`), your graph will get constructed! + + +## Development Tips +### Python virtual environment +Installing the proper virtual environment can be done with the default `python -m venv ./.venv` command built into python. However, if you have another version of python on your system, this might break dependencies. + +#### Pipenv+Pyenv (verified on Windows and Linux) +For the optimal experience (at least on Linux), we suggest installing the following packages: +- [`pyenv`](https://github.com/pyenv/pyenv) (Linux/Mac) or [`pyenv-win`](https://github.com/pyenv-win/pyenv-win) (Windows) +- [`pipenv`](https://github.com/pypa/pipenv) + +Then, you can install a python virtual environment with: + +```bash +pipenv --python +``` +..where the python version can be found in the `pyproject.toml`. For example, for 3.10: `pipenv --python 3.10`. + +You might need to manually make the folder `.venv` by doing `mkdir .venv`. + +You can verify if the installation went correctly with: +```bash +pipenv check +``` +This should return `Passed!` + +Errors related to minor versions (for example `3.10.0rc2` != `3.10.0`) can be ignored. + +Now activate the virtual environment by running +```bash +pipenv shell +``` + +..and you should have a virtual env! If you run +```bash +python --version +``` +..while in your virtual environment, it should show the `` from before. + + +### Installing dependencies +***NOTE: making sure that you are in the virtual environment***. + +It is recommended that you also include the optional dependencies for visualisation, e.g.: +```bash +pip install ".[visualization]" +``` diff --git a/README.md b/README.md index 6c3e9309..9fd5b542 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ The recommended installation method is using [pip](http://pip-installer.org) After installation include `robotmbt` as library in your robot file to get access to the new functionality. To run your test suite model-based, use the __Treat this test suite model-based__ keyword as suite setup. Check the _How to model_ section to learn how to make your scenarios suitable for running model-based. -``` +```robotframework *** Settings *** Library robotmbt Suite Setup Treat this test suite model-based @@ -50,7 +50,8 @@ Modelling can be done directly from [Robot framework](https://robotframework.org Consider these two scenarios: -``` +```robotframework +*** Test Cases *** Buying a postcard When you buy a new postcard then you have a blank postcard @@ -63,7 +64,8 @@ Preparing for a birthday party Mapping the dependencies between scenarios is done by annotating the steps with modelling info. Modelling info is added to the documentation of the step as shown below. Regular documentation can still be added, as long as `*model info*` starts on a new line and a white line is included after the last `:OUT:` expression. -``` +```robotframework +*** Keywords *** you buy a new postcard [Documentation] *model info* ... :IN: None @@ -117,7 +119,8 @@ All example scenarios naturally contain data. This information is embedded in th #### Step argument modifiers -``` +```robotframework +*** Test Cases *** Personalising a birthday card Given there is a birthday card when Johan writes their name on the birthday card @@ -126,7 +129,8 @@ Personalising a birthday card The above scenario uses the name `Johan` to create a concrete example. But now suppose that from a testing perspective `Johan` and `Frederique` are part of the same equivalence class. Then the step `Frederique writes their name on the birthday card` would yield an equally valid scenario. This can be achieved by adding a modifier (`:MOD:`) to the model info of the step. The format of a modifier is a Robot argument to which you assign a list of options. The modifier updates the argument value to a randomly chosen value from the specified options. -``` +```robotframework +*** Keywords *** ${person} writes their name on the birthday card [Documentation] *model info* ... :MOD: ${person}= [Johan, Frederique] @@ -138,7 +142,8 @@ ${person} writes their name on the birthday card When constructing examples, they often express relations between multiple actors, where each actor can appear in multiple steps. This makes it important to know how modifiers behave when there are multiple modifiers in a scenario. -``` +```robotframework +*** Test Cases *** Addressing a birthday card Given Tannaz is having their birthday and Johan has a birthday card @@ -148,7 +153,8 @@ Addressing a birthday card Have a look at the when-step above. We will assume the model already contains a domain term with two properties: `birthday.celebrant = Tannaz` and `birthday.guests = [Johan, Frederique]`. -``` +```robotframework +*** Keywords *** ${sender} writes the address of ${receiver} on the birthday card [Documentation] *model info* ... :MOD: ${sender}= birthday.guests @@ -175,10 +181,12 @@ It is not possible to add new options to an existing example value. Any constrai It is possible for a step to keep the same options. The special `.*` notation lets you keep the available options as-is. Preceding steps must then supply the possible options. Some steps can, or must, deal with multiple independent sets of options that must not be mixed, because the expected results should differ. Suppose you have a set of valid and invalid passwords. You might be reluctant to include the superset of these as options to an authentication step. Instead, you can use `:MOD: ${password}= .*` as the modifier for that step. Like in the when-step for this scenario: -``` -Given 'secret' is too weak a password -When user tries to update their password to 'secret' -then the password is rejected +```robotframework +*** Test Cases *** +Reject password + Given 'secret' is too weak a password + When user tries to update their password to 'secret' + then the password is rejected ``` In a then-step, modifiers behave slightly different. In then-steps no new option constraints are accepted for an argument. Its value must already have been determined during the given- and when-steps. In other words, regardless of the actual modifier, the expression behaves as if it were `.*`. The exception to this is when a then-step signals the first use of a new example value. In that case the argument value from the original scenario text is used. @@ -193,12 +201,45 @@ For now, variable data considers strict equivalence classes only. This means tha By default, trace generation is random. The random seed used for the trace is logged by _Treat this test suite model-based_. This seed can be used to rerun the same trace, if no external random factors influence the test run. To activate the seed, pass it as argument: -``` +```robotframework Treat this test suite model-based seed=eag-etou-cxi-leamv-jsi ``` Using `seed=new` will force generation of a new reusable seed and is identical to omitting the seed argument. To completely bypass seed generation and use the system's random source, use `seed=None`. This has even more variation but does not produce a reusable seed. + +### Graphs + +By default, no graphs are generated for test-runs. For development purposes, having a visual representation of the test-suite you are working on can be very useful. To have robotmbt generate a graph, ensure you have installed the optional dependencies (`pip install .[visualization]`) and pass the type as an argument: + +```robotframework +Treat this test suite Model-based graph= +``` + +Here, `` can be any of the supported graph types, which can be seen in `robotmbt/visualise/visualiser.py`. + +Once the test suite has run, a graph will be included in the test's log, under the suite's `Treat this test suite Model-based` setup header. + +#### JSON exporting + +It is possible to extract the exploration data after the library has found a covering trace. To enable this feature, set the following argument to true: + +```robotframework +Treat this test suite Model-based export_graph_data= +``` + +A JSON file named after the test suite will be created containing said information. + +#### JSON importing + +It is possible to skip running the exploration step and produce a graph (e.g. of another type) from previously exported data. + +```robotframework +Treat this test suite Model-based graph= import_graph_data=.json +``` + +A graph will be created from the imported data. + ### Option management If you want to set configuration options for use in multiple test suites without having to repeat them, the keywords __Set model-based options__ and __Update model-based options__ can be used to configure RobotMBT library options. _Set_ takes the provided options and discards any previously set options. _Update_ allows you to modify existing options or add new ones. Reset all options by calling _Set_ without arguments. Direct options provided to __Treat this test suite model-based__ take precedence over library options and affect only the current test suite. diff --git a/atest/resources/helpers/__init__.py b/atest/resources/helpers/__init__.py new file mode 100644 index 00000000..c7ec90cd --- /dev/null +++ b/atest/resources/helpers/__init__.py @@ -0,0 +1,29 @@ +# BSD 3-Clause License +# +# Copyright (c) 2026, T.B. Dubbeling, J. Foederer, T.S. Kas, D.R. Osinga, D.F. Serra e Silva, J.C. Willegers +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/atest/resources/helpers/modelgenerator.py b/atest/resources/helpers/modelgenerator.py new file mode 100644 index 00000000..18b938fa --- /dev/null +++ b/atest/resources/helpers/modelgenerator.py @@ -0,0 +1,309 @@ +# BSD 3-Clause License +# +# Copyright (c) 2026, T.B. Dubbeling, J. Foederer, T.S. Kas, D.R. Osinga, D.F. Serra e Silva, J.C. Willegers +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from robot.api.deco import keyword # type:ignore +import os +from robotmbt.suiteprocessors import SuiteProcessors + +visualisation_deps_present = True +try: + import networkx as nx + from robotmbt.visualise.models import TraceInfo, ScenarioInfo, StateInfo + from robotmbt.visualise.visualiser import Visualiser + from robotmbt.visualise.graphs.abstractgraph import AbstractGraph + from robotmbt.visualise.networkvisualiser import NetworkVisualiser, Node + +except ImportError: + visualisation_deps_present = False + + jsonpickle = None + nx = None + TraceInfo = None + ScenarioInfo = None + StateInfo = None + Visualiser = None + AbstractGraph = None + NetworkVisualiser = None + Node = None + + +class ModelGenerator: + @keyword(name='Requirements Present') # type: ignore + def check_requirements(self) -> bool: + return visualisation_deps_present + + @keyword(name='Generate Trace Information') # type:ignore + def generate_trace_information(self) -> TraceInfo: + return TraceInfo() + + @keyword(name='The Algorithm Inserts') # type:ignore + def insert_trace_info(self, trace_info: TraceInfo, scenario_name: str, state_str: str | None = None) -> TraceInfo: + ''' + State should be of format + "name: key=value key2=value2 ..." + ''' + + (scen_info, state_info) = self.__convert_to_info_tuple(scenario_name, state_str) + trace_info.update_trace(scen_info, state_info, trace_info.previous_length+1) + + return trace_info + + @keyword(name='All Traces Contains List') # type:ignore + def all_traces_contains_list(self, trace_info: TraceInfo) -> TraceInfo: + trace_info.all_traces.append([]) + return trace_info + + @keyword(name='All Traces Contains') # type:ignore + def all_traces_contains(self, trace_info: TraceInfo, scenario_name: str, state_str: str | None) -> TraceInfo: + ''' + State should be of format + "scenario: key=value" + ''' + (scen_info, state_info) = self.__convert_to_info_tuple(scenario_name, state_str) + + for trace in trace_info.all_traces: + trace.append((scen_info, state_info)) + + return trace_info + + @keyword(name='Generate Graph') # type:ignore + def generate_graph(self, trace_info: TraceInfo, graph_type: str) -> AbstractGraph: + return Visualiser(graph_type=graph_type, trace_info=trace_info)._get_graph() + + @keyword(name="Generate Network Graph") + def generate_networkgraph(self, graph: AbstractGraph) -> NetworkVisualiser: + return NetworkVisualiser(graph=graph, suite_name="") + + @keyword(name='Export Graph') # type:ignore + def export_graph(self, suite: str, trace_info: TraceInfo) -> str: + return trace_info.export_graph(suite, atest=True) + + @keyword(name='Import Graph') # type:ignore + def import_graph(self, filepath: str) -> TraceInfo: + suiteprocessor = SuiteProcessors() + suiteprocessor._load_graph('scenario', 'atest', filepath) + return suiteprocessor.visualiser.trace_info + + @keyword(name='Check File Exists') # type:ignore + def check_file_exists(self, filepath: str) -> bool: + return os.path.exists(filepath) + + @keyword(name='Compare Trace Info') # type:ignore + def compare_trace_info(self, t1: TraceInfo, t2: TraceInfo) -> bool: + return repr(t1) == repr(t2) + + @keyword(name='Delete File') # type:ignore + def delete_file(self, filepath: str): + os.remove(filepath) + + @keyword(name='Graph Contains Vertex With No Text') # type:ignore + def graph_contains_no_text(self, graph: AbstractGraph, label: str) -> bool: + return label in graph.networkx.nodes() + + @keyword(name='Graph Contains Vertex With Text') # type:ignore + def graph_contains_vertex_with_text(self, graph: AbstractGraph, title: str, text: str | None = None) -> str | None: + """ + Returns the label of the complete node or None if it doesn't exist + """ + ATTRIBUTE = "label" + attr = nx.get_node_attributes(graph.networkx, ATTRIBUTE) + + (_, state_info) = self.__convert_to_info_tuple(title, text) + parts = state_info.properties[text.split(":")[0]] \ + if text is not None else [] + + for nodename, label in attr.items(): + if title in label: + if text is None: + # we sanitise because newlines in text go badly with eval() in Robot framework + return nodename + + count = 0 + for s in parts: + if f"{s}={parts[s]}" in label: + count += 1 + if count == len(parts): + # we sanitise because newlines in text go badly with eval() in Robot framework + return nodename + + return None + + @keyword(name="Vertices Are Connected") # type:ignore + def vertices_connected(self, graph: AbstractGraph, node_key1: str | None, node_key2: str | None) -> bool: + if node_key1 is None or node_key2 is None: + return False + return graph.networkx.has_edge(node_key1, node_key2) + + @keyword(name="Get NodeID") # type:ignore + def get_nodeid(self, graph: AbstractGraph, node_title: str) -> str | None: + return self.graph_contains_vertex_with_text(graph, node_title, text=None) + + @keyword(name="Get Vertex Y Position") # type:ignore + def get_y(self, network_vis: NetworkVisualiser, id: str) -> int | None: + try: + node: Node | None = network_vis.node_dict[id] + return node.y + except KeyError: + return None + + @keyword(name='Graph Contains Vertices Starting With') # type:ignore + def scen_graph_contains_vertices_starting_with(self, graph: AbstractGraph, vertices_str: str) -> bool | str: + ''' + vertices_str should be of format "'vertex1', 'vertex2'" etc + ''' + attr: dict[str, str] = nx.get_node_attributes(graph.networkx, "label") + + vertices = vertices_str.split("'") + for i in range(1, len(vertices), 2): + found = False + for _, label in attr.items(): + if label.startswith(vertices[i]): + found = True + break + + if not found: + return False + + return True + + @keyword(name='Backtrack') # type:ignore + def backtrack(self, trace_info: TraceInfo, steps: int) -> TraceInfo: + scenario, state = trace_info.current_trace[-steps - 1] + trace_info.update_trace(scenario, state, len(trace_info.current_trace) - steps) + return trace_info + + @keyword(name='Get Length Current Trace') # type:ignore + def get_length_current_trace(self, trace_info: TraceInfo) -> int: + return len(trace_info.current_trace) + + @keyword(name='Get Number of Backtracked Traces') # type:ignore + def get_number_of_backtracked_traces(self, trace_info: TraceInfo) -> int: + return len(trace_info.all_traces) + + # ============= # + # == HELPERS == # + # ============= # + + @staticmethod + def __convert_to_info_tuple(scenario_name: str, keyvaluestr: str | None) -> tuple[ScenarioInfo, StateInfo]: + """ + Format: + "domain1: key1=value1, key2=value2" + """ + scenario_name = scenario_name.strip() + if keyvaluestr is None: + return (ScenarioInfo(scenario_name), StateInfo._create_state_with_prop("", [])) + + keyvaluestr = keyvaluestr.strip() + + split_domain: list[str] = keyvaluestr.split(':') + domain = split_domain[0] + keyvaluestr = ":".join(split_domain[1:]) if len(split_domain) > 2 else split_domain[1] + + # contains ["key1", "value1 key2", "value2"]-like structure + split_eq: list[str] = keyvaluestr.split("=") + if len(split_eq) < 2: + raise ValueError( + "Please input a valid state information string of format \"scenario1: key1=value1 key2=value2\"" + ) + + keyvalues: list[tuple[str, str]] = [] + + prev_key = split_eq[0] + for index in range(1, len(split_eq)): + splits: list[str] = ModelGenerator.__split_top_level(split_eq[index]) + splits = [s.strip() for s in splits] + prev_value: str = " ".join(splits[:-1]) if len(splits) > 1 else splits[0] + new_key: str = splits[-1] + + keyvalues.append((prev_key, prev_value)) + prev_key = new_key + + return (ScenarioInfo(scenario_name), StateInfo._create_state_with_prop(domain, keyvalues)) + + @staticmethod + def __split_top_level(text): + parts = [] + buf = [] + + depth = 0 + string_char = None + escape = False + + i = 0 + n = len(text) + + while i < n: + ch = text[i] + + # Inside string + if string_char: + buf.append(ch) + if escape: + escape = False + elif ch == "\\": + escape = True + elif ch == string_char: + string_char = None + i += 1 + continue + + # Start of string + if ch in ("'", '"'): + string_char = ch + buf.append(ch) + i += 1 + continue + + # Nesting + if ch in "([{": + depth += 1 + buf.append(ch) + i += 1 + continue + + if ch in ")]}": + depth -= 1 + buf.append(ch) + i += 1 + continue + + # Split condition: ", " at top level + if ch == "," and i + 1 < n and text[i + 1] == " " and depth == 0: + parts.append("".join(buf)) + buf.clear() + i += 2 # skip ", " + continue + + buf.append(ch) + i += 1 + + parts.append("".join(buf)) + return parts diff --git a/atest/resources/visualisation.resource b/atest/resources/visualisation.resource new file mode 100644 index 00000000..9a3c01fa --- /dev/null +++ b/atest/resources/visualisation.resource @@ -0,0 +1,242 @@ +*** Settings *** +Documentation Resource file for testing the visualisation of RobotMBT +Library atest.resources.helpers.modelgenerator.ModelGenerator +Library Collections + + +*** Keywords *** +Check requirements + [Documentation] *model info* + ... :IN: None + ... :OUT: None + ${result} = Requirements Present + Run Keyword If ${result} == ${False} + ... Skip Visualisation requirements not installed. Please read the README for information on how to do this. + +test suite ${suite} has trace info ${trace} + [Documentation] *model info* + ... :IN: new trace_info | trace_info.current_trace=0 | trace_info.all_traces=0 | + ... trace_info.name = ${trace} | new graph + ... :OUT: None + ${trace_info} = Generate Trace Information + Set Suite Variable ${trace_info} + +trace info ${trace} + [Documentation] *model info* + ... :IN: trace_info.name == ${trace} + ... :OUT: trace_info.name == ${trace} + No operation + +the algorithm inserts '${scenario_name}' with state "${state_str}" + [Documentation] *model info* + ... :IN: trace_info + ... :OUT: trace_info.current_trace+=1 + Variable Should Exist ${trace_info} + ${trace_info} = The Algorithm Inserts ${trace_info} ${scenario_name} ${state_str} + +test suite ${suite} has ${n} steps in its current trace + [Documentation] *model info* + ... :IN: trace_info.current_trace==${n} + ... :OUT: trace_info.current_trace==${n} + No operation + +test suite ${suite} has ${n} total traces + [Documentation] *model info* + ... :IN: trace_info.current_trace | trace_info.all_traces==${n}-1 + ... :OUT: trace_info.current_trace | trace_info.all_traces==${n}-1 + No operation + +${graph_type} graph ${name} is generated + [Documentation] *model info* + ... :IN: trace_info + ... :OUT: graph.name=${name} | graph.type=${graph_type} + Variable Should Exist ${trace_info} + + ${graph} = Generate Graph ${trace_info} ${graph_type} + ${network_visualiser} = Generate Network Graph ${graph} + Set Suite Variable ${graph} + Set Suite Variable ${network_visualiser} + +graph ${name} contains vertex '${scenario}' + [Documentation] *model info* + ... :IN: graph.name==${name} + ... :OUT: graph.name==${name} + Variable Should Exist ${graph} + + ${id} = Get NodeID ${graph} ${scenario} + ${result} = Graph Contains Vertex With No Text ${graph} ${id} + Run Keyword If ${result} == False + ... Fail Fail: Graph does not contain '${scenario}' + +graph ${name} contains vertex '${scenario}' with text "${text}" + [Documentation] *model info* + ... :IN: graph.name==${name} + ... :OUT: graph.name==${name} + Variable Should Exist ${graph} + + ${result} = Graph Contains Vertex With Text ${graph} ${scenario} ${text} + Run Keyword If "${result}" == "None" + ... Fail Fail: Graph does not contain '${scenario}' with "${text}" + +graph ${name} does not contain vertex '${scenario}' + [Documentation] *model info* + ... :IN: graph.name==${name} + ... :OUT: graph.name==${name} + Variable Should Exist ${graph} + + ${id} = Get NodeID ${graph} ${scenario} + ${result} = Graph Contains Vertex With No Text ${graph} ${id} + Run Keyword If "${result}" != "None" + ... Fail Fail: Graph contains '${scenario}'" + +graph ${name} does not contain vertex '${scenario}' with text "${text}" + [Documentation] *model info* + ... :IN: graph.name==${name} + ... :OUT: graph.name==${name} + Variable Should Exist ${graph} + + ${result} = Graph Contains Vertex With Text ${graph} ${scenario} ${text} + Run Keyword If "${result}" != "None" + ... Fail Fail: Graph contains '${scenario}' with "${text}" + +graph ${name} has an edge from '${start_title}' to '${end_title}' + [Documentation] *model info* + ... :IN: graph.name==${name} + ... :OUT: graph.name==${name} + Variable Should Exist ${graph} + ${start_node} = Graph Contains Vertex With Text ${graph} ${start_title} + ${end_node} = Graph Contains Vertex With Text ${graph} ${end_title} + + Should Not Be Equal ${start_node} ${None} Start node with title '${start_title}' not found + Should Not Be Equal ${end_node} ${None} End node with title '${end_title}' not found + + ${result} = Vertices Are Connected ${graph} ${start_node} ${end_node} + Run Keyword If ${result} != True + ... Fail Fail: Edges '${start_title}' and '${end_title}' exist in graph but are not connected! + +graph ${name} does not have an edge from '${start_title}' to '${end_title}' + [Documentation] *model info* + ... :IN: graph.name==${name} + ... :OUT: graph.name==${name} + Variable Should Exist ${graph} + ${start_node} = Graph Contains Vertex With Text ${graph} ${start_title} + ${end_node} = Graph Contains Vertex With Text ${graph} ${end_title} + + ${result} = Vertices Are Connected ${graph} ${start_node} ${end_node} + Run Keyword If ${result} == True + ... Fail Fail: Edges '${start_title}' and '${end_title}' exist in graph and are connected, but shouldn't be! + +graph ${name} has vertices ${vertices_string} + [Documentation] *model info* + ... :IN: graph.name==${name} + ... :OUT: graph.name==${name} + Variable Should Exist ${graph} + ${result} = Graph Contains Vertices Starting With ${graph} ${vertices_string} + Run Keyword If ${result} != True + ... Fail Graph did not contain all vertices ${vertices_string} + +vertex '${start_vertex}' is placed above '${end_vertex}' + [Documentation] *model info* + ... :IN: None + ... :OUT: None + Variable Should Exist ${graph} + Variable Should Exist ${network_visualiser} + ${node1} = Get NodeID ${graph} ${start_vertex} + + Run Keyword If "${node1}" == "None" + ... Fail Vertex '${start_vertex}' does not exist + + ${node2} = Get NodeID ${graph} ${end_vertex} + + Run Keyword If "${node2}" == "None" + ... Fail Vertex '${end_vertex}' does not exist + + ${result1} = Get Vertex Y position ${network_visualiser} ${node1} + ${result2} = Get Vertex Y position ${network_visualiser} ${node2} + + Run Keyword If ${result1} < ${result2} + ... Fail Vertex '${start_vertex}' is below '${end_vertex}' + +the algorithm backtracks by ${n} step(s) + [Documentation] *model info* + ... :IN: trace_info.current_trace >= ${n} + ... :OUT: trace_info.current_trace-=${n} | trace_info.all_traces+=1 + Variable Should Exist ${trace_info} + + ${a} = Get Length Current Trace ${trace_info} + ${b} = Get Number of Backtracked Traces ${trace_info} + + ${trace_info} = Backtrack ${trace_info} ${n} + + ${i} = Get Length Current Trace ${trace_info} + ${j} = Get Number of Backtracked Traces ${trace_info} + + Run Keyword If ${a} - ${n} != ${i} + ... Fail Fail: Backtracking did not shorten current trace correctly + + Run Keyword If ${b} + 1 != ${j} + ... Fail Fail: Backtracking did not add new trace to all traces + +test suite ${suite} is exported to json + [Documentation] *model info* + ... :IN: trace_info.current_trace, trace_info.all_traces>0 + ... :OUT: new filepath + Variable Should Exist ${suite} + Variable Should Exist ${trace_info} + + ${filepath} = Export Graph ${suite} ${trace_info} + Set Suite Variable ${filepath} + +the file ${filename} exists + [Documentation] *model info* + ... :IN: filepath + ... :OUT: filepath + Variable Should Exist ${filepath} + + ${result} = Check File Exists ${filepath} + Run Keyword If ${result} == False + ... Fail Fail: File does not exist + +${filename} is imported + [Documentation] *model info* + ... :IN: filepath + ... :OUT: new new_trace_info + Variable Should Exist ${filepath} + + ${new_trace_info} = Import Graph ${filepath} + Set Suite Variable ${new_trace_info} + +trace info from ${filename} is the same as trace info ${trace} + [Documentation] *model info* + ... :IN: new_trace_info, trace_info + ... :OUT: new flag_cleanup + Variable Should Exist ${trace_info} + Variable Should Exist ${new_trace_info} + + ${result} = Compare Trace Info ${trace_info} ${new_trace_info} + Run Keyword If ${result} == False + ... Fail Fail: Exported and Imported trace info are not the same + +${file} has been imported + [Documentation] *model info* + ... :IN: flag_cleanup + ... :OUT: flag_cleanup + No operation + +${filename} is deleted + [Documentation] *model info* + ... :IN: filepath + ... :OUT: filepath + Variable Should Exist ${filepath} + + Delete File ${filepath} + +${filename} does not exist + [Documentation] *model info* + ... :IN: filepath + ... :OUT: filepath + Variable Should Exist ${filepath} + + ${result} = Check File Exists ${filepath} + Run Keyword If ${result} == True + ... Fail Fail: File exist \ No newline at end of file diff --git a/atest/robotMBT tests/10__visualisation/01__visualisation_representations/01__setup.robot b/atest/robotMBT tests/10__visualisation/01__visualisation_representations/01__setup.robot new file mode 100644 index 00000000..2c2da0bf --- /dev/null +++ b/atest/robotMBT tests/10__visualisation/01__visualisation_representations/01__setup.robot @@ -0,0 +1,8 @@ +*** Settings *** +Library robotmbt processor=flatten + +*** Test Cases *** +Feature Setup + Given test suite s has trace info t + Then the algorithm inserts 'A1' with state "attr: states = ['a1'], special='!'" + And the algorithm inserts 'A2' with state "attr: states = ['a1', 'a2'], special='!'" \ No newline at end of file diff --git a/atest/robotMBT tests/10__visualisation/01__visualisation_representations/02__scenario.robot b/atest/robotMBT tests/10__visualisation/01__visualisation_representations/02__scenario.robot new file mode 100644 index 00000000..34d159e5 --- /dev/null +++ b/atest/robotMBT tests/10__visualisation/01__visualisation_representations/02__scenario.robot @@ -0,0 +1,26 @@ +*** Settings *** +Library robotmbt processor=flatten + +*** Test Cases *** +Vertex Scenario graph + Given trace info t + When scenario graph g is generated + Then graph g contains vertex 'start' + And graph g contains vertex 'A1' + And graph g contains vertex 'A2' + +Edge Scenario graph + Given trace info t + When scenario graph g is generated + Then graph g has an edge from 'start' to 'A1' + And graph g has an edge from 'A1' to 'A2' + And graph g does not have an edge from 'start' to 'A2' + And graph g does not have an edge from 'A2' to 'A1' + And graph g does not have an edge from 'A2' to 'start' + +Visual location of vertices scenario + Given trace info t + When scenario graph g is generated + Then graph g has vertices 'start', 'A1', 'A2' + And vertex 'start' is placed above 'A1' + And vertex 'A1' is placed above 'A2' diff --git a/atest/robotMBT tests/10__visualisation/01__visualisation_representations/03__scenario-delta-value.robot b/atest/robotMBT tests/10__visualisation/01__visualisation_representations/03__scenario-delta-value.robot new file mode 100644 index 00000000..4f7322ef --- /dev/null +++ b/atest/robotMBT tests/10__visualisation/01__visualisation_representations/03__scenario-delta-value.robot @@ -0,0 +1,27 @@ +*** Settings *** +Library robotmbt processor=flatten + +*** Test Cases *** +Vertex Scenario-Delta-Value graph + Given trace info t + When scenario-delta-value graph g is generated + Then graph g contains vertex 'start' + And graph g contains vertex 'A1' with text "attr: states = ['a1'], special='!'" + And graph g contains vertex 'A2' with text "attr: states = ['a1', 'a2']" + And graph g does not contain vertex 'A2' with text "attr: states = ['a1', 'a2'], special='!'" + +Edge Scenario-Delta-Value graph + Given trace info t + When scenario-delta-value graph g is generated + Then graph g has an edge from 'start' to 'A1' + And graph g has an edge from 'A1' to 'A2' + And graph g does not have an edge from 'start' to 'A2' + And graph g does not have an edge from 'A2' to 'A1' + And graph g does not have an edge from 'A2' to 'start' + +Visual location of vertices scenario-delta-value + Given trace info t + When scenario-delta-value graph g is generated + Then graph g has vertices 'start', 'A1', 'A2' + And vertex 'start' is placed above 'A1' + And vertex 'A1' is placed above 'A2' diff --git a/atest/robotMBT tests/10__visualisation/01__visualisation_representations/__init__.robot b/atest/robotMBT tests/10__visualisation/01__visualisation_representations/__init__.robot new file mode 100644 index 00000000..6d745c41 --- /dev/null +++ b/atest/robotMBT tests/10__visualisation/01__visualisation_representations/__init__.robot @@ -0,0 +1,11 @@ +*** Settings *** +Documentation Test correctness all graph representations +Suite Setup Enter test suite +Resource ../../../resources/visualisation.resource +Library robotmbt processor=flatten + + +*** Keywords *** +Enter test suite + Check requirements + Treat this test suite Model-based \ No newline at end of file diff --git a/atest/robotMBT tests/10__visualisation/02__visualisation_execution_path/01_repetition.robot b/atest/robotMBT tests/10__visualisation/02__visualisation_execution_path/01_repetition.robot new file mode 100644 index 00000000..2105a4d2 --- /dev/null +++ b/atest/robotMBT tests/10__visualisation/02__visualisation_execution_path/01_repetition.robot @@ -0,0 +1,34 @@ +*** Settings *** +Documentation +Suite Setup Enter test suite +Resource ../../../resources/visualisation.resource +Library robotmbt + +*** Keywords *** +Enter test suite + Check requirements + Treat this test suite Model-based + +*** Test Cases *** + +Setup + Given test suite s has trace info t + Then the algorithm inserts 'A1' with state "attr: states=['a1'],special='!'" + And the algorithm inserts 'A1' with state "attr: states=['a1'],special='!'" + And the algorithm inserts 'B1' with state "attr: states=['a1','b1'], special='!'" + And the algorithm inserts 'B2' with state "attr: states=['a1','b1','b2'], special='!'" + And the algorithm inserts 'B1' with state "attr: states=['a1','b1','b2'], special='!'" + +Self-loop + Given trace info t + When scenario graph g is generated + Then graph g contains vertex 'A1' + And graph g has an edge from 'A1' to 'A1' + +Two-vertex loop + Given trace info t + When scenario graph g is generated + Then graph g contains vertex 'B1' + And graph g contains vertex 'B2' + And graph g has an edge from 'B1' to 'B2' + And graph g has an edge from 'B2' to 'B1' \ No newline at end of file diff --git a/atest/robotMBT tests/10__visualisation/03__visualisation_export/01__export to JSON.robot b/atest/robotMBT tests/10__visualisation/03__visualisation_export/01__export to JSON.robot new file mode 100644 index 00000000..3edd5dbf --- /dev/null +++ b/atest/robotMBT tests/10__visualisation/03__visualisation_export/01__export to JSON.robot @@ -0,0 +1,42 @@ +*** Settings *** +Documentation Export and import a test suite from and to JSON +... and check that the imported suite equals the +... exported suite. +Suite Setup Enter test suite +Resource ../../../resources/visualisation.resource +Library robotmbt + +*** Keywords *** +Enter test suite + Check requirements + Treat this test suite Model-based + +*** Test Cases *** +Setup + Given test suite s has trace info t + When the algorithm inserts 'A1' with state "attr: states = ['a1'], special='!'" + And the algorithm inserts 'A2' with state "attr: states = ['a1', 'a2'], special='!'" + Then test suite s has 2 steps in its current trace + +Backtrack and Insert + Given test suite s has 2 steps in its current trace + When the algorithm backtracks by 1 step(s) + And the algorithm inserts 'B1' with state "attr: states=['a1', 'b1'], special='!'" + Then test suite s has 2 total traces + And test suite s has 2 steps in its current trace + +Export test suite to json file + Given test suite s has 2 total traces + When test suite s is exported to json + Then the file s.json exists + +Load json file into robotmbt + Given the file s.json exists + When s.sjon is imported + Then trace info from s.json is the same as trace info t + +Clean-up + Given the file s.json exists + And s.json has been imported + When s.sjon is deleted + Then s.json does not exist \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 25e5cd49..bc5c7e73 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,8 +23,11 @@ dependencies = [ ] requires-python = ">=3.10" -[tool.setuptools] -packages = ["robotmbt"] +[tool.setuptools.packages.find] +include = ["robotmbt*"] [project.urls] Homepage = "https://github.com/JFoederer/robotframeworkMBT" + +[project.optional-dependencies] +visualization = ["networkx", "bokeh", "grandalf", "jsonpickle"] diff --git a/robotmbt/modeller.py b/robotmbt/modeller.py index f3505b0f..e92e8c66 100644 --- a/robotmbt/modeller.py +++ b/robotmbt/modeller.py @@ -36,7 +36,7 @@ from robot.utils import is_list_like from .modelspace import ModelSpace -from .steparguments import StepArgument, StepArguments, ArgKind +from .steparguments import StepArguments, ArgKind from .substitutionmap import SubstitutionMap from .suitedata import Scenario, Step from .tracestate import TraceState, TraceSnapShot @@ -66,7 +66,7 @@ def try_to_fit_in_scenario(candidate: Scenario, tracestate: TraceState): tracestate.push_partial_scenario(inserted.src_id, inserted, model, remainder) -def process_scenario(scenario: Scenario, model: ModelSpace) -> tuple[Scenario, Scenario, dict[str, Any]]: +def process_scenario(scenario: Scenario, model: ModelSpace) -> tuple[Scenario | None, Scenario | None, dict[str, Any]]: for step in scenario.steps: if 'error' in step.model_info: return None, None, dict(fail_masg=f"Error in scenario {scenario.name} " @@ -162,7 +162,7 @@ def handle_refinement_exit(inserted_refinement: Scenario, tracestate: TraceState tracestate.push_partial_scenario(tail_inserted.src_id, tail_inserted, model, remainder) -def generate_scenario_variant(scenario: Scenario, model: ModelSpace) -> Scenario: +def generate_scenario_variant(scenario: Scenario, model: ModelSpace) -> Scenario | None: scenario = scenario.copy() # collect set of constraints subs = SubstitutionMap() diff --git a/robotmbt/modelspace.py b/robotmbt/modelspace.py index b3238670..e5cac2a9 100644 --- a/robotmbt/modelspace.py +++ b/robotmbt/modelspace.py @@ -215,3 +215,17 @@ def __iter__(self): def __bool__(self): return any(True for _ in self) + + def __eq__(self, other): + self_set = set([(attr, getattr(self, attr)) for attr in dir(self._outer_scope) + dir(self) + if not attr.startswith('__') and attr != '_outer_scope']) + other_set = set([(attr, getattr(other, attr)) for attr in dir(other._outer_scope) + dir(other) + if not attr.startswith('__') and attr != '_outer_scope']) + return self_set == other_set + + def __str__(self): + res = "{" + for k, v in self: + res += f"{k}={v}, " + res += "}" + return res diff --git a/robotmbt/steparguments.py b/robotmbt/steparguments.py index 4a8d96be..c4f89866 100644 --- a/robotmbt/steparguments.py +++ b/robotmbt/steparguments.py @@ -40,7 +40,7 @@ class StepArguments(list): def __init__(self, iterable=[]): super().__init__(item.copy() for item in iterable) - def fill_in_args(self, text: str, as_code: bool = False): + def fill_in_args(self, text: str, as_code: bool = False) -> str: result = text for arg in self: sub = arg.codestring if as_code else str(arg.value) diff --git a/robotmbt/substitutionmap.py b/robotmbt/substitutionmap.py index 5336d4e1..d5bc66a1 100644 --- a/robotmbt/substitutionmap.py +++ b/robotmbt/substitutionmap.py @@ -66,7 +66,7 @@ def substitute(self, example_value: str, constraint: list[Any]): def solve(self) -> dict[str, str]: self.solution = {} - solution = dict() + solution: dict[str, str] = dict() substitutions = self.copy().substitutions unsolved_subs = list(substitutions) subs_stack = [] @@ -117,12 +117,13 @@ def __init__(self, constraint: list[Any]): try: # Keep the items in optionset unique. Refrain from using Python sets # due to non-deterministic behaviour when using random seeding. - self.optionset: list[Any] = list(dict.fromkeys(constraint)) + self.optionset: list[Any] | None = list(dict.fromkeys(constraint)) except: - self.optionset = None + self.optionset: list[Any] | None = None if not self.optionset or isinstance(constraint, str): raise ValueError(f"Invalid option set for initial constraint: {constraint}") - self.removed_stack = [] + + self.removed_stack: list[str | Placeholder] = [] def __repr__(self): return f'Constraint([{", ".join([str(e) for e in self.optionset])}])' diff --git a/robotmbt/suitedata.py b/robotmbt/suitedata.py index e64b2b38..01f79734 100644 --- a/robotmbt/suitedata.py +++ b/robotmbt/suitedata.py @@ -31,7 +31,7 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import copy -from typing import Literal +from typing import Literal, Any from robot.running.arguments.argumentspec import ArgumentSpec from robot.running.arguments.argumentvalidator import ArgumentValidator @@ -183,7 +183,7 @@ def posnom_args_str(self) -> tuple[str, ...]: """A tuple with all arguments in Robot accepted text format ('posA' , 'posB', 'named1=namedA')""" if self.detached or not self.args.modified: return self.org_pn_args - result = [] + result: list[Any] = [] for arg in self.args: if arg.is_default: continue @@ -197,8 +197,6 @@ def posnom_args_str(self) -> tuple[str, ...]: elif arg.kind == ArgKind.FREE_NAMED: for name, value in arg.value.items(): result.append(f"{name}={value}") - else: - continue return tuple(result) @property diff --git a/robotmbt/suiteprocessors.py b/robotmbt/suiteprocessors.py index ebed36dd..88dd7c0f 100644 --- a/robotmbt/suiteprocessors.py +++ b/robotmbt/suiteprocessors.py @@ -32,7 +32,6 @@ import copy import random -from typing import Any from robot.api import logger @@ -42,8 +41,20 @@ from .tracestate import TraceState +try: + from .visualise.visualiser import Visualiser + from .visualise.models import TraceInfo + import jsonpickle + visualisation_deps_present = True +except ImportError: + Visualiser = None + TraceInfo = None + visualisation_deps_present = False + + class SuiteProcessors: - def echo(self, in_suite: Suite) -> Suite: + @staticmethod + def echo(in_suite: Suite) -> Suite: return in_suite def flatten(self, in_suite: Suite) -> Suite: @@ -74,13 +85,36 @@ def flatten(self, in_suite: Suite) -> Suite: out_suite.suites = [] return out_suite - def process_test_suite(self, in_suite: Suite, *, seed: str | int | bytes | bytearray = 'new') -> Suite: + def process_test_suite(self, in_suite: Suite, *, seed: str | int | bytes | bytearray = 'new', + graph: str = '', export_graph_data: str = '', import_graph_data: str = '') -> Suite: self.out_suite = Suite(in_suite.name) self.out_suite.filename = in_suite.filename self.out_suite.parent = in_suite.parent self._fail_on_step_errors(in_suite) self.flat_suite = self.flatten(in_suite) + if import_graph_data != '': + self._load_graph(graph, in_suite.name, import_graph_data) + + else: + self._run_test_suite(seed, graph, in_suite.name, export_graph_data) + + self.__write_visualisation() + + return self.out_suite + + def _load_graph(self, graph: str, suite_name: str, file_path: str): + """ + Imports a JSON encoding of a graph and reconstructs the graph from it. The reconstructed graph overrides the + current graph instance this method is called on. + file_path: the relative path to the graph JSON. + """ + with open(file_path, "r") as f: + string = f.read() + traceinfo = jsonpickle.decode(string) + self.visualiser = Visualiser(graph, suite_name, trace_info=traceinfo) + + def _run_test_suite(self, seed: str | int | bytes | bytearray, graph: str, suite_name: str, export_dir: str): for id, scenario in enumerate(self.flat_suite.scenarios, start=1): scenario.src_id = id self.scenarios: list[Scenario] = self.flat_suite.scenarios[:] @@ -91,6 +125,21 @@ def process_test_suite(self, in_suite: Suite, *, seed: str | int | bytes | bytea self.shuffled: list[int] = [s.src_id for s in self.scenarios] random.shuffle(self.shuffled) # Keep a single shuffle for all TraceStates (non-essential) + self.visualiser = None + if visualisation_deps_present and (graph or export_dir): + try: + self.visualiser = Visualiser(graph, suite_name, export_dir) + except Exception as e: + self.visualiser = None + logger.warn(f'Could not initialise visualiser due to error!\n{e}') + + elif graph and not visualisation_deps_present: + logger.warn(f'Visualisation {graph} requested, but required dependencies are not installed. ' + 'Refer to the README on how to install these dependencies. ') + elif export_dir and not visualisation_deps_present: + logger.warn(f'Visualization export to {export_dir} requested, but required dependencies are not installed. ' + 'Refer to the README on how to install these dependencies. ') + # a short trace without the need for repeating scenarios is preferred tracestate = self._try_to_reach_full_coverage(allow_duplicate_scenarios=False) @@ -102,7 +151,6 @@ def process_test_suite(self, in_suite: Suite, *, seed: str | int | bytes | bytea self.out_suite.scenarios = tracestate.get_trace() self._report_tracestate_wrapup(tracestate) - return self.out_suite def _try_to_reach_full_coverage(self, allow_duplicate_scenarios: bool) -> TraceState: tracestate = TraceState(self.shuffled) @@ -118,9 +166,11 @@ def _try_to_reach_full_coverage(self, allow_duplicate_scenarios: bool) -> TraceS candidate = self._select_scenario_variant(candidate_id, tracestate) if not candidate: # No valid variant available in the current state tracestate.reject_scenario(candidate_id) + self.__update_visualisation(tracestate) continue previous_len = len(tracestate) modeller.try_to_fit_in_scenario(candidate, tracestate) + self.__update_visualisation(tracestate) self._report_tracestate_to_user(tracestate) if len(tracestate) > previous_len: logger.debug(f"last state:\n{tracestate.model.get_status_text()}") @@ -134,8 +184,25 @@ def _try_to_reach_full_coverage(self, allow_duplicate_scenarios: bool) -> TraceS modeller.rewind(tracestate, drought_recovery=True) self._report_tracestate_to_user(tracestate) logger.debug(f"last state:\n{tracestate.model.get_status_text()}") + self.__update_visualisation(tracestate) + self.__update_visualisation(tracestate) return tracestate + def __update_visualisation(self, tracestate: TraceState): + if self.visualiser is not None: + try: + self.visualiser.update_trace(tracestate) + except Exception as e: + logger.warn(f'Could not update visualisation due to error!\n{e}') + + def __write_visualisation(self): + if self.visualiser is not None: + try: + text, html = self.visualiser.generate_visualisation() + logger.info(text, html=html) + except Exception as e: + logger.warn(f'Could not generate visualisation due to error!\n{e}') + @staticmethod def __last_candidate_changed_nothing(tracestate: TraceState) -> bool: if len(tracestate) < 2: @@ -164,6 +231,7 @@ def _scenario_with_repeat_counter(self, index: int, tracestate: TraceState) -> S @staticmethod def _fail_on_step_errors(suite: Suite): error_list = suite.steps_with_errors() + if error_list: err_msg = "Steps with errors in their model info found:\n" err_msg += '\n'.join([f"{s.keyword} [{s.model_info['error']}] used in {s.parent.name}" @@ -188,7 +256,7 @@ def _init_randomiser(seed: str | int | bytes | bytearray): seed = seed.strip() if str(seed).lower() == 'none': logger.info( - f"Using system's random seed for trace generation. This trace cannot be rerun. Use `seed=new` to generate a reusable seed.") + "Using system's random seed for trace generation. This trace cannot be rerun. Use `seed=new` to generate a reusable seed.") elif str(seed).lower() == 'new': new_seed = SuiteProcessors._generate_seed() logger.info(f"seed={new_seed} (use seed to rerun this trace)") diff --git a/robotmbt/suitereplacer.py b/robotmbt/suitereplacer.py index 7163a5cd..faa3fe81 100644 --- a/robotmbt/suitereplacer.py +++ b/robotmbt/suitereplacer.py @@ -36,7 +36,7 @@ from .suiteprocessors import SuiteProcessors from robot.api import logger from robot.api.deco import library, keyword -from typing import Any, Literal +from typing import Any from robot.libraries.BuiltIn import BuiltIn Robot = BuiltIn() diff --git a/robotmbt/visualise/__init__.py b/robotmbt/visualise/__init__.py new file mode 100644 index 00000000..c7ec90cd --- /dev/null +++ b/robotmbt/visualise/__init__.py @@ -0,0 +1,29 @@ +# BSD 3-Clause License +# +# Copyright (c) 2026, T.B. Dubbeling, J. Foederer, T.S. Kas, D.R. Osinga, D.F. Serra e Silva, J.C. Willegers +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/robotmbt/visualise/graphs/__init__.py b/robotmbt/visualise/graphs/__init__.py new file mode 100644 index 00000000..c7ec90cd --- /dev/null +++ b/robotmbt/visualise/graphs/__init__.py @@ -0,0 +1,29 @@ +# BSD 3-Clause License +# +# Copyright (c) 2026, T.B. Dubbeling, J. Foederer, T.S. Kas, D.R. Osinga, D.F. Serra e Silva, J.C. Willegers +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/robotmbt/visualise/graphs/abstractgraph.py b/robotmbt/visualise/graphs/abstractgraph.py new file mode 100644 index 00000000..4214d0db --- /dev/null +++ b/robotmbt/visualise/graphs/abstractgraph.py @@ -0,0 +1,186 @@ +# BSD 3-Clause License +# +# Copyright (c) 2026, T.B. Dubbeling, J. Foederer, T.S. Kas, D.R. Osinga, D.F. Serra e Silva, J.C. Willegers +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from abc import ABC, abstractmethod +from typing import Generic, TypeVar + +from robotmbt.visualise.models import TraceInfo, ScenarioInfo, StateInfo +import networkx as nx + + +NodeInfo = TypeVar('NodeInfo') +EdgeInfo = TypeVar('EdgeInfo') + + +class AbstractGraph(ABC, Generic[NodeInfo, EdgeInfo]): + def __init__(self, info: TraceInfo): + """ + Note that networkx's ids have to be of a serializable and hashable type after construction. + """ + # The underlying storage - a NetworkX DiGraph + self.networkx: nx.DiGraph = nx.DiGraph() + + # Keep track of node IDs + self.ids: dict[str, tuple[NodeInfo, str]] = {} + + # Add the start node + self.networkx.add_node('start', label='start', description='') + self.start_node = 'start' + + # Add nodes and edges for all traces + for trace in info.all_traces: + for i in range(len(trace)): + if i > 0: + from_node = self._get_or_create_id( + self.select_node_info(trace, i - 1), + self.create_node_description(trace, i - 1)) + else: + from_node = 'start' + to_node = self._get_or_create_id( + self.select_node_info(trace, i), + self.create_node_description(trace, i)) + self._add_node(from_node) + self._add_node(to_node) + self._add_edge(from_node, to_node, + self.create_edge_label(self.select_edge_info(trace[i]))) + + # Set the final trace and add any missing nodes/edges + self.final_trace = ['start'] + for i in range(len(info.current_trace)): + if i > 0: + from_node = self._get_or_create_id( + self.select_node_info(info.current_trace, i - 1), + self.create_node_description(info.current_trace, i - 1)) + else: + from_node = 'start' + to_node = self._get_or_create_id( + self.select_node_info(info.current_trace, i), + self.create_node_description(info.current_trace, i)) + self.final_trace.append(to_node) + self._add_node(from_node) + self._add_node(to_node) + self._add_edge(from_node, to_node, + self.create_edge_label(self.select_edge_info(info.current_trace[i]))) + + def get_final_trace(self) -> list[str]: + """ + Get the final trace as ordered node ids. + Edges are subsequent entries in the list. + """ + return self.final_trace + + def _get_or_create_id(self, info: NodeInfo, description: str) -> str: + """ + Get the ID for a state that has been added before, or create and store a new one. + """ + for i in self.ids.keys(): + if self.ids[i][0] == info: + return i + + new_id = f"node{len(self.ids)}" + self.ids[new_id] = info, description + return new_id + + def _add_node(self, node: str): + """ + Add node if it doesn't already exist. + """ + if node not in self.networkx.nodes: + self.networkx.add_node( + node, label=self.create_node_label(self.ids[node][0]), description=self.ids[node][1]) + + def _add_edge(self, from_node: str, to_node: str, label: str): + """ + Add edge if it doesn't already exist. + If edge exists, update the label information + """ + if (from_node, to_node) in self.networkx.edges: + if label == '': + return + old_label = nx.get_edge_attributes(self.networkx, 'label')[ + (from_node, to_node)] + if label in old_label.split('\n'): + return + new_label = old_label + '\n' + label + attr = {(from_node, to_node): {'label': new_label}} + nx.set_edge_attributes(self.networkx, attr) + else: + self.networkx.add_edge( + from_node, to_node, label=label) + + @staticmethod + @abstractmethod + def select_node_info(trace: list[tuple[ScenarioInfo, StateInfo]], index: int) -> NodeInfo: + """Select the info to use to compare nodes and generate their labels for a specific graph type.""" + + @staticmethod + @abstractmethod + def select_edge_info(pair: tuple[ScenarioInfo, StateInfo]) -> EdgeInfo: + """Select the info to use to generate the label for each edge for a specific graph type.""" + + @staticmethod + @abstractmethod + def create_node_description(trace: list[tuple[ScenarioInfo, StateInfo]], index: int) -> str: + """Create the description to be shown in a tooltip for a node given the full trace and its index.""" + + @staticmethod + @abstractmethod + def create_node_label(info: NodeInfo) -> str: + """Create the label for a node given its chosen information.""" + + @staticmethod + @abstractmethod + def create_edge_label(info: EdgeInfo) -> str: + """Create the label for an edge given its chosen information.""" + + @staticmethod + @abstractmethod + def get_legend_info_final_trace_node() -> str: + """Get the information to include in the legend for nodes that appear in the final trace.""" + + @staticmethod + @abstractmethod + def get_legend_info_other_node() -> str: + """Get the information to include in the legend for nodes that do not appear in the final trace.""" + + @staticmethod + @abstractmethod + def get_legend_info_final_trace_edge() -> str: + """Get the information to include in the legend for edges that appear in the final trace.""" + + @staticmethod + @abstractmethod + def get_legend_info_other_edge() -> str: + """Get the information to include in the legend for edges that do not appear in the final trace.""" + + @staticmethod + @abstractmethod + def get_tooltip_name() -> str: + """Get the name/title of the tooltip.""" diff --git a/robotmbt/visualise/graphs/scenariodeltavaluegraph.py b/robotmbt/visualise/graphs/scenariodeltavaluegraph.py new file mode 100644 index 00000000..f78baebf --- /dev/null +++ b/robotmbt/visualise/graphs/scenariodeltavaluegraph.py @@ -0,0 +1,89 @@ +# BSD 3-Clause License +# +# Copyright (c) 2026, T.B. Dubbeling, J. Foederer, T.S. Kas, D.R. Osinga, D.F. Serra e Silva, J.C. Willegers +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from robotmbt.modelspace import ModelSpace + +from robotmbt.visualise.graphs.abstractgraph import AbstractGraph +from robotmbt.visualise.models import ScenarioInfo, StateInfo + + +class ScenarioDeltaValueGraph(AbstractGraph[tuple[ScenarioInfo, set[tuple[str, str]]], None]): + """ + The Scenario-delta-Value graph keeps track of both the scenarios and state updates encountered. + Its nodes are scenarios together with the property assignments after the scenario has run. + Its edges represent steps in the trace. + """ + + @staticmethod + def select_node_info(trace: list[tuple[ScenarioInfo, StateInfo]], index: int) \ + -> tuple[ScenarioInfo, set[tuple[str, str]]]: + if index == 0: + return trace[0][0], StateInfo(ModelSpace()).difference(trace[0][1]) + else: + return trace[index][0], trace[index-1][1].difference(trace[index][1]) + + @staticmethod + def select_edge_info(pair: tuple[ScenarioInfo, StateInfo]) -> None: + return None + + @staticmethod + def create_node_description(trace: list[tuple[ScenarioInfo, StateInfo]], index: int) -> str: + return str(trace[index][1]).replace('\n', '
') + + @staticmethod + def create_node_label(info: tuple[ScenarioInfo, set[tuple[str, str]]]) -> str: + res = "" + for assignment in info[1]: + res += "\n\n"+assignment[0]+":"+assignment[1] + return f"{info[0].name}{res}" + + @staticmethod + def create_edge_label(info: None) -> str: + return '' + + @staticmethod + def get_legend_info_final_trace_node() -> str: + return "Executed Scenario w/ Changes in Execution State (in final trace)" + + @staticmethod + def get_legend_info_other_node() -> str: + return "Executed Scenario w/ Changes in Execution State (backtracked)" + + @staticmethod + def get_legend_info_final_trace_edge() -> str: + return "Execution Flow (final trace)" + + @staticmethod + def get_legend_info_other_edge() -> str: + return "Execution Flow (backtracked)" + + @staticmethod + def get_tooltip_name() -> str: + return "Full state" diff --git a/robotmbt/visualise/graphs/scenariograph.py b/robotmbt/visualise/graphs/scenariograph.py new file mode 100644 index 00000000..dee8c726 --- /dev/null +++ b/robotmbt/visualise/graphs/scenariograph.py @@ -0,0 +1,79 @@ +# BSD 3-Clause License +# +# Copyright (c) 2026, T.B. Dubbeling, J. Foederer, T.S. Kas, D.R. Osinga, D.F. Serra e Silva, J.C. Willegers +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from robotmbt.visualise.graphs.abstractgraph import AbstractGraph +from robotmbt.visualise.models import ScenarioInfo, StateInfo + + +class ScenarioGraph(AbstractGraph[ScenarioInfo, None]): + """ + The scenario graph is the most basic representation of trace exploration. + It represents scenarios as nodes, and the trace as edges. + """ + + @staticmethod + def select_node_info(trace: list[tuple[ScenarioInfo, StateInfo]], index: int) -> ScenarioInfo: + return trace[index][0] + + @staticmethod + def select_edge_info(pair: tuple[ScenarioInfo, StateInfo]) -> None: + return None + + @staticmethod + def create_node_description(trace: list[tuple[ScenarioInfo, StateInfo]], index: int) -> str: + return '' + + @staticmethod + def create_node_label(info: ScenarioInfo) -> str: + return info.name + + @staticmethod + def create_edge_label(info: None) -> str: + return '' + + @staticmethod + def get_legend_info_final_trace_node() -> str: + return "Executed Scenario (in final trace)" + + @staticmethod + def get_legend_info_other_node() -> str: + return "Executed Scenario (backtracked)" + + @staticmethod + def get_legend_info_final_trace_edge() -> str: + return "Execution Flow (final trace)" + + @staticmethod + def get_legend_info_other_edge() -> str: + return "Execution Flow (backtracked)" + + @staticmethod + def get_tooltip_name() -> str: + return "" diff --git a/robotmbt/visualise/models.py b/robotmbt/visualise/models.py new file mode 100644 index 00000000..fcf94276 --- /dev/null +++ b/robotmbt/visualise/models.py @@ -0,0 +1,298 @@ +# BSD 3-Clause License +# +# Copyright (c) 2026, T.B. Dubbeling, J. Foederer, T.S. Kas, D.R. Osinga, D.F. Serra e Silva, J.C. Willegers +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from typing import Any + +from robot.api import logger + +from robotmbt.modelspace import ModelSpace +from robotmbt.suitedata import Scenario + +import jsonpickle +import tempfile +import os + +DESIRED_NAME_LINE_LENGTH = 20 + + +class ScenarioInfo: + """ + This contains all information we need from scenarios, abstracting away from the actual Scenario class: + - name + - src_id + """ + + def __init__(self, scenario: Scenario | str): + if isinstance(scenario, Scenario): + # default case + self.name = self._split_name(scenario.name) + self.src_id = scenario.src_id + else: + # unit tests + self.name = scenario + self.src_id = scenario + + def __str__(self): + return f"Scenario {self.src_id}: {self.name}" + + def __eq__(self, other): + return self.src_id == other.src_id + + @staticmethod + def _split_name(name: str) -> str: + """ + Split a name into separate lines where each line is as close to the desired line length as possible. + """ + # Split into words + words = name.split(" ") + + # If any word is longer than the desired length, use that as the desired length instead + # (otherwise, we will always get a line (much) longer than the desired length, while the other lines will + # be constrained by the desired length) + desired_length = DESIRED_NAME_LINE_LENGTH + for i in words: + if len(i) > desired_length: + desired_length = len(i) + + res = "" + line = words[0] + for i in words[1:]: + # If the previous line was fully appended, simply take the current word as the new line + if line == '\n': + line += i + continue + + app_len = len(line + ' ' + i) + + # If the word fully fits into the line, simply append it + if app_len < desired_length: + line = line + ' ' + i + continue + + app_diff = abs(desired_length - app_len) + curr_diff = abs(desired_length - len(line)) + + # If the current line is closer to the desired length, use that + if curr_diff < app_diff: + res += line + line = '\n' + i + # If the current line with the new word is closer to the desired length, use that + else: + res += line + ' ' + i + line = '\n' + + # Append the final line if it wasn't empty + if line != '\n': + res += line + + return res + + +class StateInfo: + """ + This contains all information we need from states, abstracting away from the actual ModelSpace class: + - domain + - properties + """ + + @classmethod + def _create_state_with_prop(cls, name: str, attrs: list[tuple[str, Any]]): + space = ModelSpace() + prop = ModelSpace() + for (key, val) in attrs: + prop.__setattr__(key, val) + space.props[name] = prop + return cls(space) + + def difference(self, new_state) -> set[tuple[str, str]]: + """ + new_state: the new StateInfo to be compared to the self. + returns: a set of tuples with properties and their assignment. + """ + old: dict[str, dict | str] = self.properties.copy() + new: dict[str, dict | str] = new_state.properties.copy() + temp = StateInfo._dict_deep_diff(old, new) + for key in temp.keys(): + res = "" + for k, v in sorted(temp[key].items()): + res += f"\n\t{k}={v}" + temp[key] = res + return set(temp.items()) # type inference goes wacky here + + @staticmethod + def _dict_deep_diff(old_state: dict[str, any], new_state: dict[str, any]) -> dict[str, any]: + res = {} + for key in new_state.keys(): + if key not in old_state: + res[key] = new_state[key] + elif isinstance(old_state[key], dict): + diff = StateInfo._dict_deep_diff(old_state[key], new_state[key]) + if len(diff) != 0: + res[key] = diff + elif old_state[key] != new_state[key]: + res[key] = new_state[key] + return res + + def __init__(self, state: ModelSpace): + self.domain = state.ref_id + + # Extract all attributes/properties stored in the model space and store them in the temp dict + # Similar in workings to ModelSpace's get_status_text + temp = {} + for p in state.props: + temp[p] = {} + if p == 'scenario': + temp['scenario'] = dict(state.props['scenario']) + else: + for attr in dir(state.props[p]): + temp[p][attr] = getattr(state.props[p], attr) + + # Filter empty entries + self.properties = {} + for p in temp.keys(): + if len(temp[p]) > 0: + self.properties[p] = temp[p].copy() + + def __eq__(self, other): + return self.domain == other.domain and self.properties == other.properties + + def __str__(self): + res = "" + for p in self.properties: + if res != "": + res += "\n\n" + res += f"{p}:" + for k, v in self.properties[p].items(): + res += f"\n\t{k}={v}" + return res + + +class TraceInfo: + """ + This keeps track of all information we need from all steps in trace exploration: + - current_trace: the trace currently being built up, a list of scenario/state pairs in order of execution + - all_traces: all valid traces encountered in trace exploration, up until the point they could not go any further + - previous_length: used to identify backtracking + """ + + def __init__(self): + self.current_trace: list[tuple[ScenarioInfo, StateInfo]] = [] + self.all_traces: list[list[tuple[ScenarioInfo, StateInfo]]] = [] + self.previous_length: int = 0 + self.pushed: bool = False + + def update_trace(self, scenario: ScenarioInfo | None, state: StateInfo, length: int): + """ + Updates TraceInfo instance with the information that a scenario has run resulting in the given state as the nth + scenario of the trace, where n is the value of the length parameter. If length is greater than the previous + length of the trace to be updated, adds the given scenario/state to the trace. If length is smaller than the + previous length of the trace, roll back the trace until the step indicated by length. + scenario: the scenario that has run. + state: the state after scenario has run. + length: the step in the trace the scenario occurred in. + """ + if length > self.previous_length: + # New state - push + self._push(scenario, state, length - self.previous_length) + self.previous_length = length + elif length < self.previous_length: + # Backtrack - pop + self._pop(self.previous_length - length) + self.previous_length = length + + # Sanity check + if len(self.current_trace) > 0: + self._sanity_check(scenario, state, 'popping') + else: + # No change - sanity check + if len(self.current_trace) > 0: + self._sanity_check(scenario, state, 'nothing') + + def _push(self, scenario: ScenarioInfo, state: StateInfo, n: int): + if n > 1: + logger.warn( + f"Pushing multiple scenario/state pairs at once to trace info ({n})! Some info might be lost!") + for _ in range(n): + self.current_trace.append((scenario, state)) + self.pushed = True + + def _pop(self, n: int): + if self.pushed: + self.all_traces.append(self.current_trace.copy()) + for _ in range(n): + self.current_trace.pop() + self.pushed = False + + def __repr__(self) -> str: + return f"TraceInfo(traces=[{[f'[{[self.stringify_pair(pair) for pair in trace]}]' for trace in self.all_traces]}], current=[{[self.stringify_pair(pair) for pair in self.current_trace]}])" + + def _sanity_check(self, scen: ScenarioInfo, state: StateInfo, after: str): + (prev_scen, prev_state) = self.current_trace[-1] + if prev_scen != scen: + logger.warn( + f'TraceInfo got out of sync after {after}\nExpected scenario: {prev_scen}\nActual scenario: {scen}') + if prev_state != state: + logger.warn( + f'TraceInfo got out of sync after {after}\nExpected state: {prev_state}\nActual state: {state}') + + def export_graph(self, suite_name: str, dir: str = '', atest: bool = False) -> str | None: + encoded_instance = jsonpickle.encode(self) + name = suite_name.lower().replace(' ', '_') + if atest: + ''' + temporary file to not accidentally overwrite an existing file + mkstemp() is not ideal but given Python's limitations this is the easiest solution + as temporary file, a different method, is problematic on Windows + https://stackoverflow.com/a/57015383 + ''' + fd, dir = tempfile.mkstemp() + with os.fdopen(fd, "w") as f: + f.write(encoded_instance) + return dir + + if dir[-1] != '/': + dir += '/' + + # create folders if they do not exist + if not os.path.exists(dir): + os.makedirs(dir) + + with open(f"{dir}{name}.json", "w") as f: + f.write(encoded_instance) + return None + + @staticmethod + def stringify_pair(pair: tuple[ScenarioInfo, StateInfo]) -> str: + """ + Takes a pair of a scenario and a state and returns a string describing both. + pair: a tuple consisting of a scenario and a state. + returns: formatted string based on the given scenario and state. + """ + return f"Scenario={pair[0].name}, State={pair[1]}" diff --git a/robotmbt/visualise/networkvisualiser.py b/robotmbt/visualise/networkvisualiser.py new file mode 100644 index 00000000..648f3d89 --- /dev/null +++ b/robotmbt/visualise/networkvisualiser.py @@ -0,0 +1,678 @@ +# BSD 3-Clause License +# +# Copyright (c) 2026, T.B. Dubbeling, J. Foederer, T.S. Kas, D.R. Osinga, D.F. Serra e Silva, J.C. Willegers +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from bokeh.core.enums import PlaceType, LegendLocationType +from bokeh.core.property.vectorization import value +from bokeh.embed import file_html +from bokeh.models import ColumnDataSource, Rect, Text, ResetTool, SaveTool, WheelZoomTool, PanTool, Plot, Range1d, \ + Title, FullscreenTool, CustomJS, Segment, Arrow, NormalHead, Bezier, Legend, ZoomInTool, ZoomOutTool, HoverTool + +from grandalf.graphs import Vertex as GVertex, Edge as GEdge, Graph as GGraph +from grandalf.layouts import SugiyamaLayout + +from networkx import DiGraph + +from robotmbt.visualise.graphs.abstractgraph import AbstractGraph + +# Padding within the nodes between the borders and inner text +HORIZONTAL_PADDING_WITHIN_NODES: int = 5 +VERTICAL_PADDING_WITHIN_NODES: int = 5 + +# Colors for different parts of the graph +FINAL_TRACE_NODE_COLOR: str = '#CCCC00' +OTHER_NODE_COLOR: str = '#999989' +FINAL_TRACE_EDGE_COLOR: str = '#444422' +OTHER_EDGE_COLOR: str = '#BBBBAA' + +# Legend placement +# Alignment within graph ('center' is in the middle, 'top_right' is the top right, etc.) +LEGEND_LOCATION: LegendLocationType | tuple[float, float] = 'top_right' +# Where it appears relative to graph ('center' is within, 'below' is below, etc.) +LEGEND_PLACE: PlaceType = 'center' + +# Dimensions of the plot in the window +INNER_WINDOW_WIDTH: int = 720 +INNER_WINDOW_HEIGHT: int = 480 +OUTER_WINDOW_WIDTH: int = INNER_WINDOW_WIDTH + (280 if LEGEND_PLACE == 'left' or LEGEND_PLACE == 'right' else 30) +OUTER_WINDOW_HEIGHT: int = INNER_WINDOW_HEIGHT + (150 if LEGEND_PLACE == 'below' or LEGEND_PLACE == 'center' else 30) + +# Font sizes +MAJOR_FONT_SIZE: int = 16 +MINOR_FONT_SIZE: int = 8 + + +class Node: + """ + Contains the information we need to add a node to the graph. + """ + + def __init__(self, node_id: str, label: str, x: int, y: int, width: float, height: float, description: str): + self.node_id = node_id + self.label = label + self.x = x + self.y = y + self.width = width + self.height = height + self.description = description + + +class Edge: + """ + Contains the information we need to add an edge to the graph. + """ + + def __init__(self, from_node: str, to_node: str, label: str, points: list[tuple[float, float]]): + self.from_node = from_node + self.to_node = to_node + self.label = label + self.points = points + + +class NetworkVisualiser: + """ + A container for a Bokeh graph, which can be created from any abstract graph. + """ + + def __init__(self, graph: AbstractGraph, suite_name: str): + # Extract what we need from the graph + self.networkx: DiGraph = graph.networkx + self.final_trace = graph.get_final_trace() + self.start = graph.start_node + + # Set up a Bokeh figure + self.plot = Plot(width=OUTER_WINDOW_WIDTH, height=OUTER_WINDOW_HEIGHT) + self.plot.output_backend = "svg" + + # Create Sugiyama layout + nodes, edges = self._create_layout() + self.node_dict: dict[str, Node] = {} + for node in nodes: + self.node_dict[node.node_id] = node + + # Keep track of arrows in the graph for scaling + self.arrows = [] + + # Create the hover tool to show tooltips + tooltip_name = graph.get_tooltip_name() + if tooltip_name: + self.hover = HoverTool() + tooltips = f""" +
+ {tooltip_name} +
+ @description{{safe}} +
+
+ """ + self.hover.tooltips = tooltips + else: + self.hover = None + + # Add the nodes to the graph + self._add_nodes(nodes) + + # Add the edges to the graph + self._add_edges(nodes, edges) + + # Add a legend to the graph + self._add_legend(graph) + + # Add our features to the graph (e.g. tools) + self._add_features(suite_name) + + def generate_html(self): + """ + Generate HTML for the Bokeh graph. + """ + return file_html(self.plot, 'inline', "graph") + + def _add_nodes(self, nodes: list[Node]): + """ + Add the nodes to the graph in the form of Rect and Text glyphs. + """ + # The ColumnDataSources to store our nodes and edges in Bokeh's format + node_source: ColumnDataSource = ColumnDataSource( + {'id': [], 'x': [], 'y': [], 'w': [], 'h': [], 'color': [], 'description': []}) + node_label_source: ColumnDataSource = ColumnDataSource( + {'id': [], 'x': [], 'y': [], 'label': []}) + + # Add all nodes to the column data sources + for node in nodes: + _add_node_to_sources(node, self.final_trace, node_source, node_label_source) + + # Add the glyphs for nodes and their labels + node_glyph = Rect(x='x', y='y', width='w', height='h', fill_color='color') + node_glyph_renderer = self.plot.add_glyph(node_source, node_glyph) + + if self.hover is not None: + self.hover.renderers = [node_glyph_renderer] + + node_label_glyph = Text(x='x', y='y', text='label', text_align='left', text_baseline='middle', + text_font_size=f'{MAJOR_FONT_SIZE}pt', text_font=value("Courier New")) + node_label_glyph.tags = [f"scalable_text{MAJOR_FONT_SIZE}"] + self.plot.add_glyph(node_label_source, node_label_glyph) + + def _add_edges(self, nodes: list[Node], edges: list[Edge]): + """ + Add the edges to the graph in the form of Arrow layouts and Segment, Bezier, and Text glyphs. + """ + # The ColumnDataSources to store our edges in Bokeh's format + edge_part_source: ColumnDataSource = ColumnDataSource( + {'from': [], 'to': [], 'start_x': [], 'start_y': [], 'end_x': [], 'end_y': [], 'color': []}) + edge_arrow_source: ColumnDataSource = ColumnDataSource( + {'from': [], 'to': [], 'start_x': [], 'start_y': [], 'end_x': [], 'end_y': [], 'color': []}) + edge_bezier_source: ColumnDataSource = ColumnDataSource( + {'from': [], 'to': [], 'start_x': [], 'start_y': [], 'end_x': [], 'end_y': [], 'control1_x': [], + 'control1_y': [], 'control2_x': [], 'control2_y': [], 'color': []}) + edge_label_source: ColumnDataSource = ColumnDataSource({'from': [], 'to': [], 'x': [], 'y': [], 'label': []}) + + for edge in edges: + _add_edge_to_sources(nodes, edge, self.final_trace, edge_part_source, edge_arrow_source, edge_bezier_source, + edge_label_source) + + # Add the glyphs for edges and their labels + edge_part_glyph = Segment(x0='start_x', y0='start_y', x1='end_x', y1='end_y', line_color='color') + self.plot.add_glyph(edge_part_source, edge_part_glyph) + + arrow_layout = Arrow( + end=NormalHead(size=10, fill_color='color', line_color='color'), + x_start='start_x', y_start='start_y', + x_end='end_x', y_end='end_y', line_color='color', + source=edge_arrow_source + ) + self.plot.add_layout(arrow_layout) + self.arrows.append(arrow_layout) + + edge_bezier_glyph = Bezier(x0='start_x', y0='start_y', x1='end_x', y1='end_y', cx0='control1_x', + cy0='control1_y', cx1='control2_x', cy1='control2_y', line_color='color') + self.plot.add_glyph(edge_bezier_source, edge_bezier_glyph) + + edge_label_glyph = Text(x='x', y='y', text='label', text_align='center', text_baseline='middle', + text_font_size=f'{MINOR_FONT_SIZE}pt', text_font=value("Courier New")) + edge_label_glyph.tags = [f"scalable_text{MINOR_FONT_SIZE}"] + self.plot.add_glyph(edge_label_source, edge_label_glyph) + + def _create_layout(self) -> tuple[list[Node], list[Edge]]: + """ + Create the Sugiyama layout using grandalf. + """ + # Containers to convert networkx nodes/edges to the proper format. + vertices = [] + edges = [] + flips = [] + + # Extract nodes from networkx and put them in the proper format to be used by grandalf. + start = None + for node_id in self.networkx.nodes: + v = GVertex(node_id) + w, h = _calculate_dimensions(self.networkx.nodes[node_id]['label']) + v.view = NodeView(w, h) + vertices.append(v) + if node_id == self.start: + start = v + + # Calculate which edges need to be flipped to make the graph acyclic. + flip = _flip_edges([e for e in self.networkx.edges]) + + # Extract edges from networkx and put them in the proper format to be used by grandalf. + for (from_id, to_id) in self.networkx.edges: + from_node = _find_node(vertices, from_id) + to_node = _find_node(vertices, to_id) + e = GEdge(from_node, to_node) + e.view = EdgeView() + edges.append(e) + if (from_id, to_id) in flip: + flips.append(e) + + # Feed the info to grandalf and get the layout. + g = GGraph(vertices, edges) + sugiyama = SugiyamaLayout(g.C[0]) + + # Set specific margins as these values worked best in user-testing + # Space between nodes + sugiyama.xspace = 10 + sugiyama.yspace = 15 + # Default width for nodes with no set width (in this case only for edge routing) + sugiyama.dw = 2 + # Default height for nodes with no set height (in this case only for edge routing) + sugiyama.dh = 2 + + sugiyama.init_all(roots=[start], inverted_edges=flips) + sugiyama.draw() + + # Extract the information we need from the nodes and edges and return them in our format. + ns = [] + for v in g.C[0].sV: + node_id = v.data + label = self.networkx.nodes[node_id]['label'] + (x, y) = v.view.xy + (w, h) = _calculate_dimensions(label) + description = self.networkx.nodes[node_id]['description'] + ns.append(Node(node_id, label, x, -y, w, h, description)) + + es = [] + for e in g.C[0].sE: + from_id = e.v[0].data + to_id = e.v[1].data + label = self.networkx.edges[(from_id, to_id)]['label'] + points = [] + # invert y axis + for p in e.view.points: + points.append((p[0], -p[1])) + + es.append(Edge(from_id, to_id, label, points)) + + return ns, es + + def _add_features(self, suite_name: str): + """ + Add our features to the graph such as tools, titles, and JavaScript callbacks. + """ + self.plot.add_layout(Title(text=suite_name, align="center"), "above") + + # Add the different tools + wheel_zoom = WheelZoomTool() + self.plot.add_tools(ResetTool(), SaveTool(), + wheel_zoom, PanTool(), + FullscreenTool(), ZoomInTool(factor=0.4), ZoomOutTool(factor=0.4)) + self.plot.toolbar.active_scroll = wheel_zoom + + if self.hover: + self.plot.add_tools(self.hover) + + # Specify the default range - these values represent the aspect ratio of the actual view in the window + self.plot.x_range = Range1d(-INNER_WINDOW_WIDTH / 2, INNER_WINDOW_WIDTH / 2) + self.plot.y_range = Range1d(-INNER_WINDOW_HEIGHT, 0) + self.plot.x_range.tags = [{"initial_span": INNER_WINDOW_WIDTH}] + self.plot.y_range.tags = [{"initial_span": INNER_WINDOW_HEIGHT}] + + # A JS callback to scale text and arrows, and change aspect ratio. + resize_cb = CustomJS(args=dict(xr=self.plot.x_range, yr=self.plot.y_range, plot=self.plot, arrows=self.arrows), + code=f""" + // Initialize initial scale tag + if (!plot.tags || plot.tags.length === 0) {{ + plot.tags = [{{ + initial_scale: plot.inner_height / (yr.end - yr.start) + }}] + }} + + // Calculate current x and y span + const xspan = xr.end - xr.start; + const yspan = yr.end - yr.start; + + // Calculate inner aspect ratio and span aspect ratio + const inner_aspect = plot.inner_width / plot.inner_height; + const span_aspect = xspan / yspan; + + // Let span aspect ratio match inner aspect ratio if needed + if (Math.abs(inner_aspect - span_aspect) > 0.05) {{ + const xmid = xr.start + xspan / 2; + const new_xspan = yspan * inner_aspect; + xr.start = xmid - new_xspan / 2; + xr.end = xmid + new_xspan / 2; + }} + + // Calculate scale factor + const scale = (plot.inner_height / yspan) / plot.tags[0].initial_scale + + // Scale text + for (const r of plot.renderers) {{ + if (!r.glyph || !r.glyph.tags) continue + + if (r.glyph.tags.includes("scalable_text{MAJOR_FONT_SIZE}")) {{ + r.glyph.text_font_size = Math.floor({MAJOR_FONT_SIZE} * scale) + "pt" + }} + + if (r.glyph.tags.includes("scalable_text{MINOR_FONT_SIZE}")) {{ + r.glyph.text_font_size = Math.floor({MINOR_FONT_SIZE} * scale) + "pt" + }} + }} + + // Scale arrows + for (const a of arrows) {{ + if (!a.properties) continue; + if (!a.properties.end) continue; + if (!a.properties.end._value) continue; + if (!a.properties.end._value.properties) continue; + if (!a.properties.end._value.properties.size) continue; + if (!a.properties.end._value.properties.size._value) continue; + if (!a.properties.end._value.properties.size._value.value) continue; + if (a._base_end_size == null) + a._base_end_size = a.properties.end._value.properties.size._value.value; + a.properties.end._value.properties.size._value.value = a._base_end_size * Math.sqrt(scale); + a.change.emit(); + }}""") + + # Add the callback to the values that change when zooming/resizing. + self.plot.x_range.js_on_change("start", resize_cb) + self.plot.x_range.js_on_change("end", resize_cb) + self.plot.y_range.js_on_change("start", resize_cb) + self.plot.y_range.js_on_change("end", resize_cb) + self.plot.js_on_change("inner_width", resize_cb) + self.plot.js_on_change("inner_height", resize_cb) + + def _add_legend(self, graph: AbstractGraph): + """ + Adds a legend to the graph with the node/edge information from the given graph. + """ + empty_source = ColumnDataSource({'_': [0]}) + + final_trace_node_glyph = Rect(x='_', y='_', width='_', height='_', fill_color=FINAL_TRACE_NODE_COLOR) + final_trace_node = self.plot.add_glyph(empty_source, final_trace_node_glyph) + + other_node_glyph = Rect(x='_', y='_', width='_', height='_', fill_color=OTHER_NODE_COLOR) + other_node = self.plot.add_glyph(empty_source, other_node_glyph) + + final_trace_edge_glyph = Segment( + x0='_', x1='_', + y0='_', y1='_', line_color=FINAL_TRACE_EDGE_COLOR + ) + final_trace_edge = self.plot.add_glyph(empty_source, final_trace_edge_glyph) + + other_edge_glyph = Segment( + x0='_', x1='_', + y0='_', y1='_', line_color=OTHER_EDGE_COLOR + ) + other_edge = self.plot.add_glyph(empty_source, other_edge_glyph) + + legend = Legend(items=[(graph.get_legend_info_final_trace_node(), [final_trace_node]), + (graph.get_legend_info_other_node(), [other_node]), + (graph.get_legend_info_final_trace_edge(), [final_trace_edge]), + (graph.get_legend_info_other_edge(), [other_edge])], + location=LEGEND_LOCATION, orientation="vertical") + self.plot.add_layout(legend, LEGEND_PLACE) + + +class NodeView: + """ + A view of a node in the format that grandalf expects. + """ + + def __init__(self, width: float, height: float): + self.w, self.h = width, height + self.xy = (0, 0) + + +class EdgeView: + """ + A view of an edge in the format that grandalf expects. + """ + + def __init__(self): + self.points = [] + + def setpath(self, points: list[tuple[float, float]]): + self.points = points + + +def _find_node(nodes: list[GVertex], node_id: str): + """ + Find a node given its id in a list of grandalf nodes. + """ + for node in nodes: + if node.data == node_id: + return node + return None + + +def _get_connection_coordinates(nodes: list[Node], node_id: str) -> list[tuple[float, float]]: + """ + Get the coordinates where edges can connect to a node given its id. + These places are the middle of the left, right, top, and bottom edge, as well as the corners of the node. + """ + start_possibilities = [] + + # Get node from list + node = None + for n in nodes: + if n.node_id == node_id: + node = n + break + + # Left + start_possibilities.append((node.x - node.width / 2, node.y)) + # Right + start_possibilities.append((node.x + node.width / 2, node.y)) + # Bottom + start_possibilities.append((node.x, node.y - node.height / 2)) + # Top + start_possibilities.append((node.x, node.y + node.height / 2)) + # Left bottom + start_possibilities.append((node.x - node.width / 2, node.y - node.height / 2)) + # Left top + start_possibilities.append((node.x - node.width / 2, node.y + node.height / 2)) + # Right bottom + start_possibilities.append((node.x + node.width / 2, node.y - node.height / 2)) + # Right top + start_possibilities.append((node.x + node.width / 2, node.y + node.height / 2)) + + return start_possibilities + + +def _minimize_distance(from_pos: list[tuple[float, float]], to_pos: list[tuple[float, float]]) -> tuple[ + float, float, float, float]: + """ + Find a pair of positions that minimizes their distance. + """ + min_dist = -1 + fx, fy, tx, ty = 0, 0, 0, 0 + + # Calculate the distance between all permutations + for fp in from_pos: + for tp in to_pos: + distance = (fp[0] - tp[0]) ** 2 + (fp[1] - tp[1]) ** 2 + if min_dist == -1 or distance < min_dist: + min_dist = distance + fx, fy, tx, ty = fp[0], fp[1], tp[0], tp[1] + + # Return the permutation with the shortest distance + return fx, fy, tx, ty + + +def _add_edge_to_sources(nodes: list[Node], edge: Edge, final_trace: list[str], edge_part_source: ColumnDataSource, + edge_arrow_source: ColumnDataSource, edge_bezier_source: ColumnDataSource, + edge_label_source: ColumnDataSource): + """ + Add an edge between two nodes to the ColumnDataSources. + Contains all logic to set their color, find their attachment points, and do self-loops properly. + """ + in_final_trace = False + for i in range(len(final_trace) - 1): + if edge.from_node == final_trace[i] and edge.to_node == final_trace[i + 1]: + in_final_trace = True + break + + if edge.from_node == edge.to_node: + _add_self_loop_to_sources(nodes, edge, in_final_trace, edge_arrow_source, edge_bezier_source, edge_label_source) + return + + start_x, start_y = 0, 0 + end_x, end_y = 0, 0 + + # Add edges going through the calculated points + for i in range(len(edge.points) - 1): + start_x, start_y = edge.points[i] + end_x, end_y = edge.points[i + 1] + + # Collect possibilities where the edge can start and end + if i == 0: + from_possibilities = _get_connection_coordinates(nodes, edge.from_node) + else: + from_possibilities = [(start_x, start_y)] + + if i == len(edge.points) - 2: + to_possibilities = _get_connection_coordinates(nodes, edge.to_node) + else: + to_possibilities = [(end_x, end_y)] + + # Choose connection points that minimize edge length + start_x, start_y, end_x, end_y = _minimize_distance(from_possibilities, to_possibilities) + + if i < len(edge.points) - 2: + # Middle part of edge without arrow + edge_part_source.data['from'].append(edge.from_node) + edge_part_source.data['to'].append(edge.to_node) + edge_part_source.data['start_x'].append(start_x) + edge_part_source.data['start_y'].append(start_y) + edge_part_source.data['end_x'].append(end_x) + edge_part_source.data['end_y'].append(end_y) + edge_part_source.data['color'].append(FINAL_TRACE_EDGE_COLOR if in_final_trace else OTHER_EDGE_COLOR) + else: + # End of edge with arrow + edge_arrow_source.data['from'].append(edge.from_node) + edge_arrow_source.data['to'].append(edge.to_node) + edge_arrow_source.data['start_x'].append(start_x) + edge_arrow_source.data['start_y'].append(start_y) + edge_arrow_source.data['end_x'].append(end_x) + edge_arrow_source.data['end_y'].append(end_y) + edge_arrow_source.data['color'].append(FINAL_TRACE_EDGE_COLOR if in_final_trace else OTHER_EDGE_COLOR) + + # Add the label + edge_label_source.data['from'].append(edge.from_node) + edge_label_source.data['to'].append(edge.to_node) + edge_label_source.data['x'].append((start_x + end_x) / 2) + edge_label_source.data['y'].append((start_y + end_y) / 2) + edge_label_source.data['label'].append(edge.label) + + +def _add_self_loop_to_sources(nodes: list[Node], edge: Edge, in_final_trace: bool, edge_arrow_source: ColumnDataSource, + edge_bezier_source: ColumnDataSource, edge_label_source: ColumnDataSource): + """ + Add a self-loop edge for a node to the ColumnDataSources, consisting of a Beziér curve and an arrow. + """ + connection = _get_connection_coordinates(nodes, edge.from_node) + + right_x, right_y = connection[1] + + # Add the Bézier curve + edge_bezier_source.data['from'].append(edge.from_node) + edge_bezier_source.data['to'].append(edge.to_node) + edge_bezier_source.data['start_x'].append(right_x) + edge_bezier_source.data['start_y'].append(right_y + 5) + edge_bezier_source.data['end_x'].append(right_x) + edge_bezier_source.data['end_y'].append(right_y - 5) + edge_bezier_source.data['control1_x'].append(right_x + 25) + edge_bezier_source.data['control1_y'].append(right_y + 25) + edge_bezier_source.data['control2_x'].append(right_x + 25) + edge_bezier_source.data['control2_y'].append(right_y - 25) + edge_bezier_source.data['color'].append(FINAL_TRACE_EDGE_COLOR if in_final_trace else OTHER_EDGE_COLOR) + + # Add the arrow + edge_arrow_source.data['from'].append(edge.from_node) + edge_arrow_source.data['to'].append(edge.to_node) + edge_arrow_source.data['start_x'].append(right_x + 0.001) + edge_arrow_source.data['start_y'].append(right_y - 5.001) + edge_arrow_source.data['end_x'].append(right_x) + edge_arrow_source.data['end_y'].append(right_y - 5) + edge_arrow_source.data['color'].append(FINAL_TRACE_EDGE_COLOR if in_final_trace else OTHER_EDGE_COLOR) + + # Add the label + edge_label_source.data['from'].append(edge.from_node) + edge_label_source.data['to'].append(edge.to_node) + edge_label_source.data['x'].append(right_x + 25) + edge_label_source.data['y'].append(right_y) + edge_label_source.data['label'].append(edge.label) + + +def _add_node_to_sources(node: Node, final_trace: list[str], node_source: ColumnDataSource, + node_label_source: ColumnDataSource): + """ + Add a node to the ColumnDataSources. + """ + node_source.data['id'].append(node.node_id) + node_source.data['x'].append(node.x) + node_source.data['y'].append(node.y) + node_source.data['w'].append(node.width) + node_source.data['h'].append(node.height) + node_source.data['color'].append( + FINAL_TRACE_NODE_COLOR if node.node_id in final_trace else OTHER_NODE_COLOR) + node_source.data['description'].append(node.description) + + node_label_source.data['id'].append(node.node_id) + node_label_source.data['x'].append(node.x - node.width / 2 + HORIZONTAL_PADDING_WITHIN_NODES) + node_label_source.data['y'].append(node.y) + node_label_source.data['label'].append(node.label) + + +def _calculate_dimensions(label: str) -> tuple[float, float]: + """ + Calculate a node's dimensions based on its label and the given font size constant. + Assumes the font is Courier New. + """ + lines = label.splitlines() + width = 0 + for line in lines: + width = max(width, len(line) * (MAJOR_FONT_SIZE / 3 + 5)) + height = len(lines) * (MAJOR_FONT_SIZE / 2 + 9) * 1.37 - 9 + return width + 2 * HORIZONTAL_PADDING_WITHIN_NODES, height + 2 * VERTICAL_PADDING_WITHIN_NODES + + +def _flip_edges(edges: list[tuple[str, str]]) -> list[tuple[str, str]]: + """ + Calculate which edges need to be flipped to make a graph acyclic. + """ + # Step 1: Build adjacency list from edges + adj = {} + for u, v in edges: + if u not in adj: + adj[u] = [] + adj[u].append(v) + + # Step 2: Helper function to detect cycles + def dfs(node, visited, rec_stack, cycle_edges): + visited[node] = True + rec_stack[node] = True + + if node in adj: + for neighbor in adj[node]: + edge = (node, neighbor) + + if not visited.get(neighbor, False): + if dfs(neighbor, visited, rec_stack, cycle_edges): + cycle_edges.append(edge) + elif rec_stack.get(neighbor, False): + # Found a cycle, add the edge to the cycle_edges list + cycle_edges.append(edge) + + rec_stack[node] = False + return False + + # Step 3: Detect cycles + visited = {} + rec_stack = {} + cycle_edges = [] + + for node in adj: + if not visited.get(node, False): + dfs(node, visited, rec_stack, cycle_edges) + + # Step 4: Return the list of edges that need to be flipped + # In this case, the cycle_edges are the ones that we need to "break" by flipping + return cycle_edges diff --git a/robotmbt/visualise/visualiser.py b/robotmbt/visualise/visualiser.py new file mode 100644 index 00000000..e5a7fdc3 --- /dev/null +++ b/robotmbt/visualise/visualiser.py @@ -0,0 +1,123 @@ +# BSD 3-Clause License +# +# Copyright (c) 2026, T.B. Dubbeling, J. Foederer, T.S. Kas, D.R. Osinga, D.F. Serra e Silva, J.C. Willegers +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from robotmbt.modelspace import ModelSpace +from robotmbt.tracestate import TraceState +from robotmbt.visualise import networkvisualiser +from robotmbt.visualise.graphs.scenariodeltavaluegraph import ScenarioDeltaValueGraph +from robotmbt.visualise.graphs.abstractgraph import AbstractGraph +from robotmbt.visualise.graphs.scenariograph import ScenarioGraph +from robotmbt.visualise.models import TraceInfo, StateInfo, ScenarioInfo +import html + +GRAPHS = { + 'scenario': ScenarioGraph, + 'scenario-delta-value': ScenarioDeltaValueGraph, +} + + +class Visualiser: + """ + The Visualiser class bridges the different concerns to provide + a simple interface through which the graph can be updated, + and retrieved in HTML format. + """ + + # glue method to let us construct Visualiser objects in Robot tests. + @classmethod + def construct(cls, graph_type: str): + # just calls __init__, but without having underscores etc. + return cls(graph_type) + + def __init__(self, graph_type: str, suite_name: str = "", export: str = '', trace_info: TraceInfo = None): + if not export and not graph_type in GRAPHS.keys(): + raise ValueError(f"Unknown graph type: {graph_type}") + + self.graph_type: str = graph_type + + if trace_info is None: + self.trace_info: TraceInfo = TraceInfo() + else: + self.trace_info = trace_info + + self.suite_name = suite_name + self.export = export + + def update_trace(self, trace: TraceState): + """ + Uses the new snapshots from trace to update the trace info. + Multiple new snapshots can be pushed or popped at once. + """ + trace_len = len(trace._snapshots) + # We don't have any information + if trace_len == 0: + self.trace_info.update_trace(None, StateInfo(ModelSpace()), 0) + + # New snapshots have been pushed + elif trace_len > self.trace_info.previous_length: + prev = self.trace_info.previous_length + r = trace_len - prev + # Extract all snapshots that have been pushed and update our trace info with their scenario/model info + for i in range(r): + snap = trace._snapshots[prev + i] + scenario = snap.scenario + model = snap.model + self.trace_info.update_trace(ScenarioInfo(scenario), StateInfo(model), prev + i + 1) + + # Snapshots have been removed + else: + snap = trace._snapshots[-1] + scenario = snap.scenario + model = snap.model + self.trace_info.update_trace(ScenarioInfo(scenario), StateInfo(model), trace_len) + + def _get_graph(self) -> AbstractGraph | None: + if self.graph_type not in GRAPHS.keys(): + return None + + return GRAPHS[self.graph_type](self.trace_info) + + def generate_visualisation(self) -> tuple[str, bool]: + """ + Finalize the visualisation. Exports the graph to JSON if requested, and generates HTML if requested. + The boolean signals whether the output is in HTML format or not. + """ + if self.export: + self.trace_info.export_graph(self.suite_name, self.export) + + graph: AbstractGraph = self._get_graph() + if graph is None and self.export: + return f"Successfully exported to {self.export}!", False + elif graph is None: + raise ValueError(f"Unknown graph type: {self.graph_type}") + + html_bokeh = networkvisualiser.NetworkVisualiser(graph, self.suite_name).generate_html() + + return f'', True diff --git a/utest/test_visualise_models.py b/utest/test_visualise_models.py new file mode 100644 index 00000000..e15b95a3 --- /dev/null +++ b/utest/test_visualise_models.py @@ -0,0 +1,215 @@ +# BSD 3-Clause License +# +# Copyright (c) 2026, T.B. Dubbeling, J. Foederer, T.S. Kas, D.R. Osinga, D.F. Serra e Silva, J.C. Willegers +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import unittest + +try: + from robotmbt.visualise.models import * + + VISUALISE = True +except ImportError: + VISUALISE = False + +if VISUALISE: + class TestVisualiseModels(unittest.TestCase): + """ + Contains tests for robotmbt/visualise/models.py + """ + + """ + Class: ScenarioInfo + """ + + def test_scenarioInfo_str(self): + si = ScenarioInfo('test') + self.assertEqual(si.name, 'test') + self.assertEqual(si.src_id, 'test') + + def test_scenarioInfo_Scenario(self): + s = Scenario('test') + s.src_id = 0 + si = ScenarioInfo(s) + self.assertEqual(si.name, 'test') + self.assertEqual(si.src_id, 0) + + def test_split_name_empty_string(self): + result = ScenarioInfo._split_name("") + self.assertEqual(result, "") + self.assertNotIn('\n', result) + + def test_split_name_single_short_word(self): + result = ScenarioInfo._split_name("Hello") + self.assertEqual(result, "Hello") + self.assertNotIn('\n', result) + + def test_split_name_single_exact_length_word(self): + exact_20 = "abcdefghijklmnopqrst" + result = ScenarioInfo._split_name(exact_20) + self.assertEqual(result, exact_20) + self.assertNotIn('\n', result) + + def test_split_name_single_long_word(self): + name = "ThisIsAReallyLongNameWithoutAnySpacesAtAll" + result = ScenarioInfo._split_name(name) + self.assertEqual(result, name) + self.assertNotIn('\n', result) + + def test_split_name_two_words_short(self): + result = ScenarioInfo._split_name("Hello World") + self.assertEqual(result, "Hello World") + self.assertNotIn('\n', result) + + def test_split_name_two_words_exceeds_limit(self): + name = "Supercalifragilistic Hello" + result = ScenarioInfo._split_name(name) + + self.assertEqual(result.replace('\n', ' '), name) + self.assertIn('\n', result) + + def test_split_name_multiple_words_need_split(self): + name = "This is a very long scenario name that should be split" + result = ScenarioInfo._split_name(name) + + self.assertEqual(result.replace('\n', ' '), name) + self.assertIn('\n', result) + + """ + Class: StateInfo + """ + + def test_stateInfo_empty(self): + s = StateInfo(ModelSpace()) + self.assertEqual(str(s), '') + + def test_stateInfo_prop_empty(self): + space = ModelSpace() + space.props['prop1'] = ModelSpace() + s = StateInfo(space) + self.assertEqual(str(s), '') + + def test_stateInfo_prop_val(self): + space = ModelSpace() + prop1 = ModelSpace() + prop1.value = 1 + space.props['prop1'] = prop1 + s = StateInfo(space) + self.assertTrue('prop1:' in str(s)) + self.assertTrue('value=1' in str(s)) + + def test_stateInfo_prop_val_empty(self): + space = ModelSpace() + prop1 = ModelSpace() + prop1.value = 1 + prop2 = ModelSpace() + space.props['prop1'] = prop1 + space.props['prop2'] = prop2 + s = StateInfo(space) + self.assertTrue('prop1:' in str(s)) + self.assertTrue('value=1' in str(s)) + self.assertFalse('prop2:' in str(s)) + + """ + Class: TraceInfo + """ + + def test_trace_info_update_normal(self): + info = TraceInfo() + + self.assertEqual(len(info.current_trace), 0) + self.assertEqual(len(info.all_traces), 0) + + info.update_trace(ScenarioInfo('test'), StateInfo._create_state_with_prop('prop', [('value', 1)]), 1) + + self.assertEqual(len(info.current_trace), 1) + self.assertEqual(len(info.all_traces), 0) + + info.update_trace(ScenarioInfo('test'), StateInfo._create_state_with_prop('prop', [('value', 2)]), 2) + + self.assertEqual(len(info.current_trace), 2) + self.assertEqual(len(info.all_traces), 0) + + info.update_trace(ScenarioInfo('test'), StateInfo._create_state_with_prop('prop', [('value', 3)]), 3) + + self.assertEqual(len(info.current_trace), 3) + self.assertEqual(len(info.all_traces), 0) + + def test_trace_info_update_backtrack(self): + info = TraceInfo() + + self.assertEqual(len(info.current_trace), 0) + self.assertEqual(len(info.all_traces), 0) + + info.update_trace(ScenarioInfo('test'), StateInfo._create_state_with_prop('prop', [('value', 1)]), 1) + + self.assertEqual(len(info.current_trace), 1) + self.assertEqual(len(info.all_traces), 0) + + info.update_trace(ScenarioInfo('test'), StateInfo._create_state_with_prop('prop', [('value', 2)]), 2) + + self.assertEqual(len(info.current_trace), 2) + self.assertEqual(len(info.all_traces), 0) + + info.update_trace(ScenarioInfo('test'), StateInfo._create_state_with_prop('prop', [('value', 3)]), 3) + + self.assertEqual(len(info.current_trace), 3) + self.assertEqual(len(info.all_traces), 0) + + info.update_trace(ScenarioInfo('test'), StateInfo._create_state_with_prop('prop', [('value', 2)]), 2) + + self.assertEqual(len(info.current_trace), 2) + self.assertEqual(len(info.all_traces), 1) + self.assertEqual(len(info.all_traces[0]), 3) + + info.update_trace(ScenarioInfo('test'), StateInfo._create_state_with_prop('prop', [('value', 1)]), 1) + + self.assertEqual(len(info.current_trace), 1) + self.assertEqual(len(info.all_traces), 1) + self.assertEqual(len(info.all_traces[0]), 3) + + info.update_trace(ScenarioInfo('test'), StateInfo._create_state_with_prop('prop', [('value', 4)]), 2) + + self.assertEqual(len(info.current_trace), 2) + self.assertEqual(len(info.all_traces), 1) + self.assertEqual(len(info.all_traces[0]), 3) + + info.update_trace(ScenarioInfo('test'), StateInfo._create_state_with_prop('prop', [('value', 5)]), 3) + + self.assertEqual(len(info.current_trace), 3) + self.assertEqual(len(info.all_traces), 1) + self.assertEqual(len(info.all_traces[0]), 3) + + info.update_trace(ScenarioInfo('test'), StateInfo._create_state_with_prop('prop', [('value', 6)]), 4) + + self.assertEqual(len(info.current_trace), 4) + self.assertEqual(len(info.all_traces), 1) + self.assertEqual(len(info.all_traces[0]), 3) + +if __name__ == '__main__': + unittest.main()