Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions capa/features/extractors/ida/basicblock.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from typing import Iterator

import idaapi
from ida_domain import Database

import capa.features.extractors.ida.helpers
from capa.features.common import Feature, Characteristic
Expand Down Expand Up @@ -59,50 +60,51 @@ def is_printable_utf16le(chars_: bytes):
return 0


def is_mov_imm_to_stack(insn: idaapi.insn_t) -> bool:
def is_mov_imm_to_stack(db: Database, insn: idaapi.insn_t) -> bool:
"""verify instruction moves immediate onto stack"""
if insn.Op2.type != idaapi.o_imm:
return False

if not helpers.is_op_stack_var(insn.ea, 0):
return False

if not insn.get_canon_mnem().startswith("mov"):
mnem = db.instructions.get_mnemonic(insn)
if not mnem.startswith("mov"):
return False

return True


def bb_contains_stackstring(f: idaapi.func_t, bb: idaapi.BasicBlock) -> bool:
def bb_contains_stackstring(db: Database, f: idaapi.func_t, bb: idaapi.BasicBlock) -> bool:
"""check basic block for stackstring indicators

true if basic block contains enough moves of constant bytes to the stack
"""
count = 0
for insn in capa.features.extractors.ida.helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
if is_mov_imm_to_stack(insn):
for insn in capa.features.extractors.ida.helpers.get_instructions_in_range(db, bb.start_ea, bb.end_ea):
if is_mov_imm_to_stack(db, insn):
count += get_printable_len(insn.Op2)
if count > MIN_STACKSTRING_LEN:
return True
return False


def extract_bb_stackstring(fh: FunctionHandle, bbh: BBHandle) -> Iterator[tuple[Feature, Address]]:
def extract_bb_stackstring(db: Database, fh: FunctionHandle, bbh: BBHandle) -> Iterator[tuple[Feature, Address]]:
"""extract stackstring indicators from basic block"""
if bb_contains_stackstring(fh.inner, bbh.inner):
if bb_contains_stackstring(db, fh.inner, bbh.inner):
yield Characteristic("stack string"), bbh.address


def extract_bb_tight_loop(fh: FunctionHandle, bbh: BBHandle) -> Iterator[tuple[Feature, Address]]:
def extract_bb_tight_loop(db: Database, fh: FunctionHandle, bbh: BBHandle) -> Iterator[tuple[Feature, Address]]:
"""extract tight loop indicators from a basic block"""
if capa.features.extractors.ida.helpers.is_basic_block_tight_loop(bbh.inner):
if capa.features.extractors.ida.helpers.is_basic_block_tight_loop(db, bbh.inner):
yield Characteristic("tight loop"), bbh.address


def extract_features(fh: FunctionHandle, bbh: BBHandle) -> Iterator[tuple[Feature, Address]]:
def extract_features(db: Database, fh: FunctionHandle, bbh: BBHandle) -> Iterator[tuple[Feature, Address]]:
"""extract basic block features"""
for bb_handler in BASIC_BLOCK_HANDLERS:
for feature, addr in bb_handler(fh, bbh):
for feature, addr in bb_handler(db, fh, bbh):
yield feature, addr
yield BasicBlock(), bbh.address

Expand Down
49 changes: 31 additions & 18 deletions capa/features/extractors/ida/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
# limitations under the License.

from typing import Iterator
from pathlib import Path

import idaapi
from ida_domain import Database

import capa.ida.helpers
import capa.features.extractors.elf
Expand All @@ -35,56 +36,68 @@


class IdaFeatureExtractor(StaticFeatureExtractor):
def __init__(self):
def __init__(self, db: Database):
self.db = db
super().__init__(
hashes=SampleHashes(
md5=capa.ida.helpers.retrieve_input_file_md5(),
md5=db.md5,
sha1="(unknown)",
sha256=capa.ida.helpers.retrieve_input_file_sha256(),
sha256=db.sha256,
)
)
self.global_features: list[tuple[Feature, Address]] = []
self.global_features.extend(capa.features.extractors.ida.file.extract_file_format())
self.global_features.extend(capa.features.extractors.ida.global_.extract_os())
self.global_features.extend(capa.features.extractors.ida.global_.extract_arch())
self.global_features.extend(capa.features.extractors.ida.file.extract_file_format(self.db))
self.global_features.extend(capa.features.extractors.ida.global_.extract_os(self.db))
self.global_features.extend(capa.features.extractors.ida.global_.extract_arch(self.db))

@classmethod
def from_current_database(cls) -> "IdaFeatureExtractor":
"""Create extractor for interactive IDA GUI use."""
db = Database.open()
return cls(db)

@classmethod
def from_file(cls, path: Path) -> "IdaFeatureExtractor":
"""Create extractor for idalib/headless use."""
db = Database.open(str(path))
return cls(db)

def get_base_address(self):
return AbsoluteVirtualAddress(idaapi.get_imagebase())
return AbsoluteVirtualAddress(self.db.base_address)

def extract_global_features(self):
yield from self.global_features

def extract_file_features(self):
yield from capa.features.extractors.ida.file.extract_features()
yield from capa.features.extractors.ida.file.extract_features(self.db)

def get_functions(self) -> Iterator[FunctionHandle]:
import capa.features.extractors.ida.helpers as ida_helpers

# ignore library functions and thunk functions as identified by IDA
yield from ida_helpers.get_functions(skip_thunks=True, skip_libs=True)
yield from ida_helpers.get_functions(self.db, skip_thunks=True, skip_libs=True)

@staticmethod
def get_function(ea: int) -> FunctionHandle:
f = idaapi.get_func(ea)
def get_function(self, ea: int) -> FunctionHandle:
f = self.db.functions.get_at(ea)
return FunctionHandle(address=AbsoluteVirtualAddress(f.start_ea), inner=f)

def extract_function_features(self, fh: FunctionHandle) -> Iterator[tuple[Feature, Address]]:
yield from capa.features.extractors.ida.function.extract_features(fh)
yield from capa.features.extractors.ida.function.extract_features(self.db, fh)

def get_basic_blocks(self, fh: FunctionHandle) -> Iterator[BBHandle]:
import capa.features.extractors.ida.helpers as ida_helpers

for bb in ida_helpers.get_function_blocks(fh.inner):
for bb in ida_helpers.get_function_blocks(self.db, fh.inner):
yield BBHandle(address=AbsoluteVirtualAddress(bb.start_ea), inner=bb)

def extract_basic_block_features(self, fh: FunctionHandle, bbh: BBHandle) -> Iterator[tuple[Feature, Address]]:
yield from capa.features.extractors.ida.basicblock.extract_features(fh, bbh)
yield from capa.features.extractors.ida.basicblock.extract_features(self.db, fh, bbh)

def get_instructions(self, fh: FunctionHandle, bbh: BBHandle) -> Iterator[InsnHandle]:
import capa.features.extractors.ida.helpers as ida_helpers

for insn in ida_helpers.get_instructions_in_range(bbh.inner.start_ea, bbh.inner.end_ea):
for insn in ida_helpers.get_instructions_in_range(self.db, bbh.inner.start_ea, bbh.inner.end_ea):
yield InsnHandle(address=AbsoluteVirtualAddress(insn.ea), inner=insn)

def extract_insn_features(self, fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle):
yield from capa.features.extractors.ida.insn.extract_features(fh, bbh, ih)
yield from capa.features.extractors.ida.insn.extract_features(self.db, fh, bbh, ih)
106 changes: 53 additions & 53 deletions capa/features/extractors/ida/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
import struct
from typing import Iterator

import idc
import idaapi
import idautils
import ida_entry
from ida_domain import Database
from ida_domain.functions import FunctionFlags

import capa.ida.helpers
import capa.features.extractors.common
Expand All @@ -33,7 +32,7 @@
MAX_OFFSET_PE_AFTER_MZ = 0x200


def check_segment_for_pe(seg: idaapi.segment_t) -> Iterator[tuple[int, int]]:
def check_segment_for_pe(db: Database, seg) -> Iterator[tuple[int, int]]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The `seg` parameter of `check_segment_for_pe` is missing a type hint (the previous signature annotated it as `idaapi.segment_t`). It should likely be `ida_domain.segments.Segment` for clarity and type safety.

"""check segment for embedded PE

adapted for IDA from:
Expand All @@ -51,8 +50,7 @@ def check_segment_for_pe(seg: idaapi.segment_t) -> Iterator[tuple[int, int]]:

todo = []
for mzx, pex, i in mz_xor:
# find all segment offsets containing XOR'd "MZ" bytes
for off in capa.features.extractors.ida.helpers.find_byte_sequence(seg.start_ea, seg.end_ea, mzx):
for off in capa.features.extractors.ida.helpers.find_byte_sequence(db, seg.start_ea, seg.end_ea, mzx):
todo.append((off, mzx, pex, i))

while len(todo):
Expand All @@ -64,45 +62,47 @@ def check_segment_for_pe(seg: idaapi.segment_t) -> Iterator[tuple[int, int]]:
if seg_max < (e_lfanew + 4):
continue

newoff = struct.unpack("<I", capa.features.extractors.helpers.xor_static(idc.get_bytes(e_lfanew, 4), i))[0]
raw_bytes = db.bytes.get_bytes_at(e_lfanew, 4)
if not raw_bytes:
continue
newoff = struct.unpack("<I", capa.features.extractors.helpers.xor_static(raw_bytes, i))[0]

# assume XOR'd "PE" bytes exist within threshold
if newoff > MAX_OFFSET_PE_AFTER_MZ:
continue

peoff = off + newoff
if seg_max < (peoff + 2):
continue

if idc.get_bytes(peoff, 2) == pex:
pe_bytes = db.bytes.get_bytes_at(peoff, 2)
if pe_bytes == pex:
yield off, i


def extract_file_embedded_pe() -> Iterator[tuple[Feature, Address]]:
def extract_file_embedded_pe(db: Database) -> Iterator[tuple[Feature, Address]]:
"""extract embedded PE features

IDA must load resource sections for this to be complete
- '-R' from console
- Check 'Load resource sections' when opening binary in IDA manually
"""
for seg in capa.features.extractors.ida.helpers.get_segments(skip_header_segments=True):
for ea, _ in check_segment_for_pe(seg):
for seg in capa.features.extractors.ida.helpers.get_segments(db, skip_header_segments=True):
for ea, _ in check_segment_for_pe(db, seg):
yield Characteristic("embedded pe"), FileOffsetAddress(ea)


def extract_file_export_names() -> Iterator[tuple[Feature, Address]]:
def extract_file_export_names(db: Database) -> Iterator[tuple[Feature, Address]]:
"""extract function exports"""
for _, ordinal, ea, name in idautils.Entries():
forwarded_name = ida_entry.get_entry_forwarder(ordinal)
if forwarded_name is None:
yield Export(name), AbsoluteVirtualAddress(ea)
for entry in db.entries.get_all():
if entry.has_forwarder():
forwarded_name = capa.features.extractors.helpers.reformat_forwarded_export_name(entry.forwarder_name)
yield Export(forwarded_name), AbsoluteVirtualAddress(entry.address)
yield Characteristic("forwarded export"), AbsoluteVirtualAddress(entry.address)
else:
forwarded_name = capa.features.extractors.helpers.reformat_forwarded_export_name(forwarded_name)
yield Export(forwarded_name), AbsoluteVirtualAddress(ea)
yield Characteristic("forwarded export"), AbsoluteVirtualAddress(ea)
yield Export(entry.name), AbsoluteVirtualAddress(entry.address)


def extract_file_import_names() -> Iterator[tuple[Feature, Address]]:
def extract_file_import_names(db: Database) -> Iterator[tuple[Feature, Address]]:
"""extract function imports

1. imports by ordinal:
Expand All @@ -113,7 +113,7 @@ def extract_file_import_names() -> Iterator[tuple[Feature, Address]]:
- modulename.importname
- importname
"""
for ea, info in capa.features.extractors.ida.helpers.get_file_imports().items():
for ea, info in capa.features.extractors.ida.helpers.get_file_imports(db).items():
addr = AbsoluteVirtualAddress(ea)
if info[1] and info[2]:
# e.g. in mimikatz: ('cabinet', 'FCIAddFile', 11L)
Expand All @@ -134,30 +134,31 @@ def extract_file_import_names() -> Iterator[tuple[Feature, Address]]:
for name in capa.features.extractors.helpers.generate_symbols(dll, symbol, include_dll=True):
yield Import(name), addr

for ea, info in capa.features.extractors.ida.helpers.get_file_externs().items():
for ea, info in capa.features.extractors.ida.helpers.get_file_externs(db).items():
yield Import(info[1]), AbsoluteVirtualAddress(ea)


def extract_file_section_names() -> Iterator[tuple[Feature, Address]]:
def extract_file_section_names(db: Database) -> Iterator[tuple[Feature, Address]]:
"""extract section names

IDA must load resource sections for this to be complete
- '-R' from console
- Check 'Load resource sections' when opening binary in IDA manually
"""
for seg in capa.features.extractors.ida.helpers.get_segments(skip_header_segments=True):
yield Section(idaapi.get_segm_name(seg)), AbsoluteVirtualAddress(seg.start_ea)
for seg in capa.features.extractors.ida.helpers.get_segments(db, skip_header_segments=True):
name = db.segments.get_name(seg)
yield Section(name), AbsoluteVirtualAddress(seg.start_ea)


def extract_file_strings() -> Iterator[tuple[Feature, Address]]:
def extract_file_strings(db: Database) -> Iterator[tuple[Feature, Address]]:
"""extract ASCII and UTF-16 LE strings

IDA must load resource sections for this to be complete
- '-R' from console
- Check 'Load resource sections' when opening binary in IDA manually
"""
for seg in capa.features.extractors.ida.helpers.get_segments():
seg_buff = capa.features.extractors.ida.helpers.get_segment_buffer(seg)
for seg in capa.features.extractors.ida.helpers.get_segments(db):
seg_buff = capa.features.extractors.ida.helpers.get_segment_buffer(db, seg)

# differing to common string extractor factor in segment offset here
for s in capa.features.extractors.strings.extract_ascii_strings(seg_buff):
Expand All @@ -167,41 +168,40 @@ def extract_file_strings() -> Iterator[tuple[Feature, Address]]:
yield String(s.s), FileOffsetAddress(seg.start_ea + s.offset)


def extract_file_function_names() -> Iterator[tuple[Feature, Address]]:
"""
extract the names of statically-linked library functions.
"""
for ea in idautils.Functions():
addr = AbsoluteVirtualAddress(ea)
if idaapi.get_func(ea).flags & idaapi.FUNC_LIB:
name = idaapi.get_name(ea)
yield FunctionName(name), addr
if name.startswith("_"):
# some linkers may prefix linked routines with a `_` to avoid name collisions.
# extract features for both the mangled and un-mangled representations.
# e.g. `_fwrite` -> `fwrite`
# see: https://stackoverflow.com/a/2628384/87207
yield FunctionName(name[1:]), addr
def extract_file_function_names(db: Database) -> Iterator[tuple[Feature, Address]]:
"""extract the names of statically-linked library functions."""
for f in db.functions.get_all():
flags = db.functions.get_flags(f)
if flags & FunctionFlags.LIB:
addr = AbsoluteVirtualAddress(f.start_ea)
name = db.names.get_at(f.start_ea)
if name:
yield FunctionName(name), addr
if name.startswith("_"):
# some linkers may prefix linked routines with a `_` to avoid name collisions.
# extract features for both the mangled and un-mangled representations.
# e.g. `_fwrite` -> `fwrite`
# see: https://stackoverflow.com/a/2628384/87207
yield FunctionName(name[1:]), addr


def extract_file_format() -> Iterator[tuple[Feature, Address]]:
filetype = capa.ida.helpers.get_filetype()
def extract_file_format(db: Database) -> Iterator[tuple[Feature, Address]]:
format_name = db.format

if filetype in (idaapi.f_PE, idaapi.f_COFF):
if "PE" in format_name or "COFF" in format_name:
yield Format(FORMAT_PE), NO_ADDRESS
elif filetype == idaapi.f_ELF:
elif "ELF" in format_name:
yield Format(FORMAT_ELF), NO_ADDRESS
elif filetype == idaapi.f_BIN:
# no file type to return when processing a binary file, but we want to continue processing
elif "Binary" in format_name:
return
else:
raise NotImplementedError(f"unexpected file format: {filetype}")
raise NotImplementedError(f"unexpected file format: {format_name}")


def extract_features() -> Iterator[tuple[Feature, Address]]:
def extract_features(db: Database) -> Iterator[tuple[Feature, Address]]:
"""extract file features"""
for file_handler in FILE_HANDLERS:
for feature, addr in file_handler():
for feature, addr in file_handler(db):
yield feature, addr


Expand Down
Loading
Loading