Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 221 additions & 30 deletions mapillary_tools/geotag/gpmf_parser.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import dataclasses
import io
import itertools
import pathlib
import typing as T

import construct as C

from .. import geo
from ..mp4 import mp4_sample_parser as sample_parser
from .. import geo, imu
from ..mp4.mp4_sample_parser import TrackBoxParser, MovieBoxParser, Sample

"""
Parsing GPS from GPMF data format stored in GoPros. See the GPMF spec: https://github.com/gopro/gpmf-parser
Expand Down Expand Up @@ -125,6 +127,14 @@ class KLVDict(T.TypedDict):
GPMFSampleData = C.GreedyRange(KLV)


@dataclasses.dataclass
class TelemetryData:
gps: T.List[geo.PointWithFix]
accl: T.List[imu.AccelerationData]
gyro: T.List[imu.GyroscopeData]
magn: T.List[imu.MagnetometerData]


# A GPS5 stream example:
# key = b'STRM' type = b'\x00' structure_size = 1 repeat = 400
# data = ListContainer:
Expand Down Expand Up @@ -298,8 +308,120 @@ def _find_first_gps_stream(stream: T.Sequence[KLVDict]) -> T.List[geo.PointWithF
return sample_points


# a sensor matrix with only [1,0,0, 0,-1,0, 0,0,1], is just a form of non-calibrated sensor orientation
def _is_matrix_calibration(matrix: T.Sequence[float]) -> bool:
for v in matrix:
if v not in [0, -1, 1]:
return True
return False


def _build_matrix(
orin: T.Union[bytes, T.Sequence[int]], orio: T.Union[bytes, T.Sequence[int]]
) -> T.Sequence[float]:
matrix = []

# list(b'aA') == [97, 65]
lower_a, upper_A = 97, 65

for out_char in orin:
for in_char in orio:
if in_char == out_char:
matrix.append(1.0)
elif (in_char - lower_a) == (out_char - upper_A):
matrix.append(-1.0)
elif (in_char - upper_A) == (out_char - lower_a):
matrix.append(-1.0)
else:
matrix.append(0.0)

return matrix


def _apply_matrix(
matrix: T.Sequence[float], values: T.Sequence[float]
) -> T.Generator[float, None, None]:
size = len(values)
assert (
len(matrix) == size * size
), f"expecting a square matrix of size {size} x {size} but got {len(matrix)}"

for y in range(size):
row_start = y * size
yield sum(matrix[row_start + x] * values[x] for x in range(size))


def _flatten(nested: T.Sequence[T.Sequence[float]]) -> T.List[float]:
output: T.List[float] = []
for row in nested:
output.extend(row)
return output


def _get_matrix(klv: T.Dict[bytes, KLVDict]) -> T.Optional[T.Sequence[float]]:
mtrx = klv.get(b"MTRX")
if mtrx is not None:
matrix: T.Sequence[float] = _flatten(mtrx["data"])
if _is_matrix_calibration(matrix):
return matrix

orin = klv.get(b"ORIN")
orio = klv.get(b"ORIO")

if orin is not None and orio is not None:
matrix = _build_matrix(b"".join(orin["data"]), b"".join(orio["data"]))
return matrix

return None


def _scale_and_calibrate(
stream: T.Sequence[KLVDict], key: bytes
) -> T.Generator[T.Sequence[float], None, None]:
indexed: T.Dict[bytes, KLVDict] = {klv["key"]: klv for klv in stream}

klv = indexed.get(key)
if klv is None:
return

scal_klv = indexed.get(b"SCAL")

if scal_klv is not None:
# replace 0s with 1s to avoid division by zero
scals = [s or 1 for s in _flatten(scal_klv["data"])]

if not scals:
scals = [1]

if len(scals) == 1:
# infinite repeat
scales: T.Iterable[float] = itertools.repeat(scals[0])
else:
scales = scals

matrix = _get_matrix(indexed)

for values in klv["data"]:
if matrix is None:
yield tuple(v / s for v, s in zip(values, scales))
else:
yield tuple(v / s for v, s in zip(_apply_matrix(matrix, values), scales))


def _find_first_telemetry_stream(stream: T.Sequence[KLVDict], key: bytes):
values: T.List[T.Sequence[float]] = []

for klv in stream:
if klv["key"] == b"STRM":
values = list(_scale_and_calibrate(klv["data"], key))
if values:
break

return values


def _extract_dvnm_from_samples(
fp: T.BinaryIO, samples: T.Iterable[sample_parser.Sample]
fp: T.BinaryIO, samples: T.Iterable[Sample]
) -> T.Dict[int, bytes]:
dvnm_by_dvid: T.Dict[int, bytes] = {}

Expand All @@ -322,10 +444,13 @@ def _extract_dvnm_from_samples(


def _extract_points_from_samples(
fp: T.BinaryIO, samples: T.Iterable[sample_parser.Sample]
) -> T.List[geo.PointWithFix]:
fp: T.BinaryIO, samples: T.Iterable[Sample]
) -> TelemetryData:
# To keep GPS points from different devices separated
points_by_dvid: T.Dict[int, T.List[geo.PointWithFix]] = {}
accls_by_dvid: T.Dict[int, T.List[imu.AccelerationData]] = {}
gyros_by_dvid: T.Dict[int, T.List[imu.GyroscopeData]] = {}
magns_by_dvid: T.Dict[int, T.List[imu.MagnetometerData]] = {}

for sample in samples:
fp.seek(sample.raw_sample.offset, io.SEEK_SET)
Expand All @@ -335,58 +460,124 @@ def _extract_points_from_samples(
# iterate devices
devices = (klv for klv in gpmf_sample_data if klv["key"] == b"DEVC")
for device in devices:
device_id = _find_first_device_id(device["data"])

sample_points = _find_first_gps_stream(device["data"])
if sample_points:
# interpolate timestamps in between
avg_timedelta = sample.exact_timedelta / len(sample_points)
for idx, point in enumerate(sample_points):
point.time = sample.exact_time + avg_timedelta * idx

device_id = _find_first_device_id(device["data"])
device_points = points_by_dvid.setdefault(device_id, [])
device_points.extend(sample_points)

values = list(points_by_dvid.values())
return values[0] if values else []
sample_accls = _find_first_telemetry_stream(device["data"], b"ACCL")
if sample_accls:
# interpolate timestamps in between
avg_delta = sample.exact_timedelta / len(sample_accls)
accls_by_dvid.setdefault(device_id, []).extend(
imu.AccelerationData(
time=sample.exact_time + avg_delta * idx,
x=x,
y=y,
z=z,
)
for idx, (z, x, y, *_) in enumerate(sample_accls)
)

sample_gyros = _find_first_telemetry_stream(device["data"], b"GYRO")
if sample_gyros:
# interpolate timestamps in between
avg_delta = sample.exact_timedelta / len(sample_gyros)
gyros_by_dvid.setdefault(device_id, []).extend(
imu.GyroscopeData(
time=sample.exact_time + avg_delta * idx,
x=x,
y=y,
z=z,
)
for idx, (z, x, y, *_) in enumerate(sample_gyros)
)

sample_magns = _find_first_telemetry_stream(device["data"], b"MAGN")
if sample_magns:
# interpolate timestamps in between
avg_delta = sample.exact_timedelta / len(sample_magns)
magns_by_dvid.setdefault(device_id, []).extend(
imu.MagnetometerData(
time=sample.exact_time + avg_delta * idx,
x=x,
y=y,
z=z,
)
for idx, (z, x, y, *_) in enumerate(sample_magns)
)

return TelemetryData(
gps=list(points_by_dvid.values())[0] if points_by_dvid else [],
accl=list(accls_by_dvid.values())[0] if accls_by_dvid else [],
gyro=list(gyros_by_dvid.values())[0] if gyros_by_dvid else [],
magn=list(magns_by_dvid.values())[0] if magns_by_dvid else [],
)


def _is_gpmd_description(description: T.Dict) -> bool:
return description["format"] == b"gpmd"


def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.PointWithFix]]:
def _contains_gpmd_description(track: TrackBoxParser) -> bool:
descriptions = track.extract_sample_descriptions()
return any(_is_gpmd_description(d) for d in descriptions)


def _filter_gpmd_samples(track: TrackBoxParser) -> T.Generator[Sample, None, None]:
for sample in track.extract_samples():
if _is_gpmd_description(sample.description):
yield sample


def extract_points(fp: T.BinaryIO) -> T.List[geo.PointWithFix]:
"""
Return a list of points (could be empty) if it is a valid GoPro video,
otherwise None
"""
points = None
moov = sample_parser.MovieBoxParser.parse_stream(fp)
moov = MovieBoxParser.parse_stream(fp)
for track in moov.extract_tracks():
descriptions = track.extract_sample_descriptions()
if any(_is_gpmd_description(d) for d in descriptions):
gpmd_samples = (
sample
for sample in track.extract_samples()
if _is_gpmd_description(sample.description)
)
points = list(_extract_points_from_samples(fp, gpmd_samples))
if _contains_gpmd_description(track):
gpmd_samples = _filter_gpmd_samples(track)
telemetry = _extract_points_from_samples(fp, gpmd_samples)
# return the firstly found non-empty points
if points:
return points
if telemetry.gps:
return telemetry.gps

# points could be empty list or None here
return points
return []


def extract_telemetry_data(fp: T.BinaryIO) -> T.Optional[TelemetryData]:
"""
Return the telemetry data from the first found GoPro GPMF track
"""
moov = MovieBoxParser.parse_stream(fp)

for track in moov.extract_tracks():
if _contains_gpmd_description(track):
gpmd_samples = _filter_gpmd_samples(track)
telemetry = _extract_points_from_samples(fp, gpmd_samples)
# return the firstly found non-empty points
if telemetry.gps:
return telemetry

# points could be empty list or None here
return None


def extract_all_device_names(fp: T.BinaryIO) -> T.Dict[int, bytes]:
moov = sample_parser.MovieBoxParser.parse_stream(fp)
moov = MovieBoxParser.parse_stream(fp)
for track in moov.extract_tracks():
descriptions = track.extract_sample_descriptions()
if any(_is_gpmd_description(d) for d in descriptions):
gpmd_samples = (
sample
for sample in track.extract_samples()
if _is_gpmd_description(sample.description)
)
if _contains_gpmd_description(track):
gpmd_samples = _filter_gpmd_samples(track)
device_names = _extract_dvnm_from_samples(fp, gpmd_samples)
if device_names:
return device_names
Expand Down
25 changes: 25 additions & 0 deletions mapillary_tools/imu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import typing as T


# Gyroscope signal in radians/seconds around XYZ axes of the camera. Rotation is positive in the counterclockwise direction.
class GyroscopeData(T.NamedTuple):
time: float
x: float
y: float
z: float


# Accelerometer reading in meters/second^2 along XYZ axes of the camera.
class AccelerationData(T.NamedTuple):
time: float
x: float
y: float
z: float


# Ambient magnetic field.
class MagnetometerData(T.NamedTuple):
time: float
x: float
y: float
z: float
Loading
Loading