diff --git a/pyproject.toml b/pyproject.toml index 0ca9541..cea415c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,8 @@ dev-dependencies = [ "pyqt5", "pyqt6-stubs", "pyqt6", - "PySide6" + "PySide6", + "debugpy>=1.8.17", ] # If you *do* want to install a stub package for IDA later, you can use a sources entry: diff --git a/reai_toolkit/app/app.py b/reai_toolkit/app/app.py index 1287437..b2a034c 100644 --- a/reai_toolkit/app/app.py +++ b/reai_toolkit/app/app.py @@ -14,6 +14,7 @@ from reai_toolkit.app.services.matching.matching_service import MatchingService from reai_toolkit.app.services.rename.rename_service import RenameService from reai_toolkit.app.services.upload.upload_service import UploadService +from reai_toolkit.app.services.data_types.data_types_service import ImportDataTypesService class App: @@ -40,7 +41,10 @@ def __init__(self, ida_version: str = "UNKNOWN", plugin_version: str = "UNKNOWN" self.analysis_status_service = AnalysisStatusService( netstore_service=self.netstore_service, sdk_config=sdk_config ) - self.analysis_sync_service = AnalysisSyncService( + self.data_types_service = ImportDataTypesService( + netstore_service=self.netstore_service, sdk_config=sdk_config + ) + self.analysis_sync_service = AnalysisSyncService(data_types_service=self.data_types_service, netstore_service=self.netstore_service, sdk_config=sdk_config ) self.existing_analyses_service = ExistingAnalysesService( @@ -58,3 +62,4 @@ def __init__(self, ida_version: str = "UNKNOWN", plugin_version: str = "UNKNOWN" self.matching_service = MatchingService( netstore_service=self.netstore_service, sdk_config=sdk_config ) + diff --git a/reai_toolkit/app/components/dialogs/auto_unstrip_dialog.py b/reai_toolkit/app/components/dialogs/auto_unstrip_dialog.py index 7f531a4..bcf9f5c 100644 --- a/reai_toolkit/app/components/dialogs/auto_unstrip_dialog.py +++ b/reai_toolkit/app/components/dialogs/auto_unstrip_dialog.py @@ -77,32 +77,37 @@ def _populate_table(self, matches: list[MatchedFunctionSuggestion]) -> None: hdr.setStretchLastSection(False) table.setRowCount(len(matches)) + + if QT_VER == 6: + flags = QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled + else: + flags = QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled for row, m in enumerate(matches): # 2. address cell addr_item = QtWidgets.QTableWidgetItem(hex(m.function_vaddr)) - addr_item.setFlags(QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled) + addr_item.setFlags(flags) addr_item.setToolTip(addr_item.text()) # show on hover table.setItem(row, 0, addr_item) # 3. current name cell current_name = get_safe_name(m.function_vaddr) or "" cur_item = QtWidgets.QTableWidgetItem(current_name) - cur_item.setFlags(QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled) + cur_item.setFlags(flags) cur_item.setToolTip(current_name) table.setItem(row, 1, cur_item) # 4. suggested name cell sug_name = m.suggested_name or "" sug_item = QtWidgets.QTableWidgetItem(sug_name) - sug_item.setFlags(QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled) + sug_item.setFlags(flags) sug_item.setToolTip(sug_name) # tooltip always has full text table.setItem(row, 2, sug_item) # 5. Suggested demangled name cell demangled_name = m.suggested_demangled_name or sug_name dem_item = QtWidgets.QTableWidgetItem(demangled_name) - dem_item.setFlags(QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled) + dem_item.setFlags(flags) dem_item.setToolTip(demangled_name) # tooltip always has full text table.setItem(row, 3, dem_item) diff --git a/reai_toolkit/app/components/dialogs/matching_dialog.py b/reai_toolkit/app/components/dialogs/matching_dialog.py index f7a9f06..423a7e2 100644 --- a/reai_toolkit/app/components/dialogs/matching_dialog.py +++ b/reai_toolkit/app/components/dialogs/matching_dialog.py @@ -26,6 +26,7 @@ SummaryEvent, ValidFunction, ) +from reai_toolkit.app.services.data_types.data_types_service import ImportDataTypesService from reai_toolkit.app.services.rename.rename_service import RenameService from reai_toolkit.app.services.rename.schema import RenameInput @@ -59,9 +60,10 @@ class MatchingWorker(QtCore.QObject): finished = Signal() # always emitted on exit errored = Signal(str) - def __init__(self, match_service, gen_kwargs: dict): + def __init__(self, match_service, data_types_service, gen_kwargs: dict): super().__init__() self._match_service = match_service + self._data_types_service = data_types_service self._gen_kwargs = gen_kwargs self._stop = False @@ -100,6 +102,7 @@ def __init__( func_map: dict[str, int], matching_service: MatchingService, rename_service: RenameService, + data_types_service: ImportDataTypesService, parent: QtWidgets.QWidget | None = None, ): super().__init__(parent=parent) @@ -108,8 +111,12 @@ def __init__( self.matching_service = matching_service self.rename_service = rename_service + self.data_types_service = data_types_service self._func_map = func_map + # Matched function id to original effective address + self.matched_func_to_original_ea: dict[int, int] = {} + self.ui = Ui_MatchingPanel() self.setWindowTitle("RevEng.AI — Function matching") self.ui.setupUi(self) @@ -214,6 +221,7 @@ def __init__( if hasattr(self.ui, "okRenameButton"): self.ui.okRenameButton.clicked.connect(self.enqueue_renames) + self.ui.okRenameButton.clicked.connect(self.import_data_types) # ----------------- Util buttons ----------------- self.ui.btnClearSelection.clicked.connect( @@ -699,7 +707,7 @@ def start_ann(self): self.stop_ann() # ensure previous worker is cleaned up self._matching_thread = QtCore.QThread(self) - self._matching_worker = MatchingWorker(self.matching_service, gen_kwargs) + self._matching_worker = MatchingWorker(self.matching_service, self.data_types_service, gen_kwargs) self._matching_worker.moveToThread(self._matching_thread) # connections @@ -1083,6 +1091,8 @@ def display_matching_results_multiple_functions(self, query: str = ""): r.matched_functions[0] if r.matched_functions else None ) + self.matched_func_to_original_ea[matched_function.function_id] = self._func_map[str(r.function_id)] + # Column 3: Matched Name table.setItem( row, @@ -1183,6 +1193,9 @@ def enqueue_renames(self): except Exception as e: print(f"Failed to enqueue renames: {e}") + + def import_data_types(self): + self.data_types_service.import_data_types(self.matched_func_to_original_ea) # ===================================================================== # (Optional) page-switch helpers diff --git a/reai_toolkit/app/coordinator.py b/reai_toolkit/app/coordinator.py index 55bf3b2..34d2d7a 100644 --- a/reai_toolkit/app/coordinator.py +++ b/reai_toolkit/app/coordinator.py @@ -73,6 +73,7 @@ def __init__(self, app, factory, log): log=log, auto_unstrip_service=app.auto_unstrip_service, rename_service=app.rename_service, + data_types_service=app.data_types_service ) self.ai_decompc: AiDecompCoordinator = AiDecompCoordinator( diff --git a/reai_toolkit/app/coordinators/auto_unstrip_coordinator.py b/reai_toolkit/app/coordinators/auto_unstrip_coordinator.py index ad6dd26..dbc6bcf 100644 --- a/reai_toolkit/app/coordinators/auto_unstrip_coordinator.py +++ b/reai_toolkit/app/coordinators/auto_unstrip_coordinator.py @@ -10,6 +10,7 @@ ) from reai_toolkit.app.services.rename.rename_service import RenameService from reai_toolkit.app.services.rename.schema import RenameInput +from reai_toolkit.app.services.data_types.data_types_service import ImportDataTypesService class AutoUnstripCoordinator(BaseCoordinator): @@ -25,10 +26,12 @@ def __init__( log, auto_unstrip_service: AutoUnstripService, rename_service: RenameService, + data_types_service: ImportDataTypesService, ): super().__init__(app=app, factory=factory, log=log) self.auto_unstrip_service = auto_unstrip_service self.rename_service = rename_service + self.data_types_service: ImportDataTypesService = data_types_service def run_dialog(self) -> None: if self.auto_unstrip_service.is_worker_running(): @@ -46,7 +49,7 @@ def run_dialog(self) -> None: def _open_auto_unstrip_dialog(self) -> None: self.factory.auto_unstrip(response=self.last_response.data).open_modal() - def _on_complete(self, response: GenericApiReturn[AutoUnstripResponse]): + def _on_complete(self, response: GenericApiReturn[AutoUnstripResponse]) -> None: print("Auto-unstrip process completed.") if not response.success: @@ -54,6 +57,7 @@ def _on_complete(self, response: GenericApiReturn[AutoUnstripResponse]): return rename_list = [] + data_types_mapping: dict[int, int] = {} self.last_response = response @@ -65,8 +69,10 @@ def _on_complete(self, response: GenericApiReturn[AutoUnstripResponse]): new_name=function.suggested_demangled_name, ) ) + data_types_mapping[function.function_id] = function.function_vaddr self.rename_service.enqueue_rename(rename_list=rename_list) + self.data_types_service.import_data_types(data_types_mapping) ida_kernwin.execute_ui_requests([self._open_auto_unstrip_dialog]) diff --git a/reai_toolkit/app/coordinators/sync_analysis_coordinator.py b/reai_toolkit/app/coordinators/sync_analysis_coordinator.py index 4e3dd06..d32df8d 100644 --- a/reai_toolkit/app/coordinators/sync_analysis_coordinator.py +++ b/reai_toolkit/app/coordinators/sync_analysis_coordinator.py @@ -48,9 +48,8 @@ def _on_complete( """ if generic_return.success: self.safe_info( - msg=f"Analysis data synced successfully. \n\nSynced {generic_return.data.matched_local_function_count} functions with remote analysis." - + f"\n{generic_return.data.unmatched_local_function_count} local functions not present in remote analysis." - + f"\n{generic_return.data.unmatched_remote_function_count} remote functions not present in local analysis." + msg=f"Analysis data synced successfully. \n\nSynced {generic_return.data.matched_function_count} functions with remote analysis." + + f"\n{generic_return.data.unmatched_function_count} local functions not present in remote analysis." ) else: self.safe_error(message=generic_return.error_message) diff --git a/reai_toolkit/app/factory.py b/reai_toolkit/app/factory.py index 8b5fbd7..3d06317 100644 --- a/reai_toolkit/app/factory.py +++ b/reai_toolkit/app/factory.py @@ -70,6 +70,7 @@ def function_matching( func_map=func_map, matching_service=self.app.matching_service, rename_service=self.app.rename_service, + data_types_service=self.app.data_types_service, parent=self.parent, ) diff --git a/reai_toolkit/app/services/analysis_sync/analysis_sync.py b/reai_toolkit/app/services/analysis_sync/analysis_sync.py index 6f76b0b..63268f7 100644 --- a/reai_toolkit/app/services/analysis_sync/analysis_sync.py +++ b/reai_toolkit/app/services/analysis_sync/analysis_sync.py @@ -2,25 +2,31 @@ from typing import Any, Callable import ida_kernwin +from libbs.decompilers.ida.compat import execute_write, execute_read + import idautils import idaapi from loguru import logger -from revengai import AnalysesCoreApi, Configuration, FunctionMapping -from libbs.decompilers.ida.compat import execute_write, execute_read +from revengai import ( + AnalysesCoreApi, + Configuration, + FunctionMapping, +) from reai_toolkit.app.core.netstore_service import SimpleNetStore from reai_toolkit.app.core.shared_schema import GenericApiReturn from reai_toolkit.app.interfaces.thread_service import IThreadService from reai_toolkit.app.services.analysis_sync.schema import MatchedFunctionSummary -from revengai import BaseResponseBasic +from reai_toolkit.app.services.data_types.data_types_service import ImportDataTypesService class AnalysisSyncService(IThreadService): _thread_callback: Callable[..., Any] = None - def __init__(self, netstore_service: SimpleNetStore, sdk_config: Configuration): + def __init__(self, data_types_service: ImportDataTypesService, netstore_service: SimpleNetStore, sdk_config: Configuration): super().__init__(netstore_service=netstore_service, sdk_config=sdk_config) + self.data_types_service: ImportDataTypesService = data_types_service def call_callback(self, generic_return: GenericApiReturn) -> None: self._thread_callback(generic_return) @@ -51,9 +57,7 @@ def _fetch_model_id(self, analysis_id: int) -> int: with self.yield_api_client(sdk_config=self.sdk_config) as api_client: analyses_client = AnalysesCoreApi(api_client) - analysis_details: BaseResponseBasic = analyses_client.get_analysis_basic_info( - analysis_id=analysis_id - ) + analysis_details = analyses_client.get_analysis_basic_info(analysis_id=analysis_id) model_id = analysis_details.data.model_id self.safe_put_model_id(model_id=model_id) model_name = analysis_details.data.model_name @@ -85,10 +89,8 @@ def _fetch_function_map(self, analysis_id: int) -> FunctionMapping: with self.yield_api_client(sdk_config=self.sdk_config) as api_client: analyses_client = AnalysesCoreApi(api_client) - function_map = analyses_client.get_analysis_function_map( - analysis_id=analysis_id - ) - func_map = function_map.data.function_maps + function_map = analyses_client.get_analysis_function_map(analysis_id=analysis_id) + func_map: FunctionMapping = function_map.data.function_maps self.safe_put_function_mapping(func_map=func_map) return func_map @@ -96,70 +98,44 @@ def _match_functions( self, func_map: FunctionMapping, ) -> GenericApiReturn[MatchedFunctionSummary]: - function_map = func_map.function_map - inverse_function_map = func_map.inverse_function_map + # Mapping of local function addresses to mangled names + local_vaddr_to_matched_name: dict[str, str] = func_map.name_map - logger.info( - f"RevEng.AI: Retrieved {len(function_map)} function mappings from analysis" - ) + logger.info(f"RevEng.AI: Retrieved {len(local_vaddr_to_matched_name)} functions from analysis") # Compute which IDA functions match the revengai analysis functions - matched_functions = [] - unmatched_local_functions = [] - unmatched_remote_functions = [] - - # Track local functions matched - local_function_vaddrs_matched = set() - fun_count = 0 - for key, value in func_map.name_map.items(): - if "FUN_" in value: - fun_count += 1 - - for start_ea in idautils.Functions(): - if str(start_ea) in inverse_function_map: - new_name: str | None = func_map.name_map.get(str(start_ea), None) - if new_name is None: - continue - - self.safe_set_name(start_ea, new_name, check_user_flags=True) - matched_functions.append( - (int(inverse_function_map[str(start_ea)]), start_ea) - ) - local_function_vaddrs_matched.add(start_ea) + matched_function_count: int = 0 + unmatched_function_count: int = 0 + total_function_count: int = 0 + + local_vaddr: int + for local_vaddr in idautils.Functions(): + local_vaddr_str: str = str(local_vaddr) + new_name: str | None = local_vaddr_to_matched_name.get(local_vaddr_str) + if new_name: + self.safe_set_name(local_vaddr, new_name, check_user_flags=True) + matched_function_count += 1 else: - unmatched_local_functions.append(start_ea) - - unmatched_portal_map = {} - # Track remote functions not matched - for func_id_str, func_vaddr in function_map.items(): - if int(func_vaddr) not in local_function_vaddrs_matched: - unmatched_remote_functions.append((int(func_vaddr), int(func_id_str))) - unmatched_portal_map[int(func_vaddr)] = int(func_id_str) - - logger.info(f"RevEng.AI: Matched {len(matched_functions)} functions") - logger.info( - f"RevEng.AI: {len(unmatched_local_functions)} local functions not matched" - ) - logger.info( - f"RevEng.AI: {len(unmatched_remote_functions)} remote functions not matched" - ) + unmatched_function_count += 1 + + total_function_count += 1 + + logger.info(f"RevEng.AI: Matched {matched_function_count} functions") + logger.info(f"RevEng.AI: {unmatched_function_count} functions not matched") return GenericApiReturn( success=True, data=MatchedFunctionSummary( - matched_local_function_count=len(matched_functions), - unmatched_local_function_count=len(unmatched_local_functions), - unmatched_remote_function_count=len(unmatched_remote_functions), - total_function_count=len(function_map), + matched_function_count=matched_function_count, + unmatched_function_count=unmatched_function_count, + total_function_count=total_function_count, ), ) def _safe_match_functions( self, func_map: FunctionMapping ) -> GenericApiReturn[MatchedFunctionSummary]: - data = GenericApiReturn( - success=False, error_message="Failed to match functions." - ) + data = GenericApiReturn(success=False, error_message="Failed to match functions.") def _do(): try: @@ -175,15 +151,12 @@ def _do(): return data - def _sync_analysis_data( - self, stop_event: threading.Event, analysis_id: int - ) -> None: + def _sync_analysis_data(self, _: threading.Event, analysis_id: int) -> None: """ Syncs the analysis data until completion or failure. """ # Fetch Model ID - Used for function matching - response = self.api_request_returning( fn=lambda: self._fetch_model_id(analysis_id=analysis_id) ) @@ -200,8 +173,14 @@ def _sync_analysis_data( self.call_callback(generic_return=response) return - function_mapping: FunctionMapping = response.data + function_mapping: FunctionMapping | None = response.data + if function_mapping is None: + return response = self._safe_match_functions(func_map=function_mapping) + if not response.success: + self.call_callback(generic_return=response) + return + self.data_types_service.import_data_types({int(k): v for k, v in function_mapping.function_map.items()}) self.call_callback(generic_return=response) diff --git a/reai_toolkit/app/services/analysis_sync/schema.py b/reai_toolkit/app/services/analysis_sync/schema.py index 3a54375..d78a5ad 100644 --- a/reai_toolkit/app/services/analysis_sync/schema.py +++ b/reai_toolkit/app/services/analysis_sync/schema.py @@ -2,7 +2,6 @@ class MatchedFunctionSummary(BaseModel): - matched_local_function_count: int - unmatched_local_function_count: int - unmatched_remote_function_count: int + matched_function_count: int + unmatched_function_count: int total_function_count: int diff --git a/reai_toolkit/app/services/auto_unstrip/auto_unstrip_service.py b/reai_toolkit/app/services/auto_unstrip/auto_unstrip_service.py index 714dfaa..8498b4d 100644 --- a/reai_toolkit/app/services/auto_unstrip/auto_unstrip_service.py +++ b/reai_toolkit/app/services/auto_unstrip/auto_unstrip_service.py @@ -74,10 +74,9 @@ def _fetch_unstrip_status(self, analysis_id: int) -> AutoUnstripResponse: with self.yield_api_client(sdk_config=self.sdk_config) as api_client: functions_api = FunctionsCoreApi(api_client=api_client) - result = functions_api.auto_unstrip( + result: AutoUnstripResponse = functions_api.auto_unstrip( analysis_id=analysis_id, auto_unstrip_request=AutoUnstripRequest( - min_similarity=0.9, apply=True, # Will not let the users pick names if enabled. ), ) diff --git a/reai_toolkit/app/services/data_types/__init__.py b/reai_toolkit/app/services/data_types/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/reai_toolkit/app/services/data_types/data_types_service.py b/reai_toolkit/app/services/data_types/data_types_service.py new file mode 100644 index 0000000..3a12a07 --- /dev/null +++ b/reai_toolkit/app/services/data_types/data_types_service.py @@ -0,0 +1,49 @@ +from revengai import Configuration, FunctionMapping +from revengai.exceptions import NotFoundException + +from loguru import logger + +from revengai.models.matched_function import MatchedFunction +from reai_toolkit.app.core.netstore_service import SimpleNetStore +from revengai import ( + FunctionsDataTypesApi, + FunctionDataTypesList, + BaseResponseFunctionDataTypesList +) + +from reai_toolkit.app.interfaces.thread_service import IThreadService +from reai_toolkit.app.transformations.import_data_types import ImportDataTypes + + +class ImportDataTypesService(IThreadService): + def __init__(self, netstore_service: SimpleNetStore, sdk_config: Configuration) -> None: + super().__init__(netstore_service=netstore_service, sdk_config=sdk_config) + + def import_data_types(self, matches: dict[int, int]) -> None: + """ Import data type information from the remote and apply to our local analysis + + Args: + matches: Mapping of remote function id to local virtual address + """ + if len(matches) == 0: + return + + idt : ImportDataTypes = ImportDataTypes() + matched_function_ids: list[int] = list(matches.keys()) + + try: + response: FunctionDataTypesList | None = self._get_data_types(matched_function_ids) + except NotFoundException as e: + logger.warning(f"failed to apply data types for {matched_function_ids} due to: {e}") + else: + if response: + idt.execute(response, matched_function_mapping=matches) + + def _get_data_types(self, function_ids: list[int] | None = None) -> FunctionDataTypesList | None: + with self.yield_api_client(sdk_config=self.sdk_config) as api_client: + client = FunctionsDataTypesApi(api_client=api_client) + response: BaseResponseFunctionDataTypesList = ( + client.list_function_data_types_for_functions(function_ids=function_ids) # type: ignore + ) + if response.status: + return response.data diff --git a/reai_toolkit/app/services/rename/rename_service.py b/reai_toolkit/app/services/rename/rename_service.py index 0d76808..63b8e8a 100644 --- a/reai_toolkit/app/services/rename/rename_service.py +++ b/reai_toolkit/app/services/rename/rename_service.py @@ -83,14 +83,6 @@ def _rename_worker(self, stop_event: Optional[threading.Event] = None) -> None: # Do before for execute sync, if fails may not be called. self._rename_q.task_done() - # Display the total success/failure - ida_kernwin.execute_sync( - lambda: logger.info( - f"RevEng.AI: Renaming Batch Completed - Success: {len(function_list) - (total_errors or 0)}, Failures: {total_errors or 0}" - ), - ida_kernwin.MFF_FAST, - ) - def _rename_remote_function_safe(self, matched_func_list) -> GenericApiReturn: data = GenericApiReturn(success=False) diff --git a/reai_toolkit/app/transformations/__init__.py b/reai_toolkit/app/transformations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/reai_toolkit/app/transformations/import_data_types.py b/reai_toolkit/app/transformations/import_data_types.py new file mode 100644 index 0000000..7234c45 --- /dev/null +++ b/reai_toolkit/app/transformations/import_data_types.py @@ -0,0 +1,180 @@ +from typing import cast + +import libbs.artifacts +from libbs.api import DecompilerInterface +from libbs.decompilers.ida.compat import execute_ui +from loguru import logger +from revengai import ( + Argument, + Enumeration, + FunctionDataTypesList, + FunctionHeader, + FunctionInfoInputFuncDepsInner, + FunctionInfoOutput, + FunctionTypeOutput, + Structure, + TypeDefinition, +) + + +class TaggedDependency: + def __init__(self, dependency: Structure | Enumeration | TypeDefinition) -> None: + self.dependency: Structure | Enumeration | TypeDefinition = dependency + self.processed: bool = False + self.name: str = self.dependency.name + + def __repr__(self) -> str: + return self.dependency.__repr__() + + +class ImportDataTypes: + def __init__(self) -> None: + self.deci: DecompilerInterface + + @execute_ui + def execute(self, functions: FunctionDataTypesList, matched_function_mapping: dict[int, int] = {}) -> None: + self.deci = DecompilerInterface.discover(force_decompiler="ida") # type: ignore + lookup: dict[str, TaggedDependency] = {} + + for function in functions.items: + data_types: FunctionInfoOutput | None = function.data_types + + if data_types is None: + continue + + # Track processed dependencies to prevent duplicate imports. + # Without this: + # - Shared dependencies get re-processed, breaking references (shows as invalid ordinals in IDA) + # - Cannot resolve subdependencies (e.g. struct fields that reference other imported types) + for dep in data_types.func_deps: + if dep.actual_instance is None: + continue + + if dep.actual_instance.name not in lookup: + lookup.update({dep.actual_instance.name: TaggedDependency(dep.actual_instance)}) # type: ignore + + dependency: FunctionInfoInputFuncDepsInner + for dependency in data_types.func_deps: + if dependency.actual_instance is None: + continue + + tagged_dependency: TaggedDependency | None = lookup.get(dependency.actual_instance.name) + if tagged_dependency: + self.process_dependency(tagged_dependency, lookup) + + func: FunctionTypeOutput | None = data_types.func_types + if func: + # If we obtained data types from a matched function, we need to make sure we map it to the original effective address. + if matched_function_mapping: + ea: int = matched_function_mapping[function.function_id] + else: + ea: int = func.addr + + self.update_function(func, ea) + + + def process_dependency( + self, tagged_dependency: TaggedDependency, lookup: dict[str, TaggedDependency] + ) -> None: + if tagged_dependency.processed: + return + + dependency: Structure | Enumeration | TypeDefinition = tagged_dependency.dependency + match dependency: + case Structure(): + self.update_struct(cast(Structure, dependency), lookup) + case Enumeration(): + self.update_enum(cast(Enumeration, dependency)) + case TypeDefinition(): + self.update_typedef(cast(TypeDefinition, dependency), lookup) + case _: + logger.warning(f"unsupported dependency type: {dependency}") + + tagged_dependency.processed = True + + def update_struct(self, imported_struct: Structure, lookup: dict[str, TaggedDependency]) -> None: + if imported_struct.size is None: + return + + for member in imported_struct.members.values(): + subdependency: TaggedDependency | None = lookup.get(member.type) + if subdependency: + self.process_dependency(subdependency, lookup) + member.type = self.normalise_type(member.type) + + self.deci.structs[imported_struct.name] = libbs.artifacts.Struct( + name=imported_struct.name, size=imported_struct.size, members={v.offset: v for v in imported_struct.members.values()} # type: ignore + ) + + def update_enum(self, imported_enum: Enumeration) -> None: + self.deci.enums[imported_enum.name] = libbs.artifacts.Enum(name=imported_enum.name, members=imported_enum.members) + + def update_typedef(self, imported_typedef: TypeDefinition, lookup: dict[str, TaggedDependency]) -> None: + subdependency: TaggedDependency | None = lookup.get(imported_typedef.type) + if subdependency: + self.process_dependency(subdependency, lookup) + + normalized_type: str = self.normalise_type(imported_typedef.type) + self.deci.typedefs[imported_typedef.name] = libbs.artifacts.Typedef( + name=imported_typedef.name, type_=normalized_type + ) + + def update_function(self, func: FunctionTypeOutput, ea: int) -> None: + base_address: int = self.deci.binary_base_addr + rva: int = ea - base_address + + target_func: libbs.artifacts.Function | None = self.deci.functions.get(rva) # type: ignore + if target_func is None: + logger.warning(f"failed to update function: {func.name} at rva: 0x{rva:0x}") + return + + target_func.name = func.name + target_func.size = func.size + target_func.type = func.type + + # Check the target function has a header. + if target_func.header: + self.update_header(func.header, target_func) + + self.deci.functions[rva] = target_func + + def update_header( + self, imported_header: FunctionHeader, target_function: libbs.artifacts.Function + ) -> None: + if target_function.header is None: + return + + target_function.header.name = imported_header.name + target_function.header.type = self.normalise_type(imported_header.type) + self.update_function_arguments(imported_header.args, target_function) + + def update_function_arguments( + self, imported_args: dict[str, Argument], target_function: libbs.artifacts.Function + ) -> None: + if target_function.header is None: + return + + for arg in imported_args.values(): + arg.type = self.normalise_type(arg.type) + + target_function.header.args = {v.offset: v for v in imported_args.values()} + + @staticmethod + def normalise_type(data_type: str) -> str: + # When we obtain a type from DWARF information, it often looks something like `DWARF/stdint-uintn.h::uint32_t` + # Let's remove the DWARF/*.h prefix + if data_type.startswith("DWARF/"): + # Find the first occurence of `::` + delimiter: str = "::" + pos: int = data_type.find(delimiter) + data_type = data_type[pos+len(delimiter):] + + # TODO: PLU-213 Add IDA typedefs for Ghidra primitives so we don't need to bother doing this... + if data_type == "uchar": + data_type = "unsigned char" + elif data_type == "qword": + data_type = "unsigned __int64" + elif data_type == "sqword": + data_type = "__int64" + + return data_type diff --git a/uv.lock b/uv.lock index 22f97d2..93888fb 100644 --- a/uv.lock +++ b/uv.lock @@ -170,6 +170,35 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" }, ] +[[package]] +name = "debugpy" +version = "1.8.17" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/15/ad/71e708ff4ca377c4230530d6a7aa7992592648c122a2cd2b321cf8b35a76/debugpy-1.8.17.tar.gz", hash = "sha256:fd723b47a8c08892b1a16b2c6239a8b96637c62a59b94bb5dab4bac592a58a8e", size = 1644129, upload-time = "2025-09-17T16:33:20.633Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/38/36/b57c6e818d909f6e59c0182252921cf435e0951126a97e11de37e72ab5e1/debugpy-1.8.17-cp310-cp310-macosx_15_0_x86_64.whl", hash = "sha256:c41d2ce8bbaddcc0009cc73f65318eedfa3dbc88a8298081deb05389f1ab5542", size = 2098021, upload-time = "2025-09-17T16:33:22.556Z" }, + { url = "https://files.pythonhosted.org/packages/be/01/0363c7efdd1e9febd090bb13cee4fb1057215b157b2979a4ca5ccb678217/debugpy-1.8.17-cp310-cp310-manylinux_2_34_x86_64.whl", hash = "sha256:1440fd514e1b815edd5861ca394786f90eb24960eb26d6f7200994333b1d79e3", size = 3087399, upload-time = "2025-09-17T16:33:24.292Z" }, + { url = "https://files.pythonhosted.org/packages/79/bc/4a984729674aa9a84856650438b9665f9a1d5a748804ac6f37932ce0d4aa/debugpy-1.8.17-cp310-cp310-win32.whl", hash = "sha256:3a32c0af575749083d7492dc79f6ab69f21b2d2ad4cd977a958a07d5865316e4", size = 5230292, upload-time = "2025-09-17T16:33:26.137Z" }, + { url = "https://files.pythonhosted.org/packages/5d/19/2b9b3092d0cf81a5aa10c86271999453030af354d1a5a7d6e34c574515d7/debugpy-1.8.17-cp310-cp310-win_amd64.whl", hash = "sha256:a3aad0537cf4d9c1996434be68c6c9a6d233ac6f76c2a482c7803295b4e4f99a", size = 5261885, upload-time = "2025-09-17T16:33:27.592Z" }, + { url = "https://files.pythonhosted.org/packages/d8/53/3af72b5c159278c4a0cf4cffa518675a0e73bdb7d1cac0239b815502d2ce/debugpy-1.8.17-cp311-cp311-macosx_15_0_universal2.whl", hash = "sha256:d3fce3f0e3de262a3b67e69916d001f3e767661c6e1ee42553009d445d1cd840", size = 2207154, upload-time = "2025-09-17T16:33:29.457Z" }, + { url = "https://files.pythonhosted.org/packages/8f/6d/204f407df45600e2245b4a39860ed4ba32552330a0b3f5f160ae4cc30072/debugpy-1.8.17-cp311-cp311-manylinux_2_34_x86_64.whl", hash = "sha256:c6bdf134457ae0cac6fb68205776be635d31174eeac9541e1d0c062165c6461f", size = 3170322, upload-time = "2025-09-17T16:33:30.837Z" }, + { url = "https://files.pythonhosted.org/packages/f2/13/1b8f87d39cf83c6b713de2620c31205299e6065622e7dd37aff4808dd410/debugpy-1.8.17-cp311-cp311-win32.whl", hash = "sha256:e79a195f9e059edfe5d8bf6f3749b2599452d3e9380484cd261f6b7cd2c7c4da", size = 5155078, upload-time = "2025-09-17T16:33:33.331Z" }, + { url = "https://files.pythonhosted.org/packages/c2/c5/c012c60a2922cc91caa9675d0ddfbb14ba59e1e36228355f41cab6483469/debugpy-1.8.17-cp311-cp311-win_amd64.whl", hash = "sha256:b532282ad4eca958b1b2d7dbcb2b7218e02cb934165859b918e3b6ba7772d3f4", size = 5179011, upload-time = "2025-09-17T16:33:35.711Z" }, + { url = "https://files.pythonhosted.org/packages/08/2b/9d8e65beb2751876c82e1aceb32f328c43ec872711fa80257c7674f45650/debugpy-1.8.17-cp312-cp312-macosx_15_0_universal2.whl", hash = "sha256:f14467edef672195c6f6b8e27ce5005313cb5d03c9239059bc7182b60c176e2d", size = 2549522, upload-time = "2025-09-17T16:33:38.466Z" }, + { url = "https://files.pythonhosted.org/packages/b4/78/eb0d77f02971c05fca0eb7465b18058ba84bd957062f5eec82f941ac792a/debugpy-1.8.17-cp312-cp312-manylinux_2_34_x86_64.whl", hash = "sha256:24693179ef9dfa20dca8605905a42b392be56d410c333af82f1c5dff807a64cc", size = 4309417, upload-time = "2025-09-17T16:33:41.299Z" }, + { url = "https://files.pythonhosted.org/packages/37/42/c40f1d8cc1fed1e75ea54298a382395b8b937d923fcf41ab0797a554f555/debugpy-1.8.17-cp312-cp312-win32.whl", hash = "sha256:6a4e9dacf2cbb60d2514ff7b04b4534b0139facbf2abdffe0639ddb6088e59cf", size = 5277130, upload-time = "2025-09-17T16:33:43.554Z" }, + { url = "https://files.pythonhosted.org/packages/72/22/84263b205baad32b81b36eac076de0cdbe09fe2d0637f5b32243dc7c925b/debugpy-1.8.17-cp312-cp312-win_amd64.whl", hash = "sha256:e8f8f61c518952fb15f74a302e068b48d9c4691768ade433e4adeea961993464", size = 5319053, upload-time = "2025-09-17T16:33:53.033Z" }, + { url = "https://files.pythonhosted.org/packages/50/76/597e5cb97d026274ba297af8d89138dfd9e695767ba0e0895edb20963f40/debugpy-1.8.17-cp313-cp313-macosx_15_0_universal2.whl", hash = "sha256:857c1dd5d70042502aef1c6d1c2801211f3ea7e56f75e9c335f434afb403e464", size = 2538386, upload-time = "2025-09-17T16:33:54.594Z" }, + { url = "https://files.pythonhosted.org/packages/5f/60/ce5c34fcdfec493701f9d1532dba95b21b2f6394147234dce21160bd923f/debugpy-1.8.17-cp313-cp313-manylinux_2_34_x86_64.whl", hash = "sha256:3bea3b0b12f3946e098cce9b43c3c46e317b567f79570c3f43f0b96d00788088", size = 4292100, upload-time = "2025-09-17T16:33:56.353Z" }, + { url = "https://files.pythonhosted.org/packages/e8/95/7873cf2146577ef71d2a20bf553f12df865922a6f87b9e8ee1df04f01785/debugpy-1.8.17-cp313-cp313-win32.whl", hash = "sha256:e34ee844c2f17b18556b5bbe59e1e2ff4e86a00282d2a46edab73fd7f18f4a83", size = 5277002, upload-time = "2025-09-17T16:33:58.231Z" }, + { url = "https://files.pythonhosted.org/packages/46/11/18c79a1cee5ff539a94ec4aa290c1c069a5580fd5cfd2fb2e282f8e905da/debugpy-1.8.17-cp313-cp313-win_amd64.whl", hash = "sha256:6c5cd6f009ad4fca8e33e5238210dc1e5f42db07d4b6ab21ac7ffa904a196420", size = 5319047, upload-time = "2025-09-17T16:34:00.586Z" }, + { url = "https://files.pythonhosted.org/packages/de/45/115d55b2a9da6de812696064ceb505c31e952c5d89c4ed1d9bb983deec34/debugpy-1.8.17-cp314-cp314-macosx_15_0_universal2.whl", hash = "sha256:045290c010bcd2d82bc97aa2daf6837443cd52f6328592698809b4549babcee1", size = 2536899, upload-time = "2025-09-17T16:34:02.657Z" }, + { url = "https://files.pythonhosted.org/packages/5a/73/2aa00c7f1f06e997ef57dc9b23d61a92120bec1437a012afb6d176585197/debugpy-1.8.17-cp314-cp314-manylinux_2_34_x86_64.whl", hash = "sha256:b69b6bd9dba6a03632534cdf67c760625760a215ae289f7489a452af1031fe1f", size = 4268254, upload-time = "2025-09-17T16:34:04.486Z" }, + { url = "https://files.pythonhosted.org/packages/86/b5/ed3e65c63c68a6634e3ba04bd10255c8e46ec16ebed7d1c79e4816d8a760/debugpy-1.8.17-cp314-cp314-win32.whl", hash = "sha256:5c59b74aa5630f3a5194467100c3b3d1c77898f9ab27e3f7dc5d40fc2f122670", size = 5277203, upload-time = "2025-09-17T16:34:06.65Z" }, + { url = "https://files.pythonhosted.org/packages/b0/26/394276b71c7538445f29e792f589ab7379ae70fd26ff5577dfde71158e96/debugpy-1.8.17-cp314-cp314-win_amd64.whl", hash = "sha256:893cba7bb0f55161de4365584b025f7064e1f88913551bcd23be3260b231429c", size = 5318493, upload-time = "2025-09-17T16:34:08.483Z" }, + { url = "https://files.pythonhosted.org/packages/b0/d0/89247ec250369fc76db477720a26b2fce7ba079ff1380e4ab4529d2fe233/debugpy-1.8.17-py2.py3-none-any.whl", hash = "sha256:60c7dca6571efe660ccb7a9508d73ca14b8796c4ed484c2002abba714226cfef", size = 5283210, upload-time = "2025-09-17T16:34:25.835Z" }, +] + [[package]] name = "exceptiongroup" version = "1.3.0" @@ -889,6 +918,7 @@ dependencies = [ [package.dev-dependencies] dev = [ { name = "black" }, + { name = "debugpy" }, { name = "idapro" }, { name = "isort" }, { name = "mypy" }, @@ -913,6 +943,7 @@ requires-dist = [ [package.metadata.requires-dev] dev = [ { name = "black", specifier = ">=24.8" }, + { name = "debugpy", specifier = ">=1.8.17" }, { name = "idapro", specifier = ">=0.0.5" }, { name = "isort", specifier = ">=5.13" }, { name = "mypy", specifier = ">=1.12" }, @@ -942,7 +973,7 @@ wheels = [ [[package]] name = "revengai" -version = "2.14.1" +version = "2.61.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "lazy-imports" }, @@ -951,9 +982,9 @@ dependencies = [ { name = "typing-extensions" }, { name = "urllib3" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/74/a8/f81ca6f12f457ad29c1c0daee2fb3218e655f97a53f7c64b2b139162c37f/revengai-2.14.1.tar.gz", hash = "sha256:3fbbe665253595bc54a3019a9e0299810020e4af30c616cea942950be5f359ff", size = 273563, upload-time = "2025-10-27T11:57:21.968Z" } +sdist = { url = "https://files.pythonhosted.org/packages/80/e1/9f0677f31f186c5d15461ab699ab0413cf9b16242aa0f0ca8c7a94bfe010/revengai-2.61.0.tar.gz", hash = "sha256:be377cb412caee9f7c8da6eae131645fa86c2878bdb8715f8447adaa0e1f050e", size = 264371, upload-time = "2025-12-08T09:08:15.234Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/99/4c/c332b6cee7e56273611dfcc898772118892fb4e69d50031b7ed7ab5a0da2/revengai-2.14.1-py3-none-any.whl", hash = "sha256:1fde9ea6b44ae46b9f069e2838f0924fe603f61d5e0fabef702904f67c3c3aa5", size = 507066, upload-time = "2025-10-27T11:57:20.611Z" }, + { url = "https://files.pythonhosted.org/packages/e3/6d/ca8f61862764daf20f08c07e1be5d0c7480df7732e2a75b0e405a0d784ea/revengai-2.61.0-py3-none-any.whl", hash = "sha256:ae913a8013920754d3e2cafbe55052880b0f8103213178b85172f1d674f25c55", size = 485700, upload-time = "2025-12-08T09:08:16.486Z" }, ] [[package]]