From e2889b95fe1d9c1dc7da0e126ad17490bed8161f Mon Sep 17 00:00:00 2001 From: belmegatron Date: Thu, 13 Nov 2025 18:28:02 +0000 Subject: [PATCH 01/11] feat(PLU-192): initial attempt at attempting to import type information --- .../services/analysis_sync/analysis_sync.py | 255 ++++++++++++++++-- 1 file changed, 227 insertions(+), 28 deletions(-) diff --git a/reai_toolkit/app/services/analysis_sync/analysis_sync.py b/reai_toolkit/app/services/analysis_sync/analysis_sync.py index 6f76b0b..ff07eeb 100644 --- a/reai_toolkit/app/services/analysis_sync/analysis_sync.py +++ b/reai_toolkit/app/services/analysis_sync/analysis_sync.py @@ -1,13 +1,33 @@ import threading -from typing import Any, Callable +from typing import Any, Callable, cast import ida_kernwin +from libbs.api import DecompilerInterface +import libbs.artifacts +from libbs.decompilers.ida.compat import execute_ui, execute_write, execute_read + import idautils import idaapi from loguru import logger -from revengai import AnalysesCoreApi, Configuration, FunctionMapping -from libbs.decompilers.ida.compat import execute_write, execute_read +from revengai import ( + AnalysesCoreApi, + ApiClient, + Configuration, + FunctionMapping, + FunctionDataTypesList, + FunctionsDataTypesApi, + BaseResponseFunctionDataTypesList, + FunctionDataTypesListItem, + FunctionInfoOutput, + FunctionInfoInputFuncDepsInner, + Structure, + TypeDefinition, + Enumeration, + GlobalVariable, + FunctionTypeOutput, + StackVariable, +) from reai_toolkit.app.core.netstore_service import SimpleNetStore from reai_toolkit.app.core.shared_schema import GenericApiReturn @@ -16,11 +36,22 @@ from revengai import BaseResponseBasic +class TaggedDependency: + def __init__(self, dependency: Structure | Enumeration | TypeDefinition | GlobalVariable): + self.dependency: Structure | Enumeration | TypeDefinition | GlobalVariable = dependency + self.processed: bool = False + self.name: str = self.dependency.name + + def __repr__(self): + return self.dependency.__repr__() + + class AnalysisSyncService(IThreadService): _thread_callback: Callable[..., Any] = None def __init__(self, netstore_service: SimpleNetStore, sdk_config: Configuration): super().__init__(netstore_service=netstore_service, sdk_config=sdk_config) + self.deci: DecompilerInterface = DecompilerInterface.discover(force_decompiler="ida") def call_callback(self, generic_return: GenericApiReturn) -> None: self._thread_callback(generic_return) @@ -51,9 +82,7 @@ def _fetch_model_id(self, analysis_id: int) -> int: with self.yield_api_client(sdk_config=self.sdk_config) as api_client: analyses_client = AnalysesCoreApi(api_client) - analysis_details: BaseResponseBasic = analyses_client.get_analysis_basic_info( - analysis_id=analysis_id - ) + analysis_details = analyses_client.get_analysis_basic_info(analysis_id=analysis_id) model_id = analysis_details.data.model_id self.safe_put_model_id(model_id=model_id) model_name = analysis_details.data.model_name @@ -85,9 +114,7 @@ def _fetch_function_map(self, analysis_id: int) -> FunctionMapping: with self.yield_api_client(sdk_config=self.sdk_config) as api_client: analyses_client = AnalysesCoreApi(api_client) - function_map = analyses_client.get_analysis_function_map( - analysis_id=analysis_id - ) + function_map = analyses_client.get_analysis_function_map(analysis_id=analysis_id) func_map = function_map.data.function_maps self.safe_put_function_mapping(func_map=func_map) return func_map @@ -99,9 +126,7 @@ def _match_functions( function_map = func_map.function_map inverse_function_map = 
func_map.inverse_function_map - logger.info( - f"RevEng.AI: Retrieved {len(function_map)} function mappings from analysis" - ) + logger.info(f"RevEng.AI: Retrieved {len(function_map)} function mappings from analysis") # Compute which IDA functions match the revengai analysis functions matched_functions = [] @@ -122,9 +147,7 @@ def _match_functions( continue self.safe_set_name(start_ea, new_name, check_user_flags=True) - matched_functions.append( - (int(inverse_function_map[str(start_ea)]), start_ea) - ) + matched_functions.append((int(inverse_function_map[str(start_ea)]), start_ea)) local_function_vaddrs_matched.add(start_ea) else: unmatched_local_functions.append(start_ea) @@ -137,12 +160,8 @@ def _match_functions( unmatched_portal_map[int(func_vaddr)] = int(func_id_str) logger.info(f"RevEng.AI: Matched {len(matched_functions)} functions") - logger.info( - f"RevEng.AI: {len(unmatched_local_functions)} local functions not matched" - ) - logger.info( - f"RevEng.AI: {len(unmatched_remote_functions)} remote functions not matched" - ) + logger.info(f"RevEng.AI: {len(unmatched_local_functions)} local functions not matched") + logger.info(f"RevEng.AI: {len(unmatched_remote_functions)} remote functions not matched") return GenericApiReturn( success=True, @@ -154,12 +173,185 @@ def _match_functions( ), ) + def _get_data_types(self, analysis_id: int) -> FunctionDataTypesList | None: + with ApiClient(configuration=self.sdk_config) as api_client: + client = FunctionsDataTypesApi(api_client=api_client) + response: BaseResponseFunctionDataTypesList = ( + client.list_function_data_types_for_analysis(analysis_id) + ) + if response.status: + return response.data + + def _normalize_type(self, type: str) -> str: + # TODO: Do we need namespace information here? Observing discrepancies where sometimes namespace is used, and other times it isn't. + # I think we will have problems with C++ demangled symbol names using this approach. Needs investigating... + split_type: list[str] = type.split("::") + normalized: str = split_type[-1] + # normalized = type + + # It would appear this is a Ghidra-ism and IDA is unaware of this type. + # TODO: Auto add Ghidra typedef for primitives so we don't need to bother doing this... 
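# (sketch) The special-casing below amounts to a table lookup, which is easier
# to extend as more Ghidra-only primitive names appear. A self-contained
# illustration — the mapping covers only the three primitives handled in this
# patch; anything wider would be an assumption:
GHIDRA_TO_IDA: dict[str, str] = {
    "uchar": "unsigned char",
    "qword": "unsigned __int64",
    "sqword": "__int64",
}

def normalize_primitive(type_name: str) -> str:
    bare = type_name.split("::")[-1]  # drop namespace qualifiers, as above
    return GHIDRA_TO_IDA.get(bare, bare)  # unknown names pass through unchanged

assert normalize_primitive("std::qword") == "unsigned __int64"
assert normalize_primitive("MyStruct") == "MyStruct"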
+ if normalized == "uchar": + normalized = "unsigned char" + elif normalized == "qword": + normalized = "unsigned __int64" + elif normalized == "sqword": + normalized = "__int64" + + return normalized + + def _process_dependency( + self, tagged_dependency: TaggedDependency, lookup: dict[str, TaggedDependency] + ) -> None: + if tagged_dependency.processed: + logger.debug(f"skipping {tagged_dependency.name} as already processed") + return + + logger.debug(f"processing {tagged_dependency.name}...") + dependency = tagged_dependency.dependency + match dependency: + case Structure(): + s: Structure = cast(Structure, dependency) + for member in s.members.values(): + subdependency = lookup.get(member.type) + if subdependency: + self._process_dependency(subdependency, lookup) + member.type = self._normalize_type(member.type) + + self.deci.structs[s.name] = libbs.artifacts.Struct( + name=s.name, size=s.size, members={v.offset: v for v in s.members.values()} + ) + case Enumeration(): + e: Enumeration = cast(Enumeration, dependency) + self.deci.enums[e.name] = libbs.artifacts.Enum(name=e.name, members=e.members) + case TypeDefinition(): + t: TypeDefinition = cast(TypeDefinition, dependency) + + subdependency = lookup.get(t.type) + if subdependency: + self._process_dependency(subdependency, lookup) + + normalized_type: str = self._normalize_type(t.type) + self.deci.typedefs[t.name] = libbs.artifacts.Typedef( + name=t.name, type_=normalized_type + ) + case GlobalVariable(): + g: GlobalVariable = cast(GlobalVariable, dependency) + subdependency = lookup.get(g.type) + if subdependency: + self._process_dependency(subdependency, lookup) + + normalized_type = self._normalize_type(g.type) + self.deci.global_vars[g.addr] = libbs.artifacts.GlobalVariable( + addr=g.addr, name=g.name, type_=normalized_type, size=g.size + ) + case _: + logger.warning(f"unrecognised type: {dependency}") + + logger.debug(f"finished processing {dependency.name}") + tagged_dependency.processed = True + + @execute_ui + def _modify_function(self, func: FunctionTypeOutput) -> None: + # IDA expects an RVA so we need to subtract the base address. + base_address: int = self.deci.binary_base_addr + rva: int = func.addr - base_address + + target_func: libbs.artifacts.Function | None = self.deci.functions.get(rva) + if target_func is None: + return + + target_func.name = func.name + target_func.size = func.size + target_func.type = func.type + + # Check if we extracted stack variable data and import if so. + if func.stack_vars: + stack_var: StackVariable + for stack_var in func.stack_vars.values(): + # TODO: PLU-192 What do we want to do if a stack variable does not exist at the specified offset? + target_stack_var: libbs.artifacts.StackVariable | None = target_func.stack_vars.get( + stack_var.offset + ) + + if target_stack_var is None: + continue + + target_stack_var.name = stack_var.name + target_stack_var.type = self._normalize_type(stack_var.type) + target_stack_var.size = stack_var.size + + # Check the target function has a header. + if target_func.header: + target_func.header.name = func.header.name + target_func.header.type = self._normalize_type(func.header.type) + + for arg in func.header.args.values(): + arg.type = self._normalize_type(arg.type) + + target_func.header.args = {v.offset: v for v in func.header.args.values()} + + self.deci.functions[rva] = target_func + + def _import_data_types(self, functions: FunctionDataTypesList) -> None: + # TODO: PLU-192 If we already have debug symbols, do we want to skip this? I think we should! 
+ # TODO: PLU-192 Factor this all out into it's own class as we are going to reuse it for both function matching and autounstrip + function: FunctionDataTypesListItem + + lookup: dict[str, TaggedDependency] = {} + for function in functions.items: + # Skip if data types are still being extracted. + if function.completed is False: + logger.warning( + f"extracting data types for {function.function_id} is still in progress..." + ) + continue + + data_types = function.data_types + if data_types is None: + continue + + # Build the lookup, this will allow us to check if we have processed a dependency before and stop us clobbering existing types and breaking xrefs. + for dependency in data_types.func_deps: + lookup[dependency.actual_instance.name] = TaggedDependency( + dependency.actual_instance + ) + + logger.debug("processing function dependencies") + for function in functions.items: + data_types: FunctionInfoOutput = function.data_types + + # No additional type information to import so we can skip. + if data_types is None: + continue + + # Enumerate function dependencies and check we have imported the associated type information. + dependency: FunctionInfoInputFuncDepsInner + for dependency in data_types.func_deps: + tagged_dependency = lookup.get(dependency.actual_instance.name) + self._process_dependency(tagged_dependency, lookup) + + logger.debug(f"processing {len(functions.items)} functions") + for i, function in enumerate(functions.items): + data_types: FunctionInfoOutput = function.data_types + + # No additional type information to import so we can skip. + if data_types: + # If we have info on the function signature, we can now apply it as we should have imported all dependencies. + func: FunctionTypeOutput | None = data_types.func_types + if func: + logger.debug(f"processing {func.to_dict()}") + self._modify_function(func) + logger.debug(f"processed {i+1} out of {len(functions.items)} functions") + else: + logger.debug(f"skipping function {i+1} as no func") + else: + logger.debug(f"skipping function {i+1} as no data types") + def _safe_match_functions( self, func_map: FunctionMapping ) -> GenericApiReturn[MatchedFunctionSummary]: - data = GenericApiReturn( - success=False, error_message="Failed to match functions." - ) + data = GenericApiReturn(success=False, error_message="Failed to match functions.") def _do(): try: @@ -175,15 +367,12 @@ def _do(): return data - def _sync_analysis_data( - self, stop_event: threading.Event, analysis_id: int - ) -> None: + def _sync_analysis_data(self, _: threading.Event, analysis_id: int) -> None: """ Syncs the analysis data until completion or failure. 
""" # Fetch Model ID - Used for function matching - response = self.api_request_returning( fn=lambda: self._fetch_model_id(analysis_id=analysis_id) ) @@ -203,5 +392,15 @@ def _sync_analysis_data( function_mapping: FunctionMapping = response.data response = self._safe_match_functions(func_map=function_mapping) + if not response.success: + self.call_callback(generic_return=response) + return + + result: FunctionDataTypesList | None = self._get_data_types(analysis_id) + if result and result.total_data_types_count: + logger.debug(f"applying type information for {analysis_id}...") + self._import_data_types(result) + else: + logger.debug(f"found no type information for {analysis_id}") self.call_callback(generic_return=response) From f3a6bf06b580afc334c5df10ba114cc1b2a1d158 Mon Sep 17 00:00:00 2001 From: belmegatron Date: Thu, 13 Nov 2025 20:07:12 +0000 Subject: [PATCH 02/11] feat(PLU-192): refactored and added transformations subpackage --- .../services/analysis_sync/analysis_sync.py | 181 +-------------- reai_toolkit/app/transformations/__init__.py | 0 .../app/transformations/import_data_types.py | 214 ++++++++++++++++++ 3 files changed, 217 insertions(+), 178 deletions(-) create mode 100644 reai_toolkit/app/transformations/__init__.py create mode 100644 reai_toolkit/app/transformations/import_data_types.py diff --git a/reai_toolkit/app/services/analysis_sync/analysis_sync.py b/reai_toolkit/app/services/analysis_sync/analysis_sync.py index ff07eeb..4dcb8e2 100644 --- a/reai_toolkit/app/services/analysis_sync/analysis_sync.py +++ b/reai_toolkit/app/services/analysis_sync/analysis_sync.py @@ -1,5 +1,5 @@ import threading -from typing import Any, Callable, cast +from typing import Any, Callable import ida_kernwin from libbs.api import DecompilerInterface @@ -18,15 +18,6 @@ FunctionDataTypesList, FunctionsDataTypesApi, BaseResponseFunctionDataTypesList, - FunctionDataTypesListItem, - FunctionInfoOutput, - FunctionInfoInputFuncDepsInner, - Structure, - TypeDefinition, - Enumeration, - GlobalVariable, - FunctionTypeOutput, - StackVariable, ) from reai_toolkit.app.core.netstore_service import SimpleNetStore @@ -51,7 +42,6 @@ class AnalysisSyncService(IThreadService): def __init__(self, netstore_service: SimpleNetStore, sdk_config: Configuration): super().__init__(netstore_service=netstore_service, sdk_config=sdk_config) - self.deci: DecompilerInterface = DecompilerInterface.discover(force_decompiler="ida") def call_callback(self, generic_return: GenericApiReturn) -> None: self._thread_callback(generic_return) @@ -182,172 +172,6 @@ def _get_data_types(self, analysis_id: int) -> FunctionDataTypesList | None: if response.status: return response.data - def _normalize_type(self, type: str) -> str: - # TODO: Do we need namespace information here? Observing discrepancies where sometimes namespace is used, and other times it isn't. - # I think we will have problems with C++ demangled symbol names using this approach. Needs investigating... - split_type: list[str] = type.split("::") - normalized: str = split_type[-1] - # normalized = type - - # It would appear this is a Ghidra-ism and IDA is unaware of this type. - # TODO: Auto add Ghidra typedef for primitives so we don't need to bother doing this... 
- if normalized == "uchar": - normalized = "unsigned char" - elif normalized == "qword": - normalized = "unsigned __int64" - elif normalized == "sqword": - normalized = "__int64" - - return normalized - - def _process_dependency( - self, tagged_dependency: TaggedDependency, lookup: dict[str, TaggedDependency] - ) -> None: - if tagged_dependency.processed: - logger.debug(f"skipping {tagged_dependency.name} as already processed") - return - - logger.debug(f"processing {tagged_dependency.name}...") - dependency = tagged_dependency.dependency - match dependency: - case Structure(): - s: Structure = cast(Structure, dependency) - for member in s.members.values(): - subdependency = lookup.get(member.type) - if subdependency: - self._process_dependency(subdependency, lookup) - member.type = self._normalize_type(member.type) - - self.deci.structs[s.name] = libbs.artifacts.Struct( - name=s.name, size=s.size, members={v.offset: v for v in s.members.values()} - ) - case Enumeration(): - e: Enumeration = cast(Enumeration, dependency) - self.deci.enums[e.name] = libbs.artifacts.Enum(name=e.name, members=e.members) - case TypeDefinition(): - t: TypeDefinition = cast(TypeDefinition, dependency) - - subdependency = lookup.get(t.type) - if subdependency: - self._process_dependency(subdependency, lookup) - - normalized_type: str = self._normalize_type(t.type) - self.deci.typedefs[t.name] = libbs.artifacts.Typedef( - name=t.name, type_=normalized_type - ) - case GlobalVariable(): - g: GlobalVariable = cast(GlobalVariable, dependency) - subdependency = lookup.get(g.type) - if subdependency: - self._process_dependency(subdependency, lookup) - - normalized_type = self._normalize_type(g.type) - self.deci.global_vars[g.addr] = libbs.artifacts.GlobalVariable( - addr=g.addr, name=g.name, type_=normalized_type, size=g.size - ) - case _: - logger.warning(f"unrecognised type: {dependency}") - - logger.debug(f"finished processing {dependency.name}") - tagged_dependency.processed = True - - @execute_ui - def _modify_function(self, func: FunctionTypeOutput) -> None: - # IDA expects an RVA so we need to subtract the base address. - base_address: int = self.deci.binary_base_addr - rva: int = func.addr - base_address - - target_func: libbs.artifacts.Function | None = self.deci.functions.get(rva) - if target_func is None: - return - - target_func.name = func.name - target_func.size = func.size - target_func.type = func.type - - # Check if we extracted stack variable data and import if so. - if func.stack_vars: - stack_var: StackVariable - for stack_var in func.stack_vars.values(): - # TODO: PLU-192 What do we want to do if a stack variable does not exist at the specified offset? - target_stack_var: libbs.artifacts.StackVariable | None = target_func.stack_vars.get( - stack_var.offset - ) - - if target_stack_var is None: - continue - - target_stack_var.name = stack_var.name - target_stack_var.type = self._normalize_type(stack_var.type) - target_stack_var.size = stack_var.size - - # Check the target function has a header. - if target_func.header: - target_func.header.name = func.header.name - target_func.header.type = self._normalize_type(func.header.type) - - for arg in func.header.args.values(): - arg.type = self._normalize_type(arg.type) - - target_func.header.args = {v.offset: v for v in func.header.args.values()} - - self.deci.functions[rva] = target_func - - def _import_data_types(self, functions: FunctionDataTypesList) -> None: - # TODO: PLU-192 If we already have debug symbols, do we want to skip this? I think we should! 
- # TODO: PLU-192 Factor this all out into it's own class as we are going to reuse it for both function matching and autounstrip - function: FunctionDataTypesListItem - - lookup: dict[str, TaggedDependency] = {} - for function in functions.items: - # Skip if data types are still being extracted. - if function.completed is False: - logger.warning( - f"extracting data types for {function.function_id} is still in progress..." - ) - continue - - data_types = function.data_types - if data_types is None: - continue - - # Build the lookup, this will allow us to check if we have processed a dependency before and stop us clobbering existing types and breaking xrefs. - for dependency in data_types.func_deps: - lookup[dependency.actual_instance.name] = TaggedDependency( - dependency.actual_instance - ) - - logger.debug("processing function dependencies") - for function in functions.items: - data_types: FunctionInfoOutput = function.data_types - - # No additional type information to import so we can skip. - if data_types is None: - continue - - # Enumerate function dependencies and check we have imported the associated type information. - dependency: FunctionInfoInputFuncDepsInner - for dependency in data_types.func_deps: - tagged_dependency = lookup.get(dependency.actual_instance.name) - self._process_dependency(tagged_dependency, lookup) - - logger.debug(f"processing {len(functions.items)} functions") - for i, function in enumerate(functions.items): - data_types: FunctionInfoOutput = function.data_types - - # No additional type information to import so we can skip. - if data_types: - # If we have info on the function signature, we can now apply it as we should have imported all dependencies. - func: FunctionTypeOutput | None = data_types.func_types - if func: - logger.debug(f"processing {func.to_dict()}") - self._modify_function(func) - logger.debug(f"processed {i+1} out of {len(functions.items)} functions") - else: - logger.debug(f"skipping function {i+1} as no func") - else: - logger.debug(f"skipping function {i+1} as no data types") - def _safe_match_functions( self, func_map: FunctionMapping ) -> GenericApiReturn[MatchedFunctionSummary]: @@ -399,7 +223,8 @@ def _sync_analysis_data(self, _: threading.Event, analysis_id: int) -> None: result: FunctionDataTypesList | None = self._get_data_types(analysis_id) if result and result.total_data_types_count: logger.debug(f"applying type information for {analysis_id}...") - self._import_data_types(result) + import_data_types: ImportDataTypes = ImportDataTypes() + import_data_types.execute(result) else: logger.debug(f"found no type information for {analysis_id}") diff --git a/reai_toolkit/app/transformations/__init__.py b/reai_toolkit/app/transformations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/reai_toolkit/app/transformations/import_data_types.py b/reai_toolkit/app/transformations/import_data_types.py new file mode 100644 index 0000000..64de2e2 --- /dev/null +++ b/reai_toolkit/app/transformations/import_data_types.py @@ -0,0 +1,214 @@ +from typing import cast + +import libbs.artifacts +from libbs.api import DecompilerInterface +from libbs.decompilers.ida.compat import execute_ui +from loguru import logger +from revengai import ( + Argument, + Enumeration, + FunctionDataTypesList, + FunctionDataTypesListItem, + FunctionHeader, + FunctionInfoInputFuncDepsInner, + FunctionInfoOutput, + FunctionTypeOutput, + GlobalVariable, + StackVariable, + Structure, + TypeDefinition, +) + + +class TaggedDependency: + def __init__(self, 
dependency: Structure | Enumeration | TypeDefinition | GlobalVariable): + self.dependency: Structure | Enumeration | TypeDefinition | GlobalVariable = dependency + self.processed: bool = False + self.name: str = self.dependency.name + + def __repr__(self): + return self.dependency.__repr__() + + +class ImportDataTypes: + def __init__(self): + self.deci: DecompilerInterface + + @execute_ui + def execute(self, functions: FunctionDataTypesList): + # TODO: PLU-192 do we even need a class here? Could arguably pass the decompiler around + self.deci = DecompilerInterface.discover(force_decompiler="ida") + + # TODO: PLU-192 If we already have debug symbols, do we want to skip this? I think we should! + function: FunctionDataTypesListItem + + lookup: dict[str, TaggedDependency] = {} + for function in functions.items: + # Skip if data types are still being extracted. + if function.completed is False: + logger.warning( + f"extracting data types for {function.function_id} is still in progress..." + ) + continue + + data_types = function.data_types + if data_types is None: + continue + + # Build the lookup, this will allow us to check if we have processed a dependency before and stop us clobbering existing types and breaking xrefs. + for dependency in data_types.func_deps: + lookup[dependency.actual_instance.name] = TaggedDependency( + dependency.actual_instance + ) + + logger.debug("processing function dependencies") + for function in functions.items: + data_types: FunctionInfoOutput = function.data_types + + # No additional type information to import so we can skip. + if data_types is None: + continue + + # Enumerate function dependencies and check we have imported the associated type information. + dependency: FunctionInfoInputFuncDepsInner + for dependency in data_types.func_deps: + tagged_dependency = lookup.get(dependency.actual_instance.name) + self.process_dependency(tagged_dependency, lookup) + + logger.debug(f"processing {len(functions.items)} functions") + for function in functions.items: + data_types: FunctionInfoOutput = function.data_types + + if data_types: + func: FunctionTypeOutput | None = data_types.func_types + if func: + self.update_function(func) + + def process_dependency( + self, tagged_dependency: TaggedDependency, lookup: dict[str, TaggedDependency] + ) -> None: + if tagged_dependency.processed: + return + + dependency = tagged_dependency.dependency + match dependency: + case Structure(): + self.update_struct(cast(Structure, dependency), lookup) + case Enumeration(): + self.update_enum(cast(Enumeration, dependency)) + case TypeDefinition(): + self.update_typedef(cast(TypeDefinition, dependency), lookup) + case GlobalVariable(): + self.update_global_var(cast(GlobalVariable, dependency), lookup) + case _: + logger.warning(f"unrecognised dependency type: {dependency}") + + tagged_dependency.processed = True + + def update_struct(self, imported_struct: Structure, lookup: dict[str, TaggedDependency]) -> None: + for member in imported_struct.members.values(): + subdependency = lookup.get(member.type) + if subdependency: + self.process_dependency(subdependency, lookup) + member.type = self.normalise_type(member.type) + + self.deci.structs[imported_struct.name] = libbs.artifacts.Struct( + name=imported_struct.name, size=imported_struct.size, members={v.offset: v for v in imported_struct.members.values()} + ) + + def update_enum(self, imported_enum: Enumeration) -> None: + self.deci.enums[imported_enum.name] = libbs.artifacts.Enum(name=imported_enum.name, members=imported_enum.members) 
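# (sketch) process_dependency() relies on a visited flag so each dependency is
# imported at most once, with the things it refers to handled first. A
# self-contained illustration of the pattern, using made-up type names rather
# than API data (this variant marks a node *before* descending, which also
# tolerates self-referential structures):
def resolve(name: str, deps: dict[str, list[str]], done: set[str], order: list[str]) -> None:
    if name in done:
        return
    done.add(name)  # marking before recursing keeps cyclic types from looping
    for sub in deps.get(name, []):
        resolve(sub, deps, done, order)
    order.append(name)  # emitted only after everything it refers to

emitted: list[str] = []
resolve("LIST_ENTRY", {"LIST_ENTRY": ["PLIST_ENTRY"], "PLIST_ENTRY": ["LIST_ENTRY"]}, set(), emitted)
assert emitted == ["PLIST_ENTRY", "LIST_ENTRY"]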
+ + def update_typedef(self, imported_typedef: TypeDefinition, lookup: dict[str, TaggedDependency]) -> None: + subdependency = lookup.get(imported_typedef.type) + if subdependency: + self.process_dependency(subdependency, lookup) + + normalized_type: str = self.normalise_type(imported_typedef.type) + self.deci.typedefs[imported_typedef.name] = libbs.artifacts.Typedef( + name=imported_typedef.name, type_=normalized_type + ) + + def update_global_var(self, imported_global_var: GlobalVariable, lookup: dict[str, TaggedDependency]) -> None: + subdependency = lookup.get(imported_global_var.type) + if subdependency: + self.process_dependency(subdependency, lookup) + + normalized_type = self.normalise_type(imported_global_var.type) + self.deci.global_vars[imported_global_var.addr] = libbs.artifacts.GlobalVariable( + addr=imported_global_var.addr, name=imported_global_var.name, type_=normalized_type, size=imported_global_var.size + ) + + def update_function(self, func: FunctionTypeOutput) -> None: + base_address: int = self.deci.binary_base_addr + rva: int = func.addr - base_address + + target_func: libbs.artifacts.Function | None = self.deci.functions.get(rva) + if target_func is None: + return + + target_func.name = func.name + target_func.size = func.size + target_func.type = func.type + + # Check if we extracted stack variable data and import if so. + if func.stack_vars: + self.update_stack_variables(func.stack_vars, target_func) + + # Check the target function has a header. + if target_func.header: + self.update_header(func.header, target_func) + + self.deci.functions[rva] = target_func + + def update_stack_variables( + self, + imported_stack_variables: dict[str, StackVariable], + target_function: libbs.artifacts.Function, + ) -> None: + # TODO: PLU-192 What do we want to do if a stack variable does not exist at the specified offset? + stack_var: StackVariable + for stack_var in imported_stack_variables.values(): + target_stack_var: libbs.artifacts.StackVariable | None = target_function.stack_vars.get( + stack_var.offset + ) + + if target_stack_var is None: + continue + + target_stack_var.name = stack_var.name + target_stack_var.type = self.normalise_type(stack_var.type) + target_stack_var.size = stack_var.size + + def update_header( + self, imported_header: FunctionHeader, target_function: libbs.artifacts.Function + ) -> None: + target_function.header.name = imported_header.name + target_function.header.type = self.normalise_type(imported_header.type) + self.update_function_arguments(imported_header.args, target_function) + + def update_function_arguments( + self, imported_args: dict[str, Argument], target_function: libbs.artifacts.Function + ) -> None: + for arg in imported_args.values(): + arg.type = self.normalise_type(arg.type) + + target_function.header.args = {v.offset: v for v in imported_args.values()} + + @staticmethod + def normalise_type(type: str) -> str: + # TODO: Do we need namespace information here? Observing discrepancies where sometimes namespace is used, and other times it isn't. + # I think we will have problems with C++ demangled symbol names using this approach. Needs investigating... + split_type: list[str] = type.split("::") + normalized: str = split_type[-1] + + # It would appear this is a Ghidra-ism and IDA is unaware of this type. + # TODO: Auto add Ghidra typedef for primitives so we don't need to bother doing this... 
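# (sketch) The TODO above might look roughly like this: register the Ghidra
# primitive names as typedefs once, up front, so these spellings no longer need
# normalising. Untested, but built only from the libbs.artifacts.Typedef /
# deci.typedefs API already used elsewhere in this file:
def register_ghidra_primitives(deci: DecompilerInterface) -> None:
    for ghidra_name, ida_type in (
        ("uchar", "unsigned char"),
        ("qword", "unsigned __int64"),
        ("sqword", "__int64"),
    ):
        deci.typedefs[ghidra_name] = libbs.artifacts.Typedef(name=ghidra_name, type_=ida_type)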
+ if normalized == "uchar": + normalized = "unsigned char" + elif normalized == "qword": + normalized = "unsigned __int64" + elif normalized == "sqword": + normalized = "__int64" + + return normalized From e3bd4ef5a0c748bf8bf46a4944c2c1ba1209f22e Mon Sep 17 00:00:00 2001 From: belmegatron Date: Thu, 13 Nov 2025 23:31:15 +0000 Subject: [PATCH 03/11] feat(PLU-192): importing data types in single iteration --- .../app/transformations/import_data_types.py | 38 +++++-------------- 1 file changed, 9 insertions(+), 29 deletions(-) diff --git a/reai_toolkit/app/transformations/import_data_types.py b/reai_toolkit/app/transformations/import_data_types.py index 64de2e2..c631ae8 100644 --- a/reai_toolkit/app/transformations/import_data_types.py +++ b/reai_toolkit/app/transformations/import_data_types.py @@ -40,49 +40,29 @@ def execute(self, functions: FunctionDataTypesList): self.deci = DecompilerInterface.discover(force_decompiler="ida") # TODO: PLU-192 If we already have debug symbols, do we want to skip this? I think we should! - function: FunctionDataTypesListItem - lookup: dict[str, TaggedDependency] = {} - for function in functions.items: - # Skip if data types are still being extracted. - if function.completed is False: - logger.warning( - f"extracting data types for {function.function_id} is still in progress..." - ) - continue - - data_types = function.data_types - if data_types is None: - continue - - # Build the lookup, this will allow us to check if we have processed a dependency before and stop us clobbering existing types and breaking xrefs. - for dependency in data_types.func_deps: - lookup[dependency.actual_instance.name] = TaggedDependency( - dependency.actual_instance - ) - logger.debug("processing function dependencies") for function in functions.items: data_types: FunctionInfoOutput = function.data_types - # No additional type information to import so we can skip. if data_types is None: continue + + # Track processed dependencies to prevent duplicate imports. + # Without this: + # - Shared dependencies get re-processed, breaking references (shows as invalid ordinals in IDA) + # - Cannot resolve subdependencies (e.g. struct fields that reference other imported types) + lookup |= {dep.actual_instance.name: TaggedDependency(dep.actual_instance) for dep in data_types.func_deps if dep.actual_instance.name not in lookup} - # Enumerate function dependencies and check we have imported the associated type information. 
dependency: FunctionInfoInputFuncDepsInner for dependency in data_types.func_deps: tagged_dependency = lookup.get(dependency.actual_instance.name) self.process_dependency(tagged_dependency, lookup) - logger.debug(f"processing {len(functions.items)} functions") - for function in functions.items: - data_types: FunctionInfoOutput = function.data_types + func: FunctionTypeOutput | None = data_types.func_types + if func: + self.update_function(func) - if data_types: - func: FunctionTypeOutput | None = data_types.func_types - if func: - self.update_function(func) def process_dependency( self, tagged_dependency: TaggedDependency, lookup: dict[str, TaggedDependency] From cb164abe3423fb3df63489ac592af9fcae57d903 Mon Sep 17 00:00:00 2001 From: belmegatron Date: Fri, 14 Nov 2025 17:39:24 +0000 Subject: [PATCH 04/11] feat(PLU-192): added import data types service, removed adding stack vars --- reai_toolkit/app/app.py | 4 ++ .../app/components/dialogs/matching_dialog.py | 18 ++++++++- reai_toolkit/app/factory.py | 1 + .../app/services/data_types/__init__.py | 0 .../services/data_types/data_types_service.py | 40 +++++++++++++++++++ .../app/services/data_types/schema.py | 7 ++++ .../app/transformations/import_data_types.py | 37 +++-------------- 7 files changed, 73 insertions(+), 34 deletions(-) create mode 100644 reai_toolkit/app/services/data_types/__init__.py create mode 100644 reai_toolkit/app/services/data_types/data_types_service.py create mode 100644 reai_toolkit/app/services/data_types/schema.py diff --git a/reai_toolkit/app/app.py b/reai_toolkit/app/app.py index 1287437..1d540c8 100644 --- a/reai_toolkit/app/app.py +++ b/reai_toolkit/app/app.py @@ -14,6 +14,7 @@ from reai_toolkit.app.services.matching.matching_service import MatchingService from reai_toolkit.app.services.rename.rename_service import RenameService from reai_toolkit.app.services.upload.upload_service import UploadService +from reai_toolkit.app.services.data_types.data_types_service import ImportDataTypesService class App: @@ -58,3 +59,6 @@ def __init__(self, ida_version: str = "UNKNOWN", plugin_version: str = "UNKNOWN" self.matching_service = MatchingService( netstore_service=self.netstore_service, sdk_config=sdk_config ) + self.data_types_service = ImportDataTypesService( + netstore_service=self.netstore_service, sdk_config=sdk_config + ) diff --git a/reai_toolkit/app/components/dialogs/matching_dialog.py b/reai_toolkit/app/components/dialogs/matching_dialog.py index f7a9f06..26a97ab 100644 --- a/reai_toolkit/app/components/dialogs/matching_dialog.py +++ b/reai_toolkit/app/components/dialogs/matching_dialog.py @@ -26,6 +26,7 @@ SummaryEvent, ValidFunction, ) +from reai_toolkit.app.services.data_types.data_types_service import ImportDataTypesService from reai_toolkit.app.services.rename.rename_service import RenameService from reai_toolkit.app.services.rename.schema import RenameInput @@ -59,9 +60,10 @@ class MatchingWorker(QtCore.QObject): finished = Signal() # always emitted on exit errored = Signal(str) - def __init__(self, match_service, gen_kwargs: dict): + def __init__(self, match_service, data_types_service, gen_kwargs: dict): super().__init__() self._match_service = match_service + self._data_types_service = data_types_service self._gen_kwargs = gen_kwargs self._stop = False @@ -100,6 +102,7 @@ def __init__( func_map: dict[str, int], matching_service: MatchingService, rename_service: RenameService, + data_types_service: ImportDataTypesService, parent: QtWidgets.QWidget | None = None, ): super().__init__(parent=parent) @@ 
-108,8 +111,12 @@ def __init__( self.matching_service = matching_service self.rename_service = rename_service + self.data_types_service = data_types_service self._func_map = func_map + # Used for looking up which function we matched to for a given function id. + self.current_to_matched_func: dict[int, MatchedFunction] = {} + self.ui = Ui_MatchingPanel() self.setWindowTitle("RevEng.AI — Function matching") self.ui.setupUi(self) @@ -214,6 +221,7 @@ def __init__( if hasattr(self.ui, "okRenameButton"): self.ui.okRenameButton.clicked.connect(self.enqueue_renames) + self.ui.okRenameButton.clicked.connect(self.import_data_types) # ----------------- Util buttons ----------------- self.ui.btnClearSelection.clicked.connect( @@ -699,7 +707,7 @@ def start_ann(self): self.stop_ann() # ensure previous worker is cleaned up self._matching_thread = QtCore.QThread(self) - self._matching_worker = MatchingWorker(self.matching_service, gen_kwargs) + self._matching_worker = MatchingWorker(self.matching_service, self.data_types_service, gen_kwargs) self._matching_worker.moveToThread(self._matching_thread) # connections @@ -1031,6 +1039,7 @@ def display_matching_results_multiple_functions(self, query: str = ""): filtered_funcs: List[FunctionMatch] = [] + for r in self.matching_results.results: if query: matched_function = r.matched_functions[0] if r.matched_functions else None @@ -1183,6 +1192,11 @@ def enqueue_renames(self): except Exception as e: print(f"Failed to enqueue renames: {e}") + + def import_data_types(self): + print("importing data types...") + # TODO: PLU-192 Pass correct arguments here + # self.data_types_service.import_data_types() # ===================================================================== # (Optional) page-switch helpers diff --git a/reai_toolkit/app/factory.py b/reai_toolkit/app/factory.py index 8b5fbd7..3d06317 100644 --- a/reai_toolkit/app/factory.py +++ b/reai_toolkit/app/factory.py @@ -70,6 +70,7 @@ def function_matching( func_map=func_map, matching_service=self.app.matching_service, rename_service=self.app.rename_service, + data_types_service=self.app.data_types_service, parent=self.parent, ) diff --git a/reai_toolkit/app/services/data_types/__init__.py b/reai_toolkit/app/services/data_types/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/reai_toolkit/app/services/data_types/data_types_service.py b/reai_toolkit/app/services/data_types/data_types_service.py new file mode 100644 index 0000000..8579333 --- /dev/null +++ b/reai_toolkit/app/services/data_types/data_types_service.py @@ -0,0 +1,40 @@ + +from revengai import ( + Configuration, +) + +from reai_toolkit.app.core.netstore_service import SimpleNetStore +from revengai import ( + FunctionsDataTypesApi, + FunctionDataTypesList, + BaseResponseFunctionDataTypesList +) + +from reai_toolkit.app.interfaces.thread_service import IThreadService +from reai_toolkit.app.transformations.import_data_types import ImportDataTypes + + +class ImportDataTypesService(IThreadService): + def __init__(self, netstore_service: SimpleNetStore, sdk_config: Configuration): + super().__init__(netstore_service=netstore_service, sdk_config=sdk_config) + + def import_data_types(self, analysis_id: int, function_ids: list[int] | None = None): + # TODO: PLU-192 Create lookup of effective_address: matched_function + # TODO: PLU-192 Batch function_ids from the same analysis_id together before making the API call. + # TODO: PLU-192 Make the API call and import the types using the matched function data. 
+ # TODO: PLU-192 Attempt to apply these but using the original effective address. This means we need a way of going back from matched function to src ea. + + response = self._get_data_types(analysis_id, function_ids) + if response: + idt : ImportDataTypes = ImportDataTypes() + idt.execute(response) + + def _get_data_types(self, analysis_id: int, function_ids: list[int] | None = None) -> FunctionDataTypesList | None: + with self.yield_api_client(sdk_config=self.sdk_config) as api_client: + client = FunctionsDataTypesApi(api_client=api_client) + response: BaseResponseFunctionDataTypesList = ( + client.list_function_data_types_for_analysis(analysis_id, function_ids=function_ids) + ) + if response.status: + return response.data + diff --git a/reai_toolkit/app/services/data_types/schema.py b/reai_toolkit/app/services/data_types/schema.py new file mode 100644 index 0000000..84dff61 --- /dev/null +++ b/reai_toolkit/app/services/data_types/schema.py @@ -0,0 +1,7 @@ +from pydantic import BaseModel + + +class DataTypeMatch(BaseModel): + function_id: int + ea: int + diff --git a/reai_toolkit/app/transformations/import_data_types.py b/reai_toolkit/app/transformations/import_data_types.py index c631ae8..57e96c4 100644 --- a/reai_toolkit/app/transformations/import_data_types.py +++ b/reai_toolkit/app/transformations/import_data_types.py @@ -8,13 +8,11 @@ Argument, Enumeration, FunctionDataTypesList, - FunctionDataTypesListItem, FunctionHeader, FunctionInfoInputFuncDepsInner, FunctionInfoOutput, FunctionTypeOutput, GlobalVariable, - StackVariable, Structure, TypeDefinition, ) @@ -36,10 +34,7 @@ def __init__(self): @execute_ui def execute(self, functions: FunctionDataTypesList): - # TODO: PLU-192 do we even need a class here? Could arguably pass the decompiler around self.deci = DecompilerInterface.discover(force_decompiler="ida") - - # TODO: PLU-192 If we already have debug symbols, do we want to skip this? I think we should! lookup: dict[str, TaggedDependency] = {} for function in functions.items: @@ -109,6 +104,8 @@ def update_typedef(self, imported_typedef: TypeDefinition, lookup: dict[str, Tag name=imported_typedef.name, type_=normalized_type ) + # TODO: PLU-192 Do we want to think about how these are used? What happens in the case where we match a function from a library that's been + # statically linked into a completely different binary? def update_global_var(self, imported_global_var: GlobalVariable, lookup: dict[str, TaggedDependency]) -> None: subdependency = lookup.get(imported_global_var.type) if subdependency: @@ -131,35 +128,12 @@ def update_function(self, func: FunctionTypeOutput) -> None: target_func.size = func.size target_func.type = func.type - # Check if we extracted stack variable data and import if so. - if func.stack_vars: - self.update_stack_variables(func.stack_vars, target_func) - # Check the target function has a header. if target_func.header: self.update_header(func.header, target_func) self.deci.functions[rva] = target_func - def update_stack_variables( - self, - imported_stack_variables: dict[str, StackVariable], - target_function: libbs.artifacts.Function, - ) -> None: - # TODO: PLU-192 What do we want to do if a stack variable does not exist at the specified offset? 
- stack_var: StackVariable - for stack_var in imported_stack_variables.values(): - target_stack_var: libbs.artifacts.StackVariable | None = target_function.stack_vars.get( - stack_var.offset - ) - - if target_stack_var is None: - continue - - target_stack_var.name = stack_var.name - target_stack_var.type = self.normalise_type(stack_var.type) - target_stack_var.size = stack_var.size - def update_header( self, imported_header: FunctionHeader, target_function: libbs.artifacts.Function ) -> None: @@ -177,13 +151,12 @@ def update_function_arguments( @staticmethod def normalise_type(type: str) -> str: - # TODO: Do we need namespace information here? Observing discrepancies where sometimes namespace is used, and other times it isn't. - # I think we will have problems with C++ demangled symbol names using this approach. Needs investigating... + # TODO: PLU-192 There are inconsistencies with how types are presented, sometimes with a namespace and sometimes without. + # Need to investigate further as we need to retain namespace information otherwise potential for symbols clashing. split_type: list[str] = type.split("::") normalized: str = split_type[-1] - # It would appear this is a Ghidra-ism and IDA is unaware of this type. - # TODO: Auto add Ghidra typedef for primitives so we don't need to bother doing this... + # TODO: Add IDA typedefs for Ghidra primitives so we don't need to bother doing this... if normalized == "uchar": normalized = "unsigned char" elif normalized == "qword": From 90be709c13691a445a68b22c06e2ce1d66f42426 Mon Sep 17 00:00:00 2001 From: belmegatron Date: Mon, 17 Nov 2025 19:29:49 +0000 Subject: [PATCH 05/11] feat(PLU-192): attempting to retrieve data types for matched functions --- .../app/components/dialogs/matching_dialog.py | 3 +- .../services/data_types/data_types_service.py | 41 ++++++++++++++----- 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/reai_toolkit/app/components/dialogs/matching_dialog.py b/reai_toolkit/app/components/dialogs/matching_dialog.py index 26a97ab..687f452 100644 --- a/reai_toolkit/app/components/dialogs/matching_dialog.py +++ b/reai_toolkit/app/components/dialogs/matching_dialog.py @@ -1195,8 +1195,7 @@ def enqueue_renames(self): def import_data_types(self): print("importing data types...") - # TODO: PLU-192 Pass correct arguments here - # self.data_types_service.import_data_types() + self.data_types_service.import_data_types(self.current_to_matched_func) # ===================================================================== # (Optional) page-switch helpers diff --git a/reai_toolkit/app/services/data_types/data_types_service.py b/reai_toolkit/app/services/data_types/data_types_service.py index 8579333..57aa108 100644 --- a/reai_toolkit/app/services/data_types/data_types_service.py +++ b/reai_toolkit/app/services/data_types/data_types_service.py @@ -1,8 +1,10 @@ +import debugpy from revengai import ( Configuration, ) +from revengai.models import MatchedFunction from reai_toolkit.app.core.netstore_service import SimpleNetStore from revengai import ( FunctionsDataTypesApi, @@ -13,21 +15,40 @@ from reai_toolkit.app.interfaces.thread_service import IThreadService from reai_toolkit.app.transformations.import_data_types import ImportDataTypes +def wait_for_debugger(): + # Start debug server + debugpy.listen(60000, in_process_debug_adapter=True) + print("Waiting for debugger attach...") + debugpy.wait_for_client() # Pause until debugger connects + print("Debugger attached!") + class ImportDataTypesService(IThreadService): def __init__(self, 
netstore_service: SimpleNetStore, sdk_config: Configuration): super().__init__(netstore_service=netstore_service, sdk_config=sdk_config) - def import_data_types(self, analysis_id: int, function_ids: list[int] | None = None): - # TODO: PLU-192 Create lookup of effective_address: matched_function - # TODO: PLU-192 Batch function_ids from the same analysis_id together before making the API call. - # TODO: PLU-192 Make the API call and import the types using the matched function data. - # TODO: PLU-192 Attempt to apply these but using the original effective address. This means we need a way of going back from matched function to src ea. - - response = self._get_data_types(analysis_id, function_ids) - if response: - idt : ImportDataTypes = ImportDataTypes() - idt.execute(response) + def import_data_types(self, matches: dict[int, MatchedFunction]): + wait_for_debugger() + idt : ImportDataTypes = ImportDataTypes() + + # Overwrite the matched effective address with the original effective address. + # We need this in order to ensure that we write the obtained function datatypes to the right location. + for original_ea, matched_func in matches.items(): + matched_func.function_vaddr = original_ea + + # Batch up function id's by analysis id in order to minimize the number of calls to the data types endpoint. + funcs_by_analysis_id: dict[int, list[int]] = {} + for matched_func in matches.values(): + if funcs_by_analysis_id.get(matched_func.analysis_id) is None: + funcs_by_analysis_id[matched_func.analysis_id] = [matched_func.function_id] + else: + funcs_by_analysis_id[matched_func.analysis_id].append(matched_func.function_id) + + # Attempt to retrieve the data types from the API and apply them to our analysis. + for analysis_id, function_ids in funcs_by_analysis_id.items(): + response = self._get_data_types(analysis_id, function_ids) + if response: + idt.execute(response) def _get_data_types(self, analysis_id: int, function_ids: list[int] | None = None) -> FunctionDataTypesList | None: with self.yield_api_client(sdk_config=self.sdk_config) as api_client: From 9a3bc9dea223d89e842c1dc996e63e4a0dcad587 Mon Sep 17 00:00:00 2001 From: belmegatron Date: Fri, 5 Dec 2025 13:17:59 +0000 Subject: [PATCH 06/11] feat(PLU-192): handle exceptions thrown by analyses no longer existing when trying to pull matched func data types --- pyproject.toml | 3 +- .../app/components/dialogs/matching_dialog.py | 2 + .../services/analysis_sync/analysis_sync.py | 14 ++++--- .../services/data_types/data_types_service.py | 25 ++++++------ .../app/transformations/import_data_types.py | 38 +++++++++++++++---- uv.lock | 37 ++++++++++++++++-- 6 files changed, 90 insertions(+), 29 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 0ca9541..cea415c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,8 @@ dev-dependencies = [ "pyqt5", "pyqt6-stubs", "pyqt6", - "PySide6" + "PySide6", + "debugpy>=1.8.17", ] # If you *do* want to install a stub package for IDA later, you can use a sources entry: diff --git a/reai_toolkit/app/components/dialogs/matching_dialog.py b/reai_toolkit/app/components/dialogs/matching_dialog.py index 687f452..4095386 100644 --- a/reai_toolkit/app/components/dialogs/matching_dialog.py +++ b/reai_toolkit/app/components/dialogs/matching_dialog.py @@ -1092,6 +1092,8 @@ def display_matching_results_multiple_functions(self, query: str = ""): r.matched_functions[0] if r.matched_functions else None ) + self.current_to_matched_func[r.function_id] = matched_function + # Column 3: Matched Name 
table.setItem( row, diff --git a/reai_toolkit/app/services/analysis_sync/analysis_sync.py b/reai_toolkit/app/services/analysis_sync/analysis_sync.py index 4dcb8e2..ea78c53 100644 --- a/reai_toolkit/app/services/analysis_sync/analysis_sync.py +++ b/reai_toolkit/app/services/analysis_sync/analysis_sync.py @@ -2,9 +2,7 @@ from typing import Any, Callable import ida_kernwin -from libbs.api import DecompilerInterface -import libbs.artifacts -from libbs.decompilers.ida.compat import execute_ui, execute_write, execute_read +from libbs.decompilers.ida.compat import execute_write, execute_read import idautils import idaapi @@ -18,13 +16,17 @@ FunctionDataTypesList, FunctionsDataTypesApi, BaseResponseFunctionDataTypesList, + Structure, + Enumeration, + TypeDefinition, + GlobalVariable ) from reai_toolkit.app.core.netstore_service import SimpleNetStore from reai_toolkit.app.core.shared_schema import GenericApiReturn from reai_toolkit.app.interfaces.thread_service import IThreadService from reai_toolkit.app.services.analysis_sync.schema import MatchedFunctionSummary -from revengai import BaseResponseBasic +from reai_toolkit.app.transformations.import_data_types import ImportDataTypes class TaggedDependency: @@ -105,7 +107,7 @@ def _fetch_function_map(self, analysis_id: int) -> FunctionMapping: analyses_client = AnalysesCoreApi(api_client) function_map = analyses_client.get_analysis_function_map(analysis_id=analysis_id) - func_map = function_map.data.function_maps + func_map: FunctionMapping = function_map.data.function_maps self.safe_put_function_mapping(func_map=func_map) return func_map @@ -213,7 +215,7 @@ def _sync_analysis_data(self, _: threading.Event, analysis_id: int) -> None: self.call_callback(generic_return=response) return - function_mapping: FunctionMapping = response.data + function_mapping: FunctionMapping | None = response.data response = self._safe_match_functions(func_map=function_mapping) if not response.success: diff --git a/reai_toolkit/app/services/data_types/data_types_service.py b/reai_toolkit/app/services/data_types/data_types_service.py index 57aa108..5e245e9 100644 --- a/reai_toolkit/app/services/data_types/data_types_service.py +++ b/reai_toolkit/app/services/data_types/data_types_service.py @@ -1,10 +1,11 @@ import debugpy -from revengai import ( - Configuration, -) +from revengai import Configuration +from revengai.exceptions import NotFoundException -from revengai.models import MatchedFunction +from loguru import logger + +from revengai.models.matched_function import MatchedFunction from reai_toolkit.app.core.netstore_service import SimpleNetStore from revengai import ( FunctionsDataTypesApi, @@ -18,9 +19,7 @@ def wait_for_debugger(): # Start debug server debugpy.listen(60000, in_process_debug_adapter=True) - print("Waiting for debugger attach...") debugpy.wait_for_client() # Pause until debugger connects - print("Debugger attached!") class ImportDataTypesService(IThreadService): @@ -28,7 +27,7 @@ def __init__(self, netstore_service: SimpleNetStore, sdk_config: Configuration): super().__init__(netstore_service=netstore_service, sdk_config=sdk_config) def import_data_types(self, matches: dict[int, MatchedFunction]): - wait_for_debugger() + # wait_for_debugger() idt : ImportDataTypes = ImportDataTypes() # Overwrite the matched effective address with the original effective address. @@ -46,15 +45,19 @@ def import_data_types(self, matches: dict[int, MatchedFunction]): # Attempt to retrieve the data types from the API and apply them to our analysis. 
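# (sketch) The funcs_by_analysis_id loop introduced in patch 05 is the standard
# group-by idiom; collections.defaultdict collapses the get()/else dance. A
# self-contained illustration with toy (analysis_id, function_id) pairs
# standing in for MatchedFunction objects:
from collections import defaultdict

pairs = [(101, 7), (101, 9), (202, 3)]
by_analysis: dict[int, list[int]] = defaultdict(list)
for analysis_id, function_id in pairs:
    by_analysis[analysis_id].append(function_id)

assert dict(by_analysis) == {101: [7, 9], 202: [3]}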
for analysis_id, function_ids in funcs_by_analysis_id.items(): - response = self._get_data_types(analysis_id, function_ids) - if response: - idt.execute(response) + try: + response: FunctionDataTypesList | None = self._get_data_types(analysis_id, function_ids) + except NotFoundException as e: + logger.warning(f"failed to apply data types for {function_ids} due to: {e}") + else: + if response: + idt.execute(response) def _get_data_types(self, analysis_id: int, function_ids: list[int] | None = None) -> FunctionDataTypesList | None: with self.yield_api_client(sdk_config=self.sdk_config) as api_client: client = FunctionsDataTypesApi(api_client=api_client) response: BaseResponseFunctionDataTypesList = ( - client.list_function_data_types_for_analysis(analysis_id, function_ids=function_ids) + client.list_function_data_types_for_analysis(analysis_id, function_ids=function_ids) # type: ignore ) if response.status: return response.data diff --git a/reai_toolkit/app/transformations/import_data_types.py b/reai_toolkit/app/transformations/import_data_types.py index 57e96c4..0bb61b7 100644 --- a/reai_toolkit/app/transformations/import_data_types.py +++ b/reai_toolkit/app/transformations/import_data_types.py @@ -19,7 +19,7 @@ class TaggedDependency: - def __init__(self, dependency: Structure | Enumeration | TypeDefinition | GlobalVariable): + def __init__(self, dependency: Structure | Enumeration | TypeDefinition | GlobalVariable) -> None: self.dependency: Structure | Enumeration | TypeDefinition | GlobalVariable = dependency self.processed: bool = False self.name: str = self.dependency.name @@ -34,25 +34,36 @@ def __init__(self): @execute_ui def execute(self, functions: FunctionDataTypesList): - self.deci = DecompilerInterface.discover(force_decompiler="ida") + self.deci = DecompilerInterface.discover(force_decompiler="ida") # type: ignore lookup: dict[str, TaggedDependency] = {} for function in functions.items: - data_types: FunctionInfoOutput = function.data_types + logger.debug(f"importing data types for {function.function_id}") + data_types: FunctionInfoOutput | None = function.data_types if data_types is None: + logger.warning(f"skipping {function.function_id} as there are no available data types") continue # Track processed dependencies to prevent duplicate imports. # Without this: # - Shared dependencies get re-processed, breaking references (shows as invalid ordinals in IDA) # - Cannot resolve subdependencies (e.g. 
struct fields that reference other imported types) - lookup |= {dep.actual_instance.name: TaggedDependency(dep.actual_instance) for dep in data_types.func_deps if dep.actual_instance.name not in lookup} + for dep in data_types.func_deps: + if dep.actual_instance is None: + continue + + if dep.actual_instance.name not in lookup: + lookup.update({dep.actual_instance.name: TaggedDependency(dep.actual_instance)}) dependency: FunctionInfoInputFuncDepsInner for dependency in data_types.func_deps: - tagged_dependency = lookup.get(dependency.actual_instance.name) - self.process_dependency(tagged_dependency, lookup) + if dependency.actual_instance is None: + continue + + tagged_dependency: TaggedDependency | None = lookup.get(dependency.actual_instance.name) + if tagged_dependency: + self.process_dependency(tagged_dependency, lookup) func: FunctionTypeOutput | None = data_types.func_types if func: @@ -64,6 +75,8 @@ def process_dependency( ) -> None: if tagged_dependency.processed: return + + logger.debug(f"processing dependency: {tagged_dependency.name}") dependency = tagged_dependency.dependency match dependency: @@ -81,6 +94,9 @@ def process_dependency( tagged_dependency.processed = True def update_struct(self, imported_struct: Structure, lookup: dict[str, TaggedDependency]) -> None: + if imported_struct.size is None: + return + for member in imported_struct.members.values(): subdependency = lookup.get(member.type) if subdependency: @@ -88,7 +104,7 @@ def update_struct(self, imported_struct: Structure, lookup: dict[str, TaggedDepe member.type = self.normalise_type(member.type) self.deci.structs[imported_struct.name] = libbs.artifacts.Struct( - name=imported_struct.name, size=imported_struct.size, members={v.offset: v for v in imported_struct.members.values()} + name=imported_struct.name, size=imported_struct.size, members={v.offset: v for v in imported_struct.members.values()} # type: ignore ) def update_enum(self, imported_enum: Enumeration) -> None: @@ -120,7 +136,7 @@ def update_function(self, func: FunctionTypeOutput) -> None: base_address: int = self.deci.binary_base_addr rva: int = func.addr - base_address - target_func: libbs.artifacts.Function | None = self.deci.functions.get(rva) + target_func: libbs.artifacts.Function | None = self.deci.functions.get(rva) # type: ignore if target_func is None: return @@ -137,6 +153,9 @@ def update_function(self, func: FunctionTypeOutput) -> None: def update_header( self, imported_header: FunctionHeader, target_function: libbs.artifacts.Function ) -> None: + if target_function.header is None: + return + target_function.header.name = imported_header.name target_function.header.type = self.normalise_type(imported_header.type) self.update_function_arguments(imported_header.args, target_function) @@ -144,6 +163,9 @@ def update_header( def update_function_arguments( self, imported_args: dict[str, Argument], target_function: libbs.artifacts.Function ) -> None: + if target_function.header is None: + return + for arg in imported_args.values(): arg.type = self.normalise_type(arg.type) diff --git a/uv.lock b/uv.lock index 22f97d2..6bc7606 100644 --- a/uv.lock +++ b/uv.lock @@ -170,6 +170,35 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" }, ] +[[package]] +name = "debugpy" +version = "1.8.17" +source = { 
registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/15/ad/71e708ff4ca377c4230530d6a7aa7992592648c122a2cd2b321cf8b35a76/debugpy-1.8.17.tar.gz", hash = "sha256:fd723b47a8c08892b1a16b2c6239a8b96637c62a59b94bb5dab4bac592a58a8e", size = 1644129, upload-time = "2025-09-17T16:33:20.633Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/38/36/b57c6e818d909f6e59c0182252921cf435e0951126a97e11de37e72ab5e1/debugpy-1.8.17-cp310-cp310-macosx_15_0_x86_64.whl", hash = "sha256:c41d2ce8bbaddcc0009cc73f65318eedfa3dbc88a8298081deb05389f1ab5542", size = 2098021, upload-time = "2025-09-17T16:33:22.556Z" }, + { url = "https://files.pythonhosted.org/packages/be/01/0363c7efdd1e9febd090bb13cee4fb1057215b157b2979a4ca5ccb678217/debugpy-1.8.17-cp310-cp310-manylinux_2_34_x86_64.whl", hash = "sha256:1440fd514e1b815edd5861ca394786f90eb24960eb26d6f7200994333b1d79e3", size = 3087399, upload-time = "2025-09-17T16:33:24.292Z" }, + { url = "https://files.pythonhosted.org/packages/79/bc/4a984729674aa9a84856650438b9665f9a1d5a748804ac6f37932ce0d4aa/debugpy-1.8.17-cp310-cp310-win32.whl", hash = "sha256:3a32c0af575749083d7492dc79f6ab69f21b2d2ad4cd977a958a07d5865316e4", size = 5230292, upload-time = "2025-09-17T16:33:26.137Z" }, + { url = "https://files.pythonhosted.org/packages/5d/19/2b9b3092d0cf81a5aa10c86271999453030af354d1a5a7d6e34c574515d7/debugpy-1.8.17-cp310-cp310-win_amd64.whl", hash = "sha256:a3aad0537cf4d9c1996434be68c6c9a6d233ac6f76c2a482c7803295b4e4f99a", size = 5261885, upload-time = "2025-09-17T16:33:27.592Z" }, + { url = "https://files.pythonhosted.org/packages/d8/53/3af72b5c159278c4a0cf4cffa518675a0e73bdb7d1cac0239b815502d2ce/debugpy-1.8.17-cp311-cp311-macosx_15_0_universal2.whl", hash = "sha256:d3fce3f0e3de262a3b67e69916d001f3e767661c6e1ee42553009d445d1cd840", size = 2207154, upload-time = "2025-09-17T16:33:29.457Z" }, + { url = "https://files.pythonhosted.org/packages/8f/6d/204f407df45600e2245b4a39860ed4ba32552330a0b3f5f160ae4cc30072/debugpy-1.8.17-cp311-cp311-manylinux_2_34_x86_64.whl", hash = "sha256:c6bdf134457ae0cac6fb68205776be635d31174eeac9541e1d0c062165c6461f", size = 3170322, upload-time = "2025-09-17T16:33:30.837Z" }, + { url = "https://files.pythonhosted.org/packages/f2/13/1b8f87d39cf83c6b713de2620c31205299e6065622e7dd37aff4808dd410/debugpy-1.8.17-cp311-cp311-win32.whl", hash = "sha256:e79a195f9e059edfe5d8bf6f3749b2599452d3e9380484cd261f6b7cd2c7c4da", size = 5155078, upload-time = "2025-09-17T16:33:33.331Z" }, + { url = "https://files.pythonhosted.org/packages/c2/c5/c012c60a2922cc91caa9675d0ddfbb14ba59e1e36228355f41cab6483469/debugpy-1.8.17-cp311-cp311-win_amd64.whl", hash = "sha256:b532282ad4eca958b1b2d7dbcb2b7218e02cb934165859b918e3b6ba7772d3f4", size = 5179011, upload-time = "2025-09-17T16:33:35.711Z" }, + { url = "https://files.pythonhosted.org/packages/08/2b/9d8e65beb2751876c82e1aceb32f328c43ec872711fa80257c7674f45650/debugpy-1.8.17-cp312-cp312-macosx_15_0_universal2.whl", hash = "sha256:f14467edef672195c6f6b8e27ce5005313cb5d03c9239059bc7182b60c176e2d", size = 2549522, upload-time = "2025-09-17T16:33:38.466Z" }, + { url = "https://files.pythonhosted.org/packages/b4/78/eb0d77f02971c05fca0eb7465b18058ba84bd957062f5eec82f941ac792a/debugpy-1.8.17-cp312-cp312-manylinux_2_34_x86_64.whl", hash = "sha256:24693179ef9dfa20dca8605905a42b392be56d410c333af82f1c5dff807a64cc", size = 4309417, upload-time = "2025-09-17T16:33:41.299Z" }, + { url = 
"https://files.pythonhosted.org/packages/37/42/c40f1d8cc1fed1e75ea54298a382395b8b937d923fcf41ab0797a554f555/debugpy-1.8.17-cp312-cp312-win32.whl", hash = "sha256:6a4e9dacf2cbb60d2514ff7b04b4534b0139facbf2abdffe0639ddb6088e59cf", size = 5277130, upload-time = "2025-09-17T16:33:43.554Z" }, + { url = "https://files.pythonhosted.org/packages/72/22/84263b205baad32b81b36eac076de0cdbe09fe2d0637f5b32243dc7c925b/debugpy-1.8.17-cp312-cp312-win_amd64.whl", hash = "sha256:e8f8f61c518952fb15f74a302e068b48d9c4691768ade433e4adeea961993464", size = 5319053, upload-time = "2025-09-17T16:33:53.033Z" }, + { url = "https://files.pythonhosted.org/packages/50/76/597e5cb97d026274ba297af8d89138dfd9e695767ba0e0895edb20963f40/debugpy-1.8.17-cp313-cp313-macosx_15_0_universal2.whl", hash = "sha256:857c1dd5d70042502aef1c6d1c2801211f3ea7e56f75e9c335f434afb403e464", size = 2538386, upload-time = "2025-09-17T16:33:54.594Z" }, + { url = "https://files.pythonhosted.org/packages/5f/60/ce5c34fcdfec493701f9d1532dba95b21b2f6394147234dce21160bd923f/debugpy-1.8.17-cp313-cp313-manylinux_2_34_x86_64.whl", hash = "sha256:3bea3b0b12f3946e098cce9b43c3c46e317b567f79570c3f43f0b96d00788088", size = 4292100, upload-time = "2025-09-17T16:33:56.353Z" }, + { url = "https://files.pythonhosted.org/packages/e8/95/7873cf2146577ef71d2a20bf553f12df865922a6f87b9e8ee1df04f01785/debugpy-1.8.17-cp313-cp313-win32.whl", hash = "sha256:e34ee844c2f17b18556b5bbe59e1e2ff4e86a00282d2a46edab73fd7f18f4a83", size = 5277002, upload-time = "2025-09-17T16:33:58.231Z" }, + { url = "https://files.pythonhosted.org/packages/46/11/18c79a1cee5ff539a94ec4aa290c1c069a5580fd5cfd2fb2e282f8e905da/debugpy-1.8.17-cp313-cp313-win_amd64.whl", hash = "sha256:6c5cd6f009ad4fca8e33e5238210dc1e5f42db07d4b6ab21ac7ffa904a196420", size = 5319047, upload-time = "2025-09-17T16:34:00.586Z" }, + { url = "https://files.pythonhosted.org/packages/de/45/115d55b2a9da6de812696064ceb505c31e952c5d89c4ed1d9bb983deec34/debugpy-1.8.17-cp314-cp314-macosx_15_0_universal2.whl", hash = "sha256:045290c010bcd2d82bc97aa2daf6837443cd52f6328592698809b4549babcee1", size = 2536899, upload-time = "2025-09-17T16:34:02.657Z" }, + { url = "https://files.pythonhosted.org/packages/5a/73/2aa00c7f1f06e997ef57dc9b23d61a92120bec1437a012afb6d176585197/debugpy-1.8.17-cp314-cp314-manylinux_2_34_x86_64.whl", hash = "sha256:b69b6bd9dba6a03632534cdf67c760625760a215ae289f7489a452af1031fe1f", size = 4268254, upload-time = "2025-09-17T16:34:04.486Z" }, + { url = "https://files.pythonhosted.org/packages/86/b5/ed3e65c63c68a6634e3ba04bd10255c8e46ec16ebed7d1c79e4816d8a760/debugpy-1.8.17-cp314-cp314-win32.whl", hash = "sha256:5c59b74aa5630f3a5194467100c3b3d1c77898f9ab27e3f7dc5d40fc2f122670", size = 5277203, upload-time = "2025-09-17T16:34:06.65Z" }, + { url = "https://files.pythonhosted.org/packages/b0/26/394276b71c7538445f29e792f589ab7379ae70fd26ff5577dfde71158e96/debugpy-1.8.17-cp314-cp314-win_amd64.whl", hash = "sha256:893cba7bb0f55161de4365584b025f7064e1f88913551bcd23be3260b231429c", size = 5318493, upload-time = "2025-09-17T16:34:08.483Z" }, + { url = "https://files.pythonhosted.org/packages/b0/d0/89247ec250369fc76db477720a26b2fce7ba079ff1380e4ab4529d2fe233/debugpy-1.8.17-py2.py3-none-any.whl", hash = "sha256:60c7dca6571efe660ccb7a9508d73ca14b8796c4ed484c2002abba714226cfef", size = 5283210, upload-time = "2025-09-17T16:34:25.835Z" }, +] + [[package]] name = "exceptiongroup" version = "1.3.0" @@ -889,6 +918,7 @@ dependencies = [ [package.dev-dependencies] dev = [ { name = "black" }, + { name = "debugpy" }, { name = "idapro" }, { 
name = "isort" }, { name = "mypy" }, @@ -913,6 +943,7 @@ requires-dist = [ [package.metadata.requires-dev] dev = [ { name = "black", specifier = ">=24.8" }, + { name = "debugpy", specifier = ">=1.8.17" }, { name = "idapro", specifier = ">=0.0.5" }, { name = "isort", specifier = ">=5.13" }, { name = "mypy", specifier = ">=1.12" }, @@ -942,7 +973,7 @@ wheels = [ [[package]] name = "revengai" -version = "2.14.1" +version = "2.56.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "lazy-imports" }, @@ -951,9 +982,9 @@ dependencies = [ { name = "typing-extensions" }, { name = "urllib3" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/74/a8/f81ca6f12f457ad29c1c0daee2fb3218e655f97a53f7c64b2b139162c37f/revengai-2.14.1.tar.gz", hash = "sha256:3fbbe665253595bc54a3019a9e0299810020e4af30c616cea942950be5f359ff", size = 273563, upload-time = "2025-10-27T11:57:21.968Z" } +sdist = { url = "https://files.pythonhosted.org/packages/52/e6/0daeb9b54439385a4921777f86590f098f138855bdea408b8dc1c8a8f2e8/revengai-2.56.0.tar.gz", hash = "sha256:7993c8cb407309f03456c7b48d391417895fa584777099f7597eb90df950472a", size = 264382, upload-time = "2025-12-04T09:13:13.572Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/99/4c/c332b6cee7e56273611dfcc898772118892fb4e69d50031b7ed7ab5a0da2/revengai-2.14.1-py3-none-any.whl", hash = "sha256:1fde9ea6b44ae46b9f069e2838f0924fe603f61d5e0fabef702904f67c3c3aa5", size = 507066, upload-time = "2025-10-27T11:57:20.611Z" }, + { url = "https://files.pythonhosted.org/packages/f1/b9/89c24a57979c81b4ff9f5801cdf2d5506ed321d62fd97a5cf3326d09c177/revengai-2.56.0-py3-none-any.whl", hash = "sha256:cecb601184c8308a5af80094434d8c8dd5a1a82201d708d12d8f3da106ede5ab", size = 485705, upload-time = "2025-12-04T09:13:12.445Z" }, ] [[package]] From 3064a06a1a7ee4358dc4431a226ebc473cf963cb Mon Sep 17 00:00:00 2001 From: belmegatron Date: Fri, 5 Dec 2025 13:36:10 +0000 Subject: [PATCH 07/11] feat(PLU-192): removed global var handling for data types, adding ticket references for TODO's --- .../app/components/dialogs/matching_dialog.py | 1 - .../services/data_types/data_types_service.py | 12 +---- .../app/services/data_types/schema.py | 7 --- .../app/transformations/import_data_types.py | 45 ++++++------------- 4 files changed, 16 insertions(+), 49 deletions(-) delete mode 100644 reai_toolkit/app/services/data_types/schema.py diff --git a/reai_toolkit/app/components/dialogs/matching_dialog.py b/reai_toolkit/app/components/dialogs/matching_dialog.py index 4095386..6d3e377 100644 --- a/reai_toolkit/app/components/dialogs/matching_dialog.py +++ b/reai_toolkit/app/components/dialogs/matching_dialog.py @@ -1039,7 +1039,6 @@ def display_matching_results_multiple_functions(self, query: str = ""): filtered_funcs: List[FunctionMatch] = [] - for r in self.matching_results.results: if query: matched_function = r.matched_functions[0] if r.matched_functions else None diff --git a/reai_toolkit/app/services/data_types/data_types_service.py b/reai_toolkit/app/services/data_types/data_types_service.py index 5e245e9..27c7a24 100644 --- a/reai_toolkit/app/services/data_types/data_types_service.py +++ b/reai_toolkit/app/services/data_types/data_types_service.py @@ -1,5 +1,3 @@ -import debugpy - from revengai import Configuration from revengai.exceptions import NotFoundException @@ -16,18 +14,12 @@ from reai_toolkit.app.interfaces.thread_service import IThreadService from reai_toolkit.app.transformations.import_data_types import ImportDataTypes -def wait_for_debugger(): - # 
Start debug server - debugpy.listen(60000, in_process_debug_adapter=True) - debugpy.wait_for_client() # Pause until debugger connects - class ImportDataTypesService(IThreadService): - def __init__(self, netstore_service: SimpleNetStore, sdk_config: Configuration): + def __init__(self, netstore_service: SimpleNetStore, sdk_config: Configuration) -> None: super().__init__(netstore_service=netstore_service, sdk_config=sdk_config) - def import_data_types(self, matches: dict[int, MatchedFunction]): - # wait_for_debugger() + def import_data_types(self, matches: dict[int, MatchedFunction]) -> None: idt : ImportDataTypes = ImportDataTypes() # Overwrite the matched effective address with the original effective address. diff --git a/reai_toolkit/app/services/data_types/schema.py b/reai_toolkit/app/services/data_types/schema.py deleted file mode 100644 index 84dff61..0000000 --- a/reai_toolkit/app/services/data_types/schema.py +++ /dev/null @@ -1,7 +0,0 @@ -from pydantic import BaseModel - - -class DataTypeMatch(BaseModel): - function_id: int - ea: int - diff --git a/reai_toolkit/app/transformations/import_data_types.py b/reai_toolkit/app/transformations/import_data_types.py index 0bb61b7..b7fd9d3 100644 --- a/reai_toolkit/app/transformations/import_data_types.py +++ b/reai_toolkit/app/transformations/import_data_types.py @@ -12,39 +12,38 @@ FunctionInfoInputFuncDepsInner, FunctionInfoOutput, FunctionTypeOutput, - GlobalVariable, Structure, TypeDefinition, ) class TaggedDependency: - def __init__(self, dependency: Structure | Enumeration | TypeDefinition | GlobalVariable) -> None: - self.dependency: Structure | Enumeration | TypeDefinition | GlobalVariable = dependency + def __init__(self, dependency: Structure | Enumeration | TypeDefinition) -> None: + self.dependency: Structure | Enumeration | TypeDefinition = dependency self.processed: bool = False self.name: str = self.dependency.name - def __repr__(self): + def __repr__(self) -> str: return self.dependency.__repr__() class ImportDataTypes: - def __init__(self): + def __init__(self) -> None: self.deci: DecompilerInterface @execute_ui - def execute(self, functions: FunctionDataTypesList): + def execute(self, functions: FunctionDataTypesList) -> None: self.deci = DecompilerInterface.discover(force_decompiler="ida") # type: ignore lookup: dict[str, TaggedDependency] = {} for function in functions.items: - logger.debug(f"importing data types for {function.function_id}") data_types: FunctionInfoOutput | None = function.data_types if data_types is None: - logger.warning(f"skipping {function.function_id} as there are no available data types") continue + logger.debug(f"importing data types for {function.function_id}") + # Track processed dependencies to prevent duplicate imports. 
# Without this: # - Shared dependencies get re-processed, breaking references (shows as invalid ordinals in IDA) @@ -54,7 +53,7 @@ def execute(self, functions: FunctionDataTypesList): continue if dep.actual_instance.name not in lookup: - lookup.update({dep.actual_instance.name: TaggedDependency(dep.actual_instance)}) + lookup.update({dep.actual_instance.name: TaggedDependency(dep.actual_instance)}) # type: ignore dependency: FunctionInfoInputFuncDepsInner for dependency in data_types.func_deps: @@ -76,9 +75,7 @@ def process_dependency( if tagged_dependency.processed: return - logger.debug(f"processing dependency: {tagged_dependency.name}") - - dependency = tagged_dependency.dependency + dependency: Structure | Enumeration | TypeDefinition = tagged_dependency.dependency match dependency: case Structure(): self.update_struct(cast(Structure, dependency), lookup) @@ -86,10 +83,8 @@ def process_dependency( self.update_enum(cast(Enumeration, dependency)) case TypeDefinition(): self.update_typedef(cast(TypeDefinition, dependency), lookup) - case GlobalVariable(): - self.update_global_var(cast(GlobalVariable, dependency), lookup) case _: - logger.warning(f"unrecognised dependency type: {dependency}") + logger.warning(f"unsupported dependency type: {dependency}") tagged_dependency.processed = True @@ -98,7 +93,7 @@ def update_struct(self, imported_struct: Structure, lookup: dict[str, TaggedDepe return for member in imported_struct.members.values(): - subdependency = lookup.get(member.type) + subdependency: TaggedDependency | None = lookup.get(member.type) if subdependency: self.process_dependency(subdependency, lookup) member.type = self.normalise_type(member.type) @@ -111,7 +106,7 @@ def update_enum(self, imported_enum: Enumeration) -> None: self.deci.enums[imported_enum.name] = libbs.artifacts.Enum(name=imported_enum.name, members=imported_enum.members) def update_typedef(self, imported_typedef: TypeDefinition, lookup: dict[str, TaggedDependency]) -> None: - subdependency = lookup.get(imported_typedef.type) + subdependency: TaggedDependency | None = lookup.get(imported_typedef.type) if subdependency: self.process_dependency(subdependency, lookup) @@ -120,18 +115,6 @@ def update_typedef(self, imported_typedef: TypeDefinition, lookup: dict[str, Tag name=imported_typedef.name, type_=normalized_type ) - # TODO: PLU-192 Do we want to think about how these are used? What happens in the case where we match a function from a library that's been - # statically linked into a completely different binary? - def update_global_var(self, imported_global_var: GlobalVariable, lookup: dict[str, TaggedDependency]) -> None: - subdependency = lookup.get(imported_global_var.type) - if subdependency: - self.process_dependency(subdependency, lookup) - - normalized_type = self.normalise_type(imported_global_var.type) - self.deci.global_vars[imported_global_var.addr] = libbs.artifacts.GlobalVariable( - addr=imported_global_var.addr, name=imported_global_var.name, type_=normalized_type, size=imported_global_var.size - ) - def update_function(self, func: FunctionTypeOutput) -> None: base_address: int = self.deci.binary_base_addr rva: int = func.addr - base_address @@ -173,12 +156,12 @@ def update_function_arguments( @staticmethod def normalise_type(type: str) -> str: - # TODO: PLU-192 There are inconsistencies with how types are presented, sometimes with a namespace and sometimes without. + # TODO: PLU-214 There are inconsistencies with how types are presented, sometimes with a namespace and sometimes without. 
# Need to investigate further as we need to retain namespace information otherwise potential for symbols clashing. split_type: list[str] = type.split("::") normalized: str = split_type[-1] - # TODO: Add IDA typedefs for Ghidra primitives so we don't need to bother doing this... + # TODO: PLU-213 Add IDA typedefs for Ghidra primitives so we don't need to bother doing this... if normalized == "uchar": normalized = "unsigned char" elif normalized == "qword": From aaf13ac560b7f2c4ef5f98396be03e3946f76ddb Mon Sep 17 00:00:00 2001 From: belmegatron Date: Fri, 5 Dec 2025 17:10:09 +0000 Subject: [PATCH 08/11] feat(PLU-192): switched to using /v2/functions/data_types endpoint --- .../app/components/dialogs/matching_dialog.py | 8 ++--- .../services/data_types/data_types_service.py | 36 +++++++------------ .../app/transformations/import_data_types.py | 14 +++++--- 3 files changed, 26 insertions(+), 32 deletions(-) diff --git a/reai_toolkit/app/components/dialogs/matching_dialog.py b/reai_toolkit/app/components/dialogs/matching_dialog.py index 6d3e377..30de2d3 100644 --- a/reai_toolkit/app/components/dialogs/matching_dialog.py +++ b/reai_toolkit/app/components/dialogs/matching_dialog.py @@ -114,8 +114,8 @@ def __init__( self.data_types_service = data_types_service self._func_map = func_map - # Used for looking up which function we matched to for a given function id. - self.current_to_matched_func: dict[int, MatchedFunction] = {} + # Matched function id to original effective address + self.matched_func_to_original_ea: dict[int, int] = {} self.ui = Ui_MatchingPanel() self.setWindowTitle("RevEng.AI — Function matching") @@ -1091,7 +1091,7 @@ def display_matching_results_multiple_functions(self, query: str = ""): r.matched_functions[0] if r.matched_functions else None ) - self.current_to_matched_func[r.function_id] = matched_function + self.matched_func_to_original_ea[matched_function.function_id] = self._func_map[str(r.function_id)] # Column 3: Matched Name table.setItem( @@ -1196,7 +1196,7 @@ def enqueue_renames(self): def import_data_types(self): print("importing data types...") - self.data_types_service.import_data_types(self.current_to_matched_func) + self.data_types_service.import_data_types(self.matched_func_to_original_ea) # ===================================================================== # (Optional) page-switch helpers diff --git a/reai_toolkit/app/services/data_types/data_types_service.py b/reai_toolkit/app/services/data_types/data_types_service.py index 27c7a24..fb11af5 100644 --- a/reai_toolkit/app/services/data_types/data_types_service.py +++ b/reai_toolkit/app/services/data_types/data_types_service.py @@ -19,37 +19,25 @@ class ImportDataTypesService(IThreadService): def __init__(self, netstore_service: SimpleNetStore, sdk_config: Configuration) -> None: super().__init__(netstore_service=netstore_service, sdk_config=sdk_config) - def import_data_types(self, matches: dict[int, MatchedFunction]) -> None: + def import_data_types(self, matches: dict[int, int]) -> None: idt : ImportDataTypes = ImportDataTypes() - # Overwrite the matched effective address with the original effective address. - # We need this in order to ensure that we write the obtained function datatypes to the right location. - for original_ea, matched_func in matches.items(): - matched_func.function_vaddr = original_ea - - # Batch up function id's by analysis id in order to minimize the number of calls to the data types endpoint. 
- funcs_by_analysis_id: dict[int, list[int]] = {} - for matched_func in matches.values(): - if funcs_by_analysis_id.get(matched_func.analysis_id) is None: - funcs_by_analysis_id[matched_func.analysis_id] = [matched_func.function_id] - else: - funcs_by_analysis_id[matched_func.analysis_id].append(matched_func.function_id) + matched_function_ids: list[int] = list(matches.keys()) # Attempt to retrieve the data types from the API and apply them to our analysis. - for analysis_id, function_ids in funcs_by_analysis_id.items(): - try: - response: FunctionDataTypesList | None = self._get_data_types(analysis_id, function_ids) - except NotFoundException as e: - logger.warning(f"failed to apply data types for {function_ids} due to: {e}") - else: - if response: - idt.execute(response) - - def _get_data_types(self, analysis_id: int, function_ids: list[int] | None = None) -> FunctionDataTypesList | None: + try: + response: FunctionDataTypesList | None = self._get_data_types(matched_function_ids) + except NotFoundException as e: + logger.warning(f"failed to apply data types for {matched_function_ids} due to: {e}") + else: + if response: + idt.execute(response, matched_function_mapping=matches) + + def _get_data_types(self, function_ids: list[int] | None = None) -> FunctionDataTypesList | None: with self.yield_api_client(sdk_config=self.sdk_config) as api_client: client = FunctionsDataTypesApi(api_client=api_client) response: BaseResponseFunctionDataTypesList = ( - client.list_function_data_types_for_analysis(analysis_id, function_ids=function_ids) # type: ignore + client.list_function_data_types_for_functions(function_ids=function_ids) # type: ignore ) if response.status: return response.data diff --git a/reai_toolkit/app/transformations/import_data_types.py b/reai_toolkit/app/transformations/import_data_types.py index b7fd9d3..a2a44cd 100644 --- a/reai_toolkit/app/transformations/import_data_types.py +++ b/reai_toolkit/app/transformations/import_data_types.py @@ -32,7 +32,7 @@ def __init__(self) -> None: self.deci: DecompilerInterface @execute_ui - def execute(self, functions: FunctionDataTypesList) -> None: + def execute(self, functions: FunctionDataTypesList, matched_function_mapping: dict[int, int] = {}) -> None: self.deci = DecompilerInterface.discover(force_decompiler="ida") # type: ignore lookup: dict[str, TaggedDependency] = {} @@ -66,7 +66,13 @@ def execute(self, functions: FunctionDataTypesList) -> None: func: FunctionTypeOutput | None = data_types.func_types if func: - self.update_function(func) + # If we obtained data types from a matched function, we need to make sure we map it to the original effective address. 
+ if matched_function_mapping: + ea: int = matched_function_mapping[function.function_id] + else: + ea: int = func.addr + + self.update_function(func, ea) def process_dependency( @@ -115,9 +121,9 @@ def update_typedef(self, imported_typedef: TypeDefinition, lookup: dict[str, Tag name=imported_typedef.name, type_=normalized_type ) - def update_function(self, func: FunctionTypeOutput) -> None: + def update_function(self, func: FunctionTypeOutput, ea: int) -> None: base_address: int = self.deci.binary_base_addr - rva: int = func.addr - base_address + rva: int = ea - base_address target_func: libbs.artifacts.Function | None = self.deci.functions.get(rva) # type: ignore if target_func is None: From b89eb0e9bf5677ff264da928830bc670fd863099 Mon Sep 17 00:00:00 2001 From: belmegatron Date: Mon, 8 Dec 2025 18:07:50 +0000 Subject: [PATCH 09/11] feat(PLU-192): added support for importing data types when auto unstrip is performed --- .../components/dialogs/auto_unstrip_dialog.py | 13 ++-- .../app/components/dialogs/matching_dialog.py | 1 - reai_toolkit/app/coordinator.py | 1 + .../coordinators/auto_unstrip_coordinator.py | 8 ++- .../coordinators/sync_analysis_coordinator.py | 5 +- .../services/analysis_sync/analysis_sync.py | 63 +++++++------------ .../app/services/analysis_sync/schema.py | 5 +- .../auto_unstrip/auto_unstrip_service.py | 3 +- .../services/data_types/data_types_service.py | 12 +++- .../app/services/rename/rename_service.py | 8 --- .../app/transformations/import_data_types.py | 3 +- reai_toolkit_entry.py | 1 - uv.lock | 6 +- 13 files changed, 58 insertions(+), 71 deletions(-) diff --git a/reai_toolkit/app/components/dialogs/auto_unstrip_dialog.py b/reai_toolkit/app/components/dialogs/auto_unstrip_dialog.py index 7f531a4..bcf9f5c 100644 --- a/reai_toolkit/app/components/dialogs/auto_unstrip_dialog.py +++ b/reai_toolkit/app/components/dialogs/auto_unstrip_dialog.py @@ -77,32 +77,37 @@ def _populate_table(self, matches: list[MatchedFunctionSuggestion]) -> None: hdr.setStretchLastSection(False) table.setRowCount(len(matches)) + + if QT_VER == 6: + flags = QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled + else: + flags = QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled for row, m in enumerate(matches): # 2. address cell addr_item = QtWidgets.QTableWidgetItem(hex(m.function_vaddr)) - addr_item.setFlags(QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled) + addr_item.setFlags(flags) addr_item.setToolTip(addr_item.text()) # show on hover table.setItem(row, 0, addr_item) # 3. current name cell current_name = get_safe_name(m.function_vaddr) or "" cur_item = QtWidgets.QTableWidgetItem(current_name) - cur_item.setFlags(QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled) + cur_item.setFlags(flags) cur_item.setToolTip(current_name) table.setItem(row, 1, cur_item) # 4. suggested name cell sug_name = m.suggested_name or "" sug_item = QtWidgets.QTableWidgetItem(sug_name) - sug_item.setFlags(QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled) + sug_item.setFlags(flags) sug_item.setToolTip(sug_name) # tooltip always has full text table.setItem(row, 2, sug_item) # 5. 
Suggested demangled name cell demangled_name = m.suggested_demangled_name or sug_name dem_item = QtWidgets.QTableWidgetItem(demangled_name) - dem_item.setFlags(QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled) + dem_item.setFlags(flags) dem_item.setToolTip(demangled_name) # tooltip always has full text table.setItem(row, 3, dem_item) diff --git a/reai_toolkit/app/components/dialogs/matching_dialog.py b/reai_toolkit/app/components/dialogs/matching_dialog.py index 30de2d3..423a7e2 100644 --- a/reai_toolkit/app/components/dialogs/matching_dialog.py +++ b/reai_toolkit/app/components/dialogs/matching_dialog.py @@ -1195,7 +1195,6 @@ def enqueue_renames(self): print(f"Failed to enqueue renames: {e}") def import_data_types(self): - print("importing data types...") self.data_types_service.import_data_types(self.matched_func_to_original_ea) # ===================================================================== diff --git a/reai_toolkit/app/coordinator.py b/reai_toolkit/app/coordinator.py index 55bf3b2..34d2d7a 100644 --- a/reai_toolkit/app/coordinator.py +++ b/reai_toolkit/app/coordinator.py @@ -73,6 +73,7 @@ def __init__(self, app, factory, log): log=log, auto_unstrip_service=app.auto_unstrip_service, rename_service=app.rename_service, + data_types_service=app.data_types_service ) self.ai_decompc: AiDecompCoordinator = AiDecompCoordinator( diff --git a/reai_toolkit/app/coordinators/auto_unstrip_coordinator.py b/reai_toolkit/app/coordinators/auto_unstrip_coordinator.py index ad6dd26..dbc6bcf 100644 --- a/reai_toolkit/app/coordinators/auto_unstrip_coordinator.py +++ b/reai_toolkit/app/coordinators/auto_unstrip_coordinator.py @@ -10,6 +10,7 @@ ) from reai_toolkit.app.services.rename.rename_service import RenameService from reai_toolkit.app.services.rename.schema import RenameInput +from reai_toolkit.app.services.data_types.data_types_service import ImportDataTypesService class AutoUnstripCoordinator(BaseCoordinator): @@ -25,10 +26,12 @@ def __init__( log, auto_unstrip_service: AutoUnstripService, rename_service: RenameService, + data_types_service: ImportDataTypesService, ): super().__init__(app=app, factory=factory, log=log) self.auto_unstrip_service = auto_unstrip_service self.rename_service = rename_service + self.data_types_service: ImportDataTypesService = data_types_service def run_dialog(self) -> None: if self.auto_unstrip_service.is_worker_running(): @@ -46,7 +49,7 @@ def run_dialog(self) -> None: def _open_auto_unstrip_dialog(self) -> None: self.factory.auto_unstrip(response=self.last_response.data).open_modal() - def _on_complete(self, response: GenericApiReturn[AutoUnstripResponse]): + def _on_complete(self, response: GenericApiReturn[AutoUnstripResponse]) -> None: print("Auto-unstrip process completed.") if not response.success: @@ -54,6 +57,7 @@ def _on_complete(self, response: GenericApiReturn[AutoUnstripResponse]): return rename_list = [] + data_types_mapping: dict[int, int] = {} self.last_response = response @@ -65,8 +69,10 @@ def _on_complete(self, response: GenericApiReturn[AutoUnstripResponse]): new_name=function.suggested_demangled_name, ) ) + data_types_mapping[function.function_id] = function.function_vaddr self.rename_service.enqueue_rename(rename_list=rename_list) + self.data_types_service.import_data_types(data_types_mapping) ida_kernwin.execute_ui_requests([self._open_auto_unstrip_dialog]) diff --git a/reai_toolkit/app/coordinators/sync_analysis_coordinator.py b/reai_toolkit/app/coordinators/sync_analysis_coordinator.py index 4e3dd06..d32df8d 100644 --- 
a/reai_toolkit/app/coordinators/sync_analysis_coordinator.py +++ b/reai_toolkit/app/coordinators/sync_analysis_coordinator.py @@ -48,9 +48,8 @@ def _on_complete( """ if generic_return.success: self.safe_info( - msg=f"Analysis data synced successfully. \n\nSynced {generic_return.data.matched_local_function_count} functions with remote analysis." - + f"\n{generic_return.data.unmatched_local_function_count} local functions not present in remote analysis." - + f"\n{generic_return.data.unmatched_remote_function_count} remote functions not present in local analysis." + msg=f"Analysis data synced successfully. \n\nSynced {generic_return.data.matched_function_count} functions with remote analysis." + + f"\n{generic_return.data.unmatched_function_count} local functions not present in remote analysis." ) else: self.safe_error(message=generic_return.error_message) diff --git a/reai_toolkit/app/services/analysis_sync/analysis_sync.py b/reai_toolkit/app/services/analysis_sync/analysis_sync.py index ea78c53..3ad3fc5 100644 --- a/reai_toolkit/app/services/analysis_sync/analysis_sync.py +++ b/reai_toolkit/app/services/analysis_sync/analysis_sync.py @@ -115,53 +115,37 @@ def _match_functions( self, func_map: FunctionMapping, ) -> GenericApiReturn[MatchedFunctionSummary]: - function_map = func_map.function_map - inverse_function_map = func_map.inverse_function_map + # Mapping of local function addresses to mangled names + local_vaddr_to_matched_name: dict[str, str] = func_map.name_map - logger.info(f"RevEng.AI: Retrieved {len(function_map)} function mappings from analysis") + logger.info(f"RevEng.AI: Retrieved {len(local_vaddr_to_matched_name)} functions from analysis") # Compute which IDA functions match the revengai analysis functions - matched_functions = [] - unmatched_local_functions = [] - unmatched_remote_functions = [] - - # Track local functions matched - local_function_vaddrs_matched = set() - fun_count = 0 - for key, value in func_map.name_map.items(): - if "FUN_" in value: - fun_count += 1 - - for start_ea in idautils.Functions(): - if str(start_ea) in inverse_function_map: - new_name: str | None = func_map.name_map.get(str(start_ea), None) - if new_name is None: - continue - - self.safe_set_name(start_ea, new_name, check_user_flags=True) - matched_functions.append((int(inverse_function_map[str(start_ea)]), start_ea)) - local_function_vaddrs_matched.add(start_ea) + matched_function_count: int = 0 + unmatched_function_count: int = 0 + total_function_count: int = 0 + + local_vaddr: int + for local_vaddr in idautils.Functions(): + local_vaddr_str: str = str(local_vaddr) + new_name: str | None = local_vaddr_to_matched_name.get(local_vaddr_str) + if new_name: + self.safe_set_name(local_vaddr, new_name, check_user_flags=True) + matched_function_count += 1 else: - unmatched_local_functions.append(start_ea) + unmatched_function_count += 1 + + total_function_count += 1 - unmatched_portal_map = {} - # Track remote functions not matched - for func_id_str, func_vaddr in function_map.items(): - if int(func_vaddr) not in local_function_vaddrs_matched: - unmatched_remote_functions.append((int(func_vaddr), int(func_id_str))) - unmatched_portal_map[int(func_vaddr)] = int(func_id_str) - - logger.info(f"RevEng.AI: Matched {len(matched_functions)} functions") - logger.info(f"RevEng.AI: {len(unmatched_local_functions)} local functions not matched") - logger.info(f"RevEng.AI: {len(unmatched_remote_functions)} remote functions not matched") + logger.info(f"RevEng.AI: Matched {matched_function_count} functions") + 
logger.info(f"RevEng.AI: {unmatched_function_count} functions not matched") return GenericApiReturn( success=True, data=MatchedFunctionSummary( - matched_local_function_count=len(matched_functions), - unmatched_local_function_count=len(unmatched_local_functions), - unmatched_remote_function_count=len(unmatched_remote_functions), - total_function_count=len(function_map), + matched_function_count=matched_function_count, + unmatched_function_count=unmatched_function_count, + total_function_count=total_function_count, ), ) @@ -224,10 +208,9 @@ def _sync_analysis_data(self, _: threading.Event, analysis_id: int) -> None: result: FunctionDataTypesList | None = self._get_data_types(analysis_id) if result and result.total_data_types_count: - logger.debug(f"applying type information for {analysis_id}...") import_data_types: ImportDataTypes = ImportDataTypes() import_data_types.execute(result) else: - logger.debug(f"found no type information for {analysis_id}") + logger.warning(f"found no type information for {analysis_id}") self.call_callback(generic_return=response) diff --git a/reai_toolkit/app/services/analysis_sync/schema.py b/reai_toolkit/app/services/analysis_sync/schema.py index 3a54375..d78a5ad 100644 --- a/reai_toolkit/app/services/analysis_sync/schema.py +++ b/reai_toolkit/app/services/analysis_sync/schema.py @@ -2,7 +2,6 @@ class MatchedFunctionSummary(BaseModel): - matched_local_function_count: int - unmatched_local_function_count: int - unmatched_remote_function_count: int + matched_function_count: int + unmatched_function_count: int total_function_count: int diff --git a/reai_toolkit/app/services/auto_unstrip/auto_unstrip_service.py b/reai_toolkit/app/services/auto_unstrip/auto_unstrip_service.py index 714dfaa..8498b4d 100644 --- a/reai_toolkit/app/services/auto_unstrip/auto_unstrip_service.py +++ b/reai_toolkit/app/services/auto_unstrip/auto_unstrip_service.py @@ -74,10 +74,9 @@ def _fetch_unstrip_status(self, analysis_id: int) -> AutoUnstripResponse: with self.yield_api_client(sdk_config=self.sdk_config) as api_client: functions_api = FunctionsCoreApi(api_client=api_client) - result = functions_api.auto_unstrip( + result: AutoUnstripResponse = functions_api.auto_unstrip( analysis_id=analysis_id, auto_unstrip_request=AutoUnstripRequest( - min_similarity=0.9, apply=True, # Will not let the users pick names if enabled. ), ) diff --git a/reai_toolkit/app/services/data_types/data_types_service.py b/reai_toolkit/app/services/data_types/data_types_service.py index fb11af5..76480f6 100644 --- a/reai_toolkit/app/services/data_types/data_types_service.py +++ b/reai_toolkit/app/services/data_types/data_types_service.py @@ -1,4 +1,4 @@ -from revengai import Configuration +from revengai import Configuration, FunctionMapping from revengai.exceptions import NotFoundException from loguru import logger @@ -20,8 +20,15 @@ def __init__(self, netstore_service: SimpleNetStore, sdk_config: Configuration) super().__init__(netstore_service=netstore_service, sdk_config=sdk_config) def import_data_types(self, matches: dict[int, int]) -> None: - idt : ImportDataTypes = ImportDataTypes() + """ Import data type information from the remote and apply to our local analysis + + Args: + matches: Mapping of remote function id to local virtual address + """ + if len(matches) == 0: + return + idt : ImportDataTypes = ImportDataTypes() matched_function_ids: list[int] = list(matches.keys()) # Attempt to retrieve the data types from the API and apply them to our analysis. 
@@ -41,4 +48,3 @@ def _get_data_types(self, function_ids: list[int] | None = None) -> FunctionData ) if response.status: return response.data - diff --git a/reai_toolkit/app/services/rename/rename_service.py b/reai_toolkit/app/services/rename/rename_service.py index 0d76808..63b8e8a 100644 --- a/reai_toolkit/app/services/rename/rename_service.py +++ b/reai_toolkit/app/services/rename/rename_service.py @@ -83,14 +83,6 @@ def _rename_worker(self, stop_event: Optional[threading.Event] = None) -> None: # Do before for execute sync, if fails may not be called. self._rename_q.task_done() - # Display the total success/failure - ida_kernwin.execute_sync( - lambda: logger.info( - f"RevEng.AI: Renaming Batch Completed - Success: {len(function_list) - (total_errors or 0)}, Failures: {total_errors or 0}" - ), - ida_kernwin.MFF_FAST, - ) - def _rename_remote_function_safe(self, matched_func_list) -> GenericApiReturn: data = GenericApiReturn(success=False) diff --git a/reai_toolkit/app/transformations/import_data_types.py b/reai_toolkit/app/transformations/import_data_types.py index a2a44cd..e98ca92 100644 --- a/reai_toolkit/app/transformations/import_data_types.py +++ b/reai_toolkit/app/transformations/import_data_types.py @@ -42,8 +42,6 @@ def execute(self, functions: FunctionDataTypesList, matched_function_mapping: di if data_types is None: continue - logger.debug(f"importing data types for {function.function_id}") - # Track processed dependencies to prevent duplicate imports. # Without this: # - Shared dependencies get re-processed, breaking references (shows as invalid ordinals in IDA) @@ -127,6 +125,7 @@ def update_function(self, func: FunctionTypeOutput, ea: int) -> None: target_func: libbs.artifacts.Function | None = self.deci.functions.get(rva) # type: ignore if target_func is None: + logger.warning(f"failed to update function: {func.name} at rva: 0x{rva:0x}") return target_func.name = func.name diff --git a/reai_toolkit_entry.py b/reai_toolkit_entry.py index 567eea7..098bac0 100644 --- a/reai_toolkit_entry.py +++ b/reai_toolkit_entry.py @@ -4,7 +4,6 @@ import ida_kernwin import idaapi - def _add_vendor_paths(): here = Path(__file__).resolve().parent pkg_root = here / "reai_toolkit" # where your package + vendor live diff --git a/uv.lock b/uv.lock index 6bc7606..93888fb 100644 --- a/uv.lock +++ b/uv.lock @@ -973,7 +973,7 @@ wheels = [ [[package]] name = "revengai" -version = "2.56.0" +version = "2.61.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "lazy-imports" }, @@ -982,9 +982,9 @@ dependencies = [ { name = "typing-extensions" }, { name = "urllib3" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/52/e6/0daeb9b54439385a4921777f86590f098f138855bdea408b8dc1c8a8f2e8/revengai-2.56.0.tar.gz", hash = "sha256:7993c8cb407309f03456c7b48d391417895fa584777099f7597eb90df950472a", size = 264382, upload-time = "2025-12-04T09:13:13.572Z" } +sdist = { url = "https://files.pythonhosted.org/packages/80/e1/9f0677f31f186c5d15461ab699ab0413cf9b16242aa0f0ca8c7a94bfe010/revengai-2.61.0.tar.gz", hash = "sha256:be377cb412caee9f7c8da6eae131645fa86c2878bdb8715f8447adaa0e1f050e", size = 264371, upload-time = "2025-12-08T09:08:15.234Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/f1/b9/89c24a57979c81b4ff9f5801cdf2d5506ed321d62fd97a5cf3326d09c177/revengai-2.56.0-py3-none-any.whl", hash = "sha256:cecb601184c8308a5af80094434d8c8dd5a1a82201d708d12d8f3da106ede5ab", size = 485705, upload-time = "2025-12-04T09:13:12.445Z" }, + { url = 
"https://files.pythonhosted.org/packages/e3/6d/ca8f61862764daf20f08c07e1be5d0c7480df7732e2a75b0e405a0d784ea/revengai-2.61.0-py3-none-any.whl", hash = "sha256:ae913a8013920754d3e2cafbe55052880b0f8103213178b85172f1d674f25c55", size = 485700, upload-time = "2025-12-08T09:08:16.486Z" }, ] [[package]] From 9480dba2e55b61c7ffe7d75827c93062f815d946 Mon Sep 17 00:00:00 2001 From: belmegatron Date: Mon, 8 Dec 2025 18:25:44 +0000 Subject: [PATCH 10/11] feat(PLU-192): normalising types derived from DWARF info --- .../app/transformations/import_data_types.py | 31 +++++++++++-------- reai_toolkit_entry.py | 1 + 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/reai_toolkit/app/transformations/import_data_types.py b/reai_toolkit/app/transformations/import_data_types.py index e98ca92..d2f30d0 100644 --- a/reai_toolkit/app/transformations/import_data_types.py +++ b/reai_toolkit/app/transformations/import_data_types.py @@ -160,18 +160,23 @@ def update_function_arguments( target_function.header.args = {v.offset: v for v in imported_args.values()} @staticmethod - def normalise_type(type: str) -> str: - # TODO: PLU-214 There are inconsistencies with how types are presented, sometimes with a namespace and sometimes without. - # Need to investigate further as we need to retain namespace information otherwise potential for symbols clashing. - split_type: list[str] = type.split("::") - normalized: str = split_type[-1] + def normalise_type(data_type: str) -> str: + # When we obtain a type from DWARF information, it often looks something like `DWARF/stdint-uintn.h::uint32_t` + # Let's remove the DWARF/*.h prefix + if data_type.startswith("DWARF/"): + # Find the first occurence of `::` + delimiter: str = "::" + pos: int = data_type.find(delimiter) + data_type = data_type[pos+len(delimiter):] + + logger.debug(f"normalising type: {data_type}") # TODO: PLU-213 Add IDA typedefs for Ghidra primitives so we don't need to bother doing this... 
- if normalized == "uchar": - normalized = "unsigned char" - elif normalized == "qword": - normalized = "unsigned __int64" - elif normalized == "sqword": - normalized = "__int64" - - return normalized + if data_type == "uchar": + data_type = "unsigned char" + elif data_type == "qword": + data_type = "unsigned __int64" + elif data_type == "sqword": + data_type = "__int64" + + return data_type diff --git a/reai_toolkit_entry.py b/reai_toolkit_entry.py index 098bac0..567eea7 100644 --- a/reai_toolkit_entry.py +++ b/reai_toolkit_entry.py @@ -4,6 +4,7 @@ import ida_kernwin import idaapi + def _add_vendor_paths(): here = Path(__file__).resolve().parent pkg_root = here / "reai_toolkit" # where your package + vendor live From 3586838692b544f5f3a023be55fd1c4a5a20448e Mon Sep 17 00:00:00 2001 From: belmegatron Date: Mon, 8 Dec 2025 18:41:40 +0000 Subject: [PATCH 11/11] feat(PLU-192): analysis sync service now using data types service to import data types --- reai_toolkit/app/app.py | 9 ++-- .../services/analysis_sync/analysis_sync.py | 42 +++---------------- .../services/data_types/data_types_service.py | 1 - .../app/transformations/import_data_types.py | 2 - 4 files changed, 11 insertions(+), 43 deletions(-) diff --git a/reai_toolkit/app/app.py b/reai_toolkit/app/app.py index 1d540c8..b2a034c 100644 --- a/reai_toolkit/app/app.py +++ b/reai_toolkit/app/app.py @@ -41,7 +41,10 @@ def __init__(self, ida_version: str = "UNKNOWN", plugin_version: str = "UNKNOWN" self.analysis_status_service = AnalysisStatusService( netstore_service=self.netstore_service, sdk_config=sdk_config ) - self.analysis_sync_service = AnalysisSyncService( + self.data_types_service = ImportDataTypesService( + netstore_service=self.netstore_service, sdk_config=sdk_config + ) + self.analysis_sync_service = AnalysisSyncService(data_types_service=self.data_types_service, netstore_service=self.netstore_service, sdk_config=sdk_config ) self.existing_analyses_service = ExistingAnalysesService( @@ -59,6 +62,4 @@ def __init__(self, ida_version: str = "UNKNOWN", plugin_version: str = "UNKNOWN" self.matching_service = MatchingService( netstore_service=self.netstore_service, sdk_config=sdk_config ) - self.data_types_service = ImportDataTypesService( - netstore_service=self.netstore_service, sdk_config=sdk_config - ) + diff --git a/reai_toolkit/app/services/analysis_sync/analysis_sync.py b/reai_toolkit/app/services/analysis_sync/analysis_sync.py index 3ad3fc5..63268f7 100644 --- a/reai_toolkit/app/services/analysis_sync/analysis_sync.py +++ b/reai_toolkit/app/services/analysis_sync/analysis_sync.py @@ -10,40 +10,23 @@ from loguru import logger from revengai import ( AnalysesCoreApi, - ApiClient, Configuration, FunctionMapping, - FunctionDataTypesList, - FunctionsDataTypesApi, - BaseResponseFunctionDataTypesList, - Structure, - Enumeration, - TypeDefinition, - GlobalVariable ) from reai_toolkit.app.core.netstore_service import SimpleNetStore from reai_toolkit.app.core.shared_schema import GenericApiReturn from reai_toolkit.app.interfaces.thread_service import IThreadService from reai_toolkit.app.services.analysis_sync.schema import MatchedFunctionSummary -from reai_toolkit.app.transformations.import_data_types import ImportDataTypes - - -class TaggedDependency: - def __init__(self, dependency: Structure | Enumeration | TypeDefinition | GlobalVariable): - self.dependency: Structure | Enumeration | TypeDefinition | GlobalVariable = dependency - self.processed: bool = False - self.name: str = self.dependency.name - - def __repr__(self): - return 
self.dependency.__repr__() +from reai_toolkit.app.services.data_types.data_types_service import ImportDataTypesService class AnalysisSyncService(IThreadService): _thread_callback: Callable[..., Any] = None - def __init__(self, netstore_service: SimpleNetStore, sdk_config: Configuration): + def __init__(self, data_types_service: ImportDataTypesService, netstore_service: SimpleNetStore, sdk_config: Configuration): super().__init__(netstore_service=netstore_service, sdk_config=sdk_config) + self.data_types_service: ImportDataTypesService = data_types_service def call_callback(self, generic_return: GenericApiReturn) -> None: self._thread_callback(generic_return) @@ -149,15 +132,6 @@ def _match_functions( ), ) - def _get_data_types(self, analysis_id: int) -> FunctionDataTypesList | None: - with ApiClient(configuration=self.sdk_config) as api_client: - client = FunctionsDataTypesApi(api_client=api_client) - response: BaseResponseFunctionDataTypesList = ( - client.list_function_data_types_for_analysis(analysis_id) - ) - if response.status: - return response.data - def _safe_match_functions( self, func_map: FunctionMapping ) -> GenericApiReturn[MatchedFunctionSummary]: @@ -200,17 +174,13 @@ def _sync_analysis_data(self, _: threading.Event, analysis_id: int) -> None: return function_mapping: FunctionMapping | None = response.data + if function_mapping is None: + return response = self._safe_match_functions(func_map=function_mapping) if not response.success: self.call_callback(generic_return=response) return - result: FunctionDataTypesList | None = self._get_data_types(analysis_id) - if result and result.total_data_types_count: - import_data_types: ImportDataTypes = ImportDataTypes() - import_data_types.execute(result) - else: - logger.warning(f"found no type information for {analysis_id}") - + self.data_types_service.import_data_types({int(k): v for k, v in function_mapping.function_map.items()}) self.call_callback(generic_return=response) diff --git a/reai_toolkit/app/services/data_types/data_types_service.py b/reai_toolkit/app/services/data_types/data_types_service.py index 76480f6..3a12a07 100644 --- a/reai_toolkit/app/services/data_types/data_types_service.py +++ b/reai_toolkit/app/services/data_types/data_types_service.py @@ -31,7 +31,6 @@ def import_data_types(self, matches: dict[int, int]) -> None: idt : ImportDataTypes = ImportDataTypes() matched_function_ids: list[int] = list(matches.keys()) - # Attempt to retrieve the data types from the API and apply them to our analysis. try: response: FunctionDataTypesList | None = self._get_data_types(matched_function_ids) except NotFoundException as e: diff --git a/reai_toolkit/app/transformations/import_data_types.py b/reai_toolkit/app/transformations/import_data_types.py index d2f30d0..7234c45 100644 --- a/reai_toolkit/app/transformations/import_data_types.py +++ b/reai_toolkit/app/transformations/import_data_types.py @@ -169,8 +169,6 @@ def normalise_type(data_type: str) -> str: pos: int = data_type.find(delimiter) data_type = data_type[pos+len(delimiter):] - logger.debug(f"normalising type: {data_type}") - # TODO: PLU-213 Add IDA typedefs for Ghidra primitives so we don't need to bother doing this... if data_type == "uchar": data_type = "unsigned char"
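A note on the dependency-import pattern used in import_data_types.py across these patches: TaggedDependency pairs each imported type with a processed flag, and process_dependency walks a name-keyed lookup so that a dependency shared by several functions is imported exactly once (re-importing breaks references and shows up as invalid ordinals in IDA), and so that subdependencies, such as struct fields referencing other imported types, are resolved before their parents. The sketch below is a minimal, self-contained illustration of that pattern, not the plugin's actual code: the Dep class, its refers_to field, and the print stand-in for the libbs deci.structs/enums/typedefs writes are all illustrative assumptions.

from dataclasses import dataclass, field

@dataclass
class Dep:
    name: str
    # Names of other types this type refers to (e.g. struct field types).
    refers_to: list[str] = field(default_factory=list)

@dataclass
class TaggedDep:
    dep: Dep
    processed: bool = False

def process(tagged: TaggedDep, lookup: dict[str, TaggedDep]) -> None:
    # Idempotence guard: a dependency reachable from several parents is
    # imported exactly once, so earlier references to it stay valid.
    if tagged.processed:
        return
    # Marking before recursing also tolerates self-referential structs;
    # the patches set the flag after processing, which assumes no cycles.
    tagged.processed = True
    for ref in tagged.dep.refers_to:
        sub = lookup.get(ref)
        if sub is not None:
            process(sub, lookup)  # depth-first: subdependencies land first
    print(f"import {tagged.dep.name}")  # stand-in for the real decompiler write

deps = [Dep("uint32_t"), Dep("node_t", refers_to=["node_t", "uint32_t"])]
lookup = {d.name: TaggedDep(d) for d in deps}
for tagged in list(lookup.values()):
    process(tagged, lookup)
# Prints "import uint32_t" before "import node_t", and each only once.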