Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion pcapng/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,59 @@ class FileScanner(object):
just wrap it in a :py:class:`io.BytesIO` object.
"""

__slots__ = ["stream", "current_section", "endianness"]
__slots__ = [
"stream",
"current_section",
"endianness",
"_stashed_header",
"_last_header_offset",
]

def __init__(self, stream):
self.stream = stream
self.current_section = None
self.endianness = "="

self._last_header_offset = 0
self._stashed_header = None

def __iter__(self):
while True:
try:
if self._stashed_header:
yield self._stashed_header
self._stashed_header = None
yield self._read_next_block()
except StreamEmpty:
return

def skip_section(self):
"""
Skip the current section.
"""
if (
self.stream.seekable()
and self.current_section
and self.current_section.length != -1
):
# seek
self.stream.seek(self._last_header_offset + self.current_section.length)
else:
# iterate till next section
for block in self:
if isinstance(block, blocks.SectionHeader):
self._stashed_header = block
break

def _read_next_block(self):
block_type = self._read_int(32, False)

if block_type == SECTION_HEADER_MAGIC:
block = self._read_section_header()
self.current_section = block
self.endianness = block.endianness
if self.stream.seekable():
self._last_header_offset = self.stream.tell()
return block

if self.current_section is None:
Expand Down
2 changes: 1 addition & 1 deletion pcapng/structs.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ def read_bytes_padded(stream, size, pad_block_size=4):
were read
"""

if stream.tell() % pad_block_size != 0:
if stream.seekable() and (stream.tell() % pad_block_size != 0):
raise RuntimeError("Stream is misaligned!")

data = read_bytes(stream, size)
Expand Down
Binary file added test_data/explicit_shb_sec_length.ntar
Binary file not shown.
Binary file added test_data/implicit_shb_sec_length.ntar
Binary file not shown.
81 changes: 81 additions & 0 deletions tests/test_parse_wireshark_capture_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,84 @@ def test_sample_test010_ntar():
scanner = FileScanner(fp)
for entry in scanner:
pass


def test_skip_section_explicit_length():
"""
Test section skipping when the section is explicitly sized.

The pcapng used here is laid out as follows:
SectionHeader with explicit length of 144
InterfaceDescription
EnhancedPacket
SectionHeader with implicit length
InterfaceDescription
EnhancedPacket

Note that the only difference between the two sections
is the explicit sizing in the first section.
"""
with open("test_data/explicit_shb_sec_length.ntar", "rb") as fp:
scanner = FileScanner(fp)
iter_scanner = iter(scanner)
first_shb = next(iter_scanner)
assert first_shb.length == 144
# Read another block so we're in the middle of the section
next(iter_scanner)
scanner.skip_section()
block = next(iter_scanner)
assert isinstance(block, SectionHeader)
assert block.length == -1


def test_skip_section_implicit_length():
"""
Test section skipping when the section is implicitly sized.

The pcapng used here is laid out as follows:
SectionHeader with implicit length
InterfaceDescription
EnhancedPacket
SectionHeader with explicit length of 144
InterfaceDescription
EnhancedPacket

Note that the only difference between the two sections
is the implicit sizing in the first section.
"""
with open("test_data/implicit_shb_sec_length.ntar", "rb") as fp:
scanner = FileScanner(fp)
iter_scanner = iter(scanner)
first_shb = next(iter_scanner)
assert first_shb.length == -1
# Read another block so we're in the middle of the section
next(iter_scanner)
scanner.skip_section()
block = next(iter_scanner)
assert isinstance(block, SectionHeader)
assert block.length == 144


def test_skip_section_non_seekable():
"""
Test section skipping with non-seekable streams
"""

def throw_err():
raise NotImplementedError

with open("test_data/explicit_shb_sec_length.ntar", "rb") as fp:
fp.seekable = lambda: False
fp.tell = throw_err
fp.seek = throw_err

scanner = FileScanner(fp)
iter_scanner = iter(scanner)
first_shb = next(iter_scanner)
assert first_shb.length == 144
# Read another block so we're in the middle of the section
next(iter_scanner)
scanner.skip_section()
block = next(iter_scanner)
assert isinstance(block, SectionHeader)
assert block.length == -1