diff --git a/mapillary_tools/constants.py b/mapillary_tools/constants.py
index fc1fc400..ab6ce003 100644
--- a/mapillary_tools/constants.py
+++ b/mapillary_tools/constants.py
@@ -1,5 +1,6 @@
 from __future__ import annotations

+import functools
 import os

 import appdirs
@@ -8,44 +9,92 @@
 def _yes_or_no(val: str) -> bool:
-    return val.strip().upper() in [
-        "1",
-        "TRUE",
-        "YES",
-    ]
+    return val.strip().upper() in ["1", "TRUE", "YES"]


-# In meters
-CUTOFF_DISTANCE = float(os.getenv(_ENV_PREFIX + "CUTOFF_DISTANCE", 600))
+def _parse_scaled_integers(
+    value: str, scale: dict[str, int] | None = None
+) -> int | None:
+    """
+    >>> scale = {"": 1, "b": 1, "K": 1024, "M": 1024 * 1024, "G": 1024 * 1024 * 1024}
+    >>> _parse_scaled_integers("0", scale=scale)
+    0
+    >>> _parse_scaled_integers("10", scale=scale)
+    10
+    >>> _parse_scaled_integers("100B", scale=scale)
+    100
+    >>> _parse_scaled_integers("100k", scale=scale)
+    102400
+    >>> _parse_scaled_integers("100t", scale=scale)
+    Traceback (most recent call last):
+    ValueError: Expect valid integer ends with , b, K, M, G, but got 100T
+    """
+
+    if scale is None:
+        scale = {"": 1}
+
+    value = value.strip().upper()
+
+    if value in ["INF", "INFINITY"]:
+        return None
+
+    try:
+        for k, v in scale.items():
+            k = k.upper()
+            if k and value.endswith(k):
+                return int(value[: -len(k)]) * v
+
+        if "" in scale:
+            return int(value) * scale[""]
+    except ValueError:
+        pass
+
+    raise ValueError(
+        f"Expect valid integer ends with {', '.join(scale.keys())}, but got {value}"
+    )
+
+
+_parse_pixels = functools.partial(
+    _parse_scaled_integers,
+    scale={
+        "": 1,
+        "K": 1000,
+        "M": 1000 * 1000,
+        "MP": 1000 * 1000,
+        "G": 1000 * 1000 * 1000,
+        "GP": 1000 * 1000 * 1000,
+    },
+)
+
+_parse_filesize = functools.partial(
+    _parse_scaled_integers,
+    scale={"B": 1, "K": 1024, "M": 1024 * 1024, "G": 1024 * 1024 * 1024},
+)
+
+###################
+##### GENERAL #####
+###################
+USER_DATA_DIR = appdirs.user_data_dir(appname="mapillary_tools", appauthor="Mapillary")
+PROMPT_DISABLED: bool = _yes_or_no(os.getenv(_ENV_PREFIX + "PROMPT_DISABLED", "NO"))
+
+
+############################
+##### VIDEO PROCESSING #####
+############################
 # In seconds
-CUTOFF_TIME = float(os.getenv(_ENV_PREFIX + "CUTOFF_TIME", 60))
-DUPLICATE_DISTANCE = float(os.getenv(_ENV_PREFIX + "DUPLICATE_DISTANCE", 0.1))
-DUPLICATE_ANGLE = float(os.getenv(_ENV_PREFIX + "DUPLICATE_ANGLE", 5))
-MAX_AVG_SPEED = float(
-    os.getenv(_ENV_PREFIX + "MAX_AVG_SPEED", 400_000 / 3600)
-)  # 400 KM/h
-# in seconds
 VIDEO_SAMPLE_INTERVAL = float(os.getenv(_ENV_PREFIX + "VIDEO_SAMPLE_INTERVAL", -1))
-# in meters
+# In meters
 VIDEO_SAMPLE_DISTANCE = float(os.getenv(_ENV_PREFIX + "VIDEO_SAMPLE_DISTANCE", 3))
 VIDEO_DURATION_RATIO = float(os.getenv(_ENV_PREFIX + "VIDEO_DURATION_RATIO", 1))
 FFPROBE_PATH: str = os.getenv(_ENV_PREFIX + "FFPROBE_PATH", "ffprobe")
 FFMPEG_PATH: str = os.getenv(_ENV_PREFIX + "FFMPEG_PATH", "ffmpeg")
-# When not set, MT will try to check both "exiftool" and "exiftool.exe" from $PATH
-EXIFTOOL_PATH: str | None = os.getenv(_ENV_PREFIX + "EXIFTOOL_PATH")
+EXIFTOOL_PATH: str = os.getenv(_ENV_PREFIX + "EXIFTOOL_PATH", "exiftool")
 IMAGE_DESCRIPTION_FILENAME = os.getenv(
     _ENV_PREFIX + "IMAGE_DESCRIPTION_FILENAME", "mapillary_image_description.json"
 )
 SAMPLED_VIDEO_FRAMES_FILENAME = os.getenv(
     _ENV_PREFIX + "SAMPLED_VIDEO_FRAMES_FILENAME", "mapillary_sampled_video_frames"
 )
-USER_DATA_DIR = appdirs.user_data_dir(appname="mapillary_tools", appauthor="Mapillary")
-# The chunk size in MB (see chunked transfer encoding https://en.wikipedia.org/wiki/Chunked_transfer_encoding)
-# for uploading data to MLY upload service.
-# Changing this size does not change the number of requests nor affect upload performance,
-# but it affects the responsiveness of the upload progress bar
-UPLOAD_CHUNK_SIZE_MB = float(os.getenv(_ENV_PREFIX + "UPLOAD_CHUNK_SIZE_MB", 1))
-
 # DoP value, the lower the better
 # See https://github.com/gopro/gpmf-parser#hero5-black-with-gps-enabled-adds
 # It is used to filter out noisy points
@@ -54,40 +103,61 @@ def _yes_or_no(val: str) -> bool:
 GOPRO_GPS_FIXES: set[int] = set(
     int(fix) for fix in os.getenv(_ENV_PREFIX + "GOPRO_GPS_FIXES", "2,3").split(",")
 )
-MAX_UPLOAD_RETRIES: int = int(os.getenv(_ENV_PREFIX + "MAX_UPLOAD_RETRIES", 200))
-
 # GPS precision, in meters, is used to filter outliers
 GOPRO_GPS_PRECISION = float(os.getenv(_ENV_PREFIX + "GOPRO_GPS_PRECISION", 15))
+MAPILLARY__EXPERIMENTAL_ENABLE_IMU: bool = _yes_or_no(
+    os.getenv("MAPILLARY__EXPERIMENTAL_ENABLE_IMU", "NO")
+)
+
+#################################
+###### SEQUENCE PROCESSING ######
+#################################
+# In meters
+CUTOFF_DISTANCE = float(os.getenv(_ENV_PREFIX + "CUTOFF_DISTANCE", 600))
+# In seconds
+CUTOFF_TIME = float(os.getenv(_ENV_PREFIX + "CUTOFF_TIME", 60))
+DUPLICATE_DISTANCE = float(os.getenv(_ENV_PREFIX + "DUPLICATE_DISTANCE", 0.1))
+DUPLICATE_ANGLE = float(os.getenv(_ENV_PREFIX + "DUPLICATE_ANGLE", 5))
+MAX_AVG_SPEED = float(
+    os.getenv(_ENV_PREFIX + "MAX_AVG_SPEED", 400_000 / 3600)
+)  # 400 KM/h
 # WARNING: Changing the following envvars might result in failed uploads
 # Max number of images per sequence
-MAX_SEQUENCE_LENGTH = int(os.getenv(_ENV_PREFIX + "MAX_SEQUENCE_LENGTH", 1000))
+MAX_SEQUENCE_LENGTH: int | None = _parse_scaled_integers(
+    os.getenv(_ENV_PREFIX + "MAX_SEQUENCE_LENGTH", "1000")
+)
 # Max file size per sequence (sum of image filesizes in the sequence)
-MAX_SEQUENCE_FILESIZE: str = os.getenv(_ENV_PREFIX + "MAX_SEQUENCE_FILESIZE", "110G")
+MAX_SEQUENCE_FILESIZE: int | None = _parse_filesize(
+    os.getenv(_ENV_PREFIX + "MAX_SEQUENCE_FILESIZE", "110G")
+)
 # Max number of pixels per sequence (sum of image pixels in the sequence)
-MAX_SEQUENCE_PIXELS: str = os.getenv(_ENV_PREFIX + "MAX_SEQUENCE_PIXELS", "6G")
-
-PROMPT_DISABLED: bool = _yes_or_no(os.getenv(_ENV_PREFIX + "PROMPT_DISABLED", "NO"))
-
-_AUTH_VERIFICATION_DISABLED: bool = _yes_or_no(
-    os.getenv(_ENV_PREFIX + "_AUTH_VERIFICATION_DISABLED", "NO")
+MAX_SEQUENCE_PIXELS: int | None = _parse_pixels(
+    os.getenv(_ENV_PREFIX + "MAX_SEQUENCE_PIXELS", "6G")
 )
+
+##################
+##### UPLOAD #####
+##################
 MAPILLARY_DISABLE_API_LOGGING: bool = _yes_or_no(
     os.getenv("MAPILLARY_DISABLE_API_LOGGING", "NO")
 )
+MAPILLARY_UPLOAD_HISTORY_PATH: str = os.getenv(
+    "MAPILLARY_UPLOAD_HISTORY_PATH", os.path.join(USER_DATA_DIR, "upload_history")
+)
+MAX_IMAGE_UPLOAD_WORKERS: int = int(
+    os.getenv(_ENV_PREFIX + "MAX_IMAGE_UPLOAD_WORKERS", 64)
+)
+# The chunk size in MB (see chunked transfer encoding https://en.wikipedia.org/wiki/Chunked_transfer_encoding)
+# for uploading data to MLY upload service.
+# Changing this size does not change the number of requests nor affect upload performance,
+# but it affects the responsiveness of the upload progress bar
+UPLOAD_CHUNK_SIZE_MB: float = float(os.getenv(_ENV_PREFIX + "UPLOAD_CHUNK_SIZE_MB", 1))
+MAX_UPLOAD_RETRIES: int = int(os.getenv(_ENV_PREFIX + "MAX_UPLOAD_RETRIES", 200))
 MAPILLARY__ENABLE_UPLOAD_HISTORY_FOR_DRY_RUN: bool = _yes_or_no(
     os.getenv("MAPILLARY__ENABLE_UPLOAD_HISTORY_FOR_DRY_RUN", "NO")
 )
-MAPILLARY__EXPERIMENTAL_ENABLE_IMU: bool = _yes_or_no(
-    os.getenv("MAPILLARY__EXPERIMENTAL_ENABLE_IMU", "NO")
-)
-MAPILLARY_UPLOAD_HISTORY_PATH: str = os.getenv(
-    "MAPILLARY_UPLOAD_HISTORY_PATH",
-    os.path.join(
-        USER_DATA_DIR,
-        "upload_history",
-    ),
+_AUTH_VERIFICATION_DISABLED: bool = _yes_or_no(
+    os.getenv(_ENV_PREFIX + "_AUTH_VERIFICATION_DISABLED", "NO")
 )
-
-MAX_IMAGE_UPLOAD_WORKERS = int(os.getenv(_ENV_PREFIX + "MAX_IMAGE_UPLOAD_WORKERS", 64))
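A quick sanity sketch (not part of the patch) of how the new scaled-integer helpers behave. It imports the module-private helpers directly, which is fine for a REPL check; note that file sizes use the binary (1024) scale while pixels use the decimal (1000) scale:

```python
# Sketch only: exercises _parse_filesize/_parse_pixels exactly as defined above.
from mapillary_tools.constants import _parse_filesize, _parse_pixels

assert _parse_filesize("110G") == 110 * 1024**3  # binary scale for file sizes
assert _parse_pixels("6G") == 6 * 1000**3        # decimal scale for pixels
assert _parse_pixels("12MP") == 12 * 1000**2     # megapixel-style suffixes accepted
assert _parse_filesize("inf") is None            # None means "no limit" downstream
```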
diff --git a/mapillary_tools/geo.py b/mapillary_tools/geo.py
index 5a6bb8d1..1e5fa389 100644
--- a/mapillary_tools/geo.py
+++ b/mapillary_tools/geo.py
@@ -51,6 +51,22 @@ def gps_distance(latlon_1: tuple[float, float], latlon_2: tuple[float, float]) -> float:
     return math.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2 + (z1 - z2) ** 2)


+def avg_speed(sequence: T.Sequence[PointLike]) -> float:
+    total_distance = 0.0
+    for cur, nxt in pairwise(sequence):
+        total_distance += gps_distance((cur.lat, cur.lon), (nxt.lat, nxt.lon))
+
+    if sequence:
+        time_diff = sequence[-1].time - sequence[0].time
+    else:
+        time_diff = 0.0
+
+    if time_diff == 0.0:
+        return float("inf")
+
+    return total_distance / time_diff
+
+
 def compute_bearing(
     latlon_1: tuple[float, float],
     latlon_2: tuple[float, float],
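A small usage sketch for the new `geo.avg_speed` (not part of the patch; values approximate, and `Point` keyword construction mirrors the doctests elsewhere in this diff):

```python
from mapillary_tools import geo

# Two points ~100 m apart (0.0009 degrees of longitude at the equator), 10 s apart.
points = [
    geo.Point(time=0.0, lat=0.0, lon=0.0, alt=None, angle=None),
    geo.Point(time=10.0, lat=0.0, lon=0.0009, alt=None, angle=None),
]
assert 9.0 < geo.avg_speed(points) < 11.0        # ~10 m/s
assert geo.avg_speed(points[:1]) == float("inf")  # zero time span -> inf
```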
diff --git a/mapillary_tools/process_sequence_properties.py b/mapillary_tools/process_sequence_properties.py
index c6ba7643..c3d7b1fe 100644
--- a/mapillary_tools/process_sequence_properties.py
+++ b/mapillary_tools/process_sequence_properties.py
@@ -1,5 +1,6 @@
 from __future__ import annotations

+import functools
 import itertools
 import logging
 import math
@@ -12,77 +13,95 @@
 LOG = logging.getLogger(__name__)

-SeqItem = T.TypeVar("SeqItem")
+S = T.TypeVar("S")
+R = T.TypeVar("R")
 PointSequence = T.List[geo.PointLike]


 def split_sequence_by(
-    sequence: T.Sequence[SeqItem],
-    should_split: T.Callable[[SeqItem, SeqItem], bool],
-) -> list[list[SeqItem]]:
+    sequence: T.Iterable[S], reduce: T.Callable[[R, S], tuple[R, bool]], initial: R
+) -> list[list[S]]:
     """
-    Split a sequence into multiple sequences by should_split(prev, cur) => True
-    """
-    output_sequences: list[list[SeqItem]] = []
-
-    if sequence:
-        output_sequences.append([sequence[0]])
-
-    for prev, cur in geo.pairwise(sequence):
-        # invariant: prev is processed
-        if should_split(prev, cur):
-            output_sequences.append([cur])
-        else:
-            output_sequences[-1].append(cur)
-        # invariant: cur is processed
-
-    assert sum(len(s) for s in output_sequences) == len(sequence), (
-        output_sequences,
-        sequence,
-    )
-
-    return output_sequences
-
-
-def split_sequence_by_agg(
-    sequence: T.Sequence[SeqItem],
-    should_split_with_sequence_state: T.Callable[[SeqItem, dict], bool],
-) -> list[list[SeqItem]]:
-    """
-    Split a sequence by should_split_with_sequence_state(cur, sequence_state) => True
-    """
-    output_sequences: list[list[SeqItem]] = []
-    sequence_state: dict = {}
-
-    for cur in sequence:
-        start_new_sequence = should_split_with_sequence_state(cur, sequence_state)
-        if not output_sequences:
-            output_sequences.append([])
-
-        if start_new_sequence:
-            # DO NOT reset the state because it contains the information of current item
-            # sequence_state = {}
-            if output_sequences[-1]:
-                output_sequences.append([])
-
-        output_sequences[-1].append(cur)
-
-    assert sum(len(s) for s in output_sequences) == len(sequence)
+    Split a sequence into multiple subsequences based on a reduction function.
+
+    The function processes each element through a reduce function that maintains
+    state and determines whether to split the sequence at that point. When a split
+    is triggered, a new subsequence starts with the current element.
+
+    Args:
+        sequence: An iterable of elements to split
+        reduce: A function that takes (accumulated_state, current_element) and
+            returns (new_state, should_split). If should_split is True,
+            a new subsequence starts with the current element.
+        initial: The initial state value passed to the reduce function
+
+    Returns:
+        A list of subsequences, where each subsequence is a list of elements
+
+    Examples:
+        >>> # Split on even numbers
+        >>> def split_on_even(count, x):
+        ...     return count + 1, x % 2 == 0
+        >>> split_sequence_by([1, 3, 2, 4, 5, 6, 7], split_on_even, 0)
+        [[1, 3], [2], [4, 5], [6, 7]]

+        >>> # Split when sum exceeds threshold
+        >>> def split_when_sum_exceeds_5(total, x):
+        ...     total += x
+        ...     return (x, True) if total > 5 else (total, False)
+        >>> split_sequence_by([1, 2, 3, 4, 1, 2], split_when_sum_exceeds_5, 0)
+        [[1, 2], [3], [4, 1], [2]]
+
+        >>> # Split on specific values
+        >>> def split_on_zero(_, x):
+        ...     return None, x == 0
+        >>> split_sequence_by([1, 2, 0, 3, 4, 0, 5], split_on_zero, None)
+        [[1, 2], [0, 3, 4], [0, 5]]
+
+        >>> # Empty sequence
+        >>> split_sequence_by([], lambda s, x: (s, False), 0)
+        []
+
+        >>> # Single element
+        >>> split_sequence_by([42], lambda s, x: (s, False), 0)
+        [[42]]
+    """
+    output_sequences: list[list[S]] = []
+
+    value = initial
+
+    for element in sequence:
+        value, should = reduce(value, element)
+
+        if should:
+            output_sequences.append([element])
+        else:
+            if output_sequences:
+                output_sequences[-1].append(element)
+            else:
+                output_sequences.append([element])

     return output_sequences
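Call sites written against the old `should_split(prev, cur)` signature can be ported mechanically by threading the previous element through the reducer state. The adapter below is a hypothetical sketch, not part of this patch:

```python
from __future__ import annotations

import typing as T

S = T.TypeVar("S")


def adapt_pairwise(
    should_split: T.Callable[[S, S], bool],
) -> T.Callable[[T.Optional[S], S], T.Tuple[T.Optional[S], bool]]:
    """Wrap an old-style should_split(prev, cur) predicate as a reduce function."""

    def reduce(prev: T.Optional[S], cur: S) -> T.Tuple[T.Optional[S], bool]:
        # Never split on the first element; afterwards defer to the predicate.
        split = prev is not None and should_split(prev, cur)
        return cur, split  # the current element becomes the next `prev`

    return reduce


# e.g. the old time-gap split would become:
#   split_sequence_by(sequence, adapt_pairwise(lambda p, c: c.time - p.time > 5), None)
```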


 def duplication_check(
     sequence: PointSequence,
+    *,
     max_duplicate_distance: float,
     max_duplicate_angle: float,
 ) -> tuple[PointSequence, list[types.ErrorMetadata]]:
+    """
+    >>> duplication_check([], max_duplicate_distance=1, max_duplicate_angle=2)
+    ([], [])
+    """
+
     dedups: PointSequence = []
     dups: list[types.ErrorMetadata] = []

     it = iter(sequence)
-    prev = next(it)
+    prev = next(it, None)
+
     if prev is None:
         return dedups, dups
@@ -90,10 +109,7 @@

     for cur in it:
         # invariant: prev is processed
-        distance = geo.gps_distance(
-            (prev.lat, prev.lon),
-            (cur.lat, cur.lon),
-        )
+        distance = geo.gps_distance((prev.lat, prev.lon), (cur.lat, cur.lon))

         if prev.angle is not None and cur.angle is not None:
             angle_diff = geo.diff_bearing(prev.angle, cur.angle)
@@ -104,15 +120,14 @@
             angle_diff is None or angle_diff <= max_duplicate_angle
         ):
             msg = f"Duplicate of its previous image in terms of distance <= {max_duplicate_distance} and angle <= {max_duplicate_angle}"
+            ex = exceptions.MapillaryDuplicationError(
+                msg,
+                DescriptionJSONSerializer.as_desc(cur),
+                distance=distance,
+                angle_diff=angle_diff,
+            )
             dup = types.describe_error_metadata(
-                exceptions.MapillaryDuplicationError(
-                    msg,
-                    DescriptionJSONSerializer.as_desc(cur),
-                    distance=distance,
-                    angle_diff=angle_diff,
-                ),
-                cur.filename,
-                filetype=types.FileType.IMAGE,
+                ex, cur.filename, filetype=types.FileType.IMAGE
             )
             dups.append(dup)
             # prev does not change
@@ -124,9 +139,9 @@
     return dedups, dups


-def _group_by(
+def _group_images_by(
     image_metadatas: T.Iterable[types.ImageMetadata],
-    group_key_func=T.Callable[[types.ImageMetadata], T.Hashable],
+    group_key_func: T.Callable[[types.ImageMetadata], T.Hashable],
 ) -> dict[T.Hashable, list[types.ImageMetadata]]:
     grouped: dict[T.Hashable, list[types.ImageMetadata]] = {}
     for metadata in image_metadatas:
@@ -136,11 +151,21 @@

 def _interpolate_subsecs_for_sorting(sequence: PointSequence) -> None:
     """
-    Update the timestamps make sure they are unique and sorted
+    Update the timestamps to make sure they are unique and sorted
     in the same order by interpolating subseconds
+
     Examples:
-    - Input: 1, 1, 1, 1, 1, 2
-    - Output: 1, 1.2, 1.4, 1.6, 1.8, 2
+        >>> def make_point(t):
+        ...     return geo.Point(lat=0, lon=0, time=t, alt=None, angle=None)
+        >>> points = [make_point(t) for t in [1, 1, 1, 1, 1, 2]]
+        >>> _interpolate_subsecs_for_sorting(points)
+        >>> [p.time for p in points]
+        [1.0, 1.2, 1.4, 1.6, 1.8, 2]
+
+        >>> points = [make_point(t) for t in [1.1]]
+        >>> _interpolate_subsecs_for_sorting(points)
+        >>> [p.time for p in points]
+        [1.1]
     """

     gidx = 0
@@ -172,63 +197,6 @@
     )


-def _parse_filesize_in_bytes(filesize_str: str) -> int:
-    filesize_str = filesize_str.strip().upper()
-
-    try:
-        if filesize_str.endswith("B"):
-            return int(filesize_str[:-1])
-        elif filesize_str.endswith("K"):
-            return int(filesize_str[:-1]) * 1024
-        elif filesize_str.endswith("M"):
-            return int(filesize_str[:-1]) * 1024 * 1024
-        elif filesize_str.endswith("G"):
-            return int(filesize_str[:-1]) * 1024 * 1024 * 1024
-        else:
-            return int(filesize_str)
-    except ValueError:
-        raise exceptions.MapillaryBadParameterError(
-            f"Expect valid file size that ends with B, K, M, or G, but got {filesize_str}"
-        )
-
-
-def _parse_pixels(pixels_str: str) -> int:
-    pixels_str = pixels_str.strip().upper()
-
-    try:
-        if pixels_str.endswith("K"):
-            return int(pixels_str[:-1]) * 1000
-        elif pixels_str.endswith("M"):
-            return int(pixels_str[:-1]) * 1000 * 1000
-        elif pixels_str.endswith("G"):
-            return int(pixels_str[:-1]) * 1000 * 1000 * 1000
-        else:
-            return int(pixels_str)
-    except ValueError:
-        raise exceptions.MapillaryBadParameterError(
-            f"Expect valid number of pixels that ends with K, M, or G, but got {pixels_str}"
-        )
-
-
-def _avg_speed(sequence: T.Sequence[geo.PointLike]) -> float:
-    total_distance = 0.0
-    for cur, nxt in geo.pairwise(sequence):
-        total_distance += geo.gps_distance(
-            (cur.lat, cur.lon),
-            (nxt.lat, nxt.lon),
-        )
-
-    if sequence:
-        time_diff = sequence[-1].time - sequence[0].time
-    else:
-        time_diff = 0.0
-
-    if time_diff == 0.0:
-        return float("inf")
-
-    return total_distance / time_diff
-
-
 def _is_video_stationary(
     sequence: T.Sequence[geo.PointLike], max_radius_in_meters: float
 ) -> bool:
@@ -246,7 +214,7 @@

 def _check_video_limits(
     video_metadatas: T.Iterable[types.VideoMetadata],
-    max_sequence_filesize_in_bytes: int,
+    max_sequence_filesize_in_bytes: int | None,
     max_avg_speed: float,
     max_radius_for_stationary_check: float,
 ) -> tuple[list[types.VideoMetadata], list[types.ErrorMetadata]]:
@@ -262,27 +230,28 @@
             if is_stationary:
                 raise exceptions.MapillaryStationaryVideoError("Stationary video")

-            video_filesize = (
-                utils.get_file_size(video_metadata.filename)
-                if video_metadata.filesize is None
-                else video_metadata.filesize
-            )
-            if video_filesize > max_sequence_filesize_in_bytes:
-                raise exceptions.MapillaryFileTooLargeError(
-                    f"Video file size exceeds the maximum allowed file size ({max_sequence_filesize_in_bytes} bytes)",
-                )
+            if max_sequence_filesize_in_bytes is not None:
+                video_filesize = (
+                    utils.get_file_size(video_metadata.filename)
+                    if video_metadata.filesize is None
+                    else video_metadata.filesize
+                )
+                if video_filesize > max_sequence_filesize_in_bytes:
+                    raise exceptions.MapillaryFileTooLargeError(
+                        f"Video file size exceeds the maximum allowed file size ({max_sequence_filesize_in_bytes} bytes)",
+                    )

             contains_null_island = any(
                 p.lat == 0 and p.lon == 0 for p in video_metadata.points
             )
             if contains_null_island:
                 raise exceptions.MapillaryNullIslandError(
-                    "Found GPS coordinates in Null Island (0, 0)",
+                    "GPS coordinates in Null Island (0, 0)"
                 )

             too_fast = (
                 len(video_metadata.points) >= 2
-                and _avg_speed(video_metadata.points) > max_avg_speed
+                and geo.avg_speed(video_metadata.points) > max_avg_speed
             )
             if too_fast:
                 raise exceptions.MapillaryCaptureSpeedTooFastError(
@@ -299,46 +268,45 @@
         else:
             output_video_metadatas.append(video_metadata)

-    LOG.info(
-        "Found %s videos and %s errors after video limit checks",
-        len(output_video_metadatas),
-        len(error_metadatas),
-    )
+    if error_metadatas:
+        LOG.info(
+            f"Video validation: {len(output_video_metadatas)} valid, {len(error_metadatas)} errors"
+        )

     return output_video_metadatas, error_metadatas


 def _check_sequences_by_limits(
     input_sequences: T.Sequence[PointSequence],
-    max_sequence_filesize_in_bytes: int,
+    max_sequence_filesize_in_bytes: int | None,
     max_avg_speed: float,
 ) -> tuple[list[PointSequence], list[types.ErrorMetadata]]:
     output_sequences: list[PointSequence] = []
     output_errors: list[types.ErrorMetadata] = []

     for sequence in input_sequences:
-        sequence_filesize = sum(
-            utils.get_file_size(image.filename)
-            if image.filesize is None
-            else image.filesize
-            for image in sequence
-        )
-
         try:
-            if sequence_filesize > max_sequence_filesize_in_bytes:
-                raise exceptions.MapillaryFileTooLargeError(
-                    f"Sequence file size exceeds the maximum allowed file size ({max_sequence_filesize_in_bytes} bytes)",
+            if max_sequence_filesize_in_bytes is not None:
+                sequence_filesize = sum(
+                    utils.get_file_size(image.filename)
+                    if image.filesize is None
+                    else image.filesize
+                    for image in sequence
                 )
+                if sequence_filesize > max_sequence_filesize_in_bytes:
+                    raise exceptions.MapillaryFileTooLargeError(
+                        f"Sequence file size exceeds the maximum allowed file size ({max_sequence_filesize_in_bytes} bytes)",
+                    )

             contains_null_island = any(
                 image.lat == 0 and image.lon == 0 for image in sequence
             )
             if contains_null_island:
                 raise exceptions.MapillaryNullIslandError(
-                    "Found GPS coordinates in Null Island (0, 0)",
+                    "GPS coordinates in Null Island (0, 0)"
                 )

-            too_fast = len(sequence) >= 2 and _avg_speed(sequence) > max_avg_speed
+            too_fast = len(sequence) >= 2 and geo.avg_speed(sequence) > max_avg_speed
             if too_fast:
                 raise exceptions.MapillaryCaptureSpeedTooFastError(
                     f"Capture speed too fast (exceeds {round(max_avg_speed, 3)} m/s)",
@@ -347,9 +315,7 @@
             for image in sequence:
                 output_errors.append(
                     types.describe_error_metadata(
-                        exc=ex,
-                        filename=image.filename,
-                        filetype=types.FileType.IMAGE,
+                        exc=ex, filename=image.filename, filetype=types.FileType.IMAGE
                     )
                 )
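Both validators now share a "None disables the check" convention for the filesize limit (fed by `MAX_SEQUENCE_FILESIZE=inf` via `_parse_filesize`). An illustrative reduction of that logic, with a hypothetical helper name:

```python
from __future__ import annotations


def within_filesize_limit(total_bytes: int, limit: int | None) -> bool:
    # limit=None (e.g. MAX_SEQUENCE_FILESIZE=inf) skips the check entirely.
    return limit is None or total_bytes <= limit


assert within_filesize_limit(10**12, None)               # unlimited
assert not within_filesize_limit(10**12, 110 * 1024**3)  # ~1 TB > 110 GiB
```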
@@ -360,11 +326,10 @@ def _check_sequences_by_limits(
         len(s) for s in input_sequences
     )

-    LOG.info(
-        "Found %s sequences and %s errors after sequence limit checks",
-        len(output_sequences),
-        len(output_errors),
-    )
+    if output_errors:
+        LOG.info(
+            f"Sequence validation: {len(output_sequences)} valid, {len(output_errors)} errors"
+        )

     return output_sequences, output_errors

@@ -372,7 +337,7 @@
 def _group_by_folder_and_camera(
     image_metadatas: list[types.ImageMetadata],
 ) -> list[list[types.ImageMetadata]]:
-    grouped = _group_by(
+    grouped = _group_images_by(
         image_metadatas,
         lambda metadata: (
             str(metadata.filename.parent),
@@ -383,89 +348,10 @@
         ),
     )
     for key in grouped:
-        LOG.debug("Group sequences by %s: %s images", key, len(grouped[key]))
+        LOG.debug(f"Grouped {len(grouped[key])} images by {key}")

     output_sequences = list(grouped.values())
-    LOG.info(
-        "Found %s sequences from different folders and cameras",
-        len(output_sequences),
-    )
-
-    return output_sequences
-
-
-def _split_sequences_by_cutoff_time(
-    input_sequences: T.Sequence[PointSequence], cutoff_time: float
-) -> list[PointSequence]:
-    def _should_split_by_cutoff_time(
-        prev: types.ImageMetadata, cur: types.ImageMetadata
-    ) -> bool:
-        time_diff = cur.time - prev.time
-        assert 0 <= time_diff, "sequence must be sorted by capture times"
-        should = cutoff_time < time_diff
-        if should:
-            LOG.debug(
-                "Split because the capture time gap %s seconds exceeds cutoff_time (%s seconds): %s: %s -> %s",
-                round(time_diff, 2),
-                round(cutoff_time, 2),
-                prev.filename.parent,
-                prev.filename.name,
-                cur.filename.name,
-            )
-        return should
-
-    output_sequences = []
-    for sequence in input_sequences:
-        output_sequences.extend(
-            split_sequence_by(sequence, should_split=_should_split_by_cutoff_time)
-        )
-
-    assert sum(len(s) for s in output_sequences) == sum(len(s) for s in input_sequences)
-
-    LOG.info(
-        "Found %s sequences after split by cutoff_time %d seconds",
-        len(output_sequences),
-        cutoff_time,
-    )
-
-    return output_sequences
-
-
-def _split_sequences_by_cutoff_distance(
-    input_sequences: T.Sequence[PointSequence], cutoff_distance: float
-) -> list[PointSequence]:
-    def _should_split_by_cutoff_distance(
-        prev: types.ImageMetadata, cur: types.ImageMetadata
-    ) -> bool:
-        distance = geo.gps_distance(
-            (prev.lat, prev.lon),
-            (cur.lat, cur.lon),
-        )
-        should = cutoff_distance < distance
-        if should:
-            LOG.debug(
-                "Split because the distance gap %s meters exceeds cutoff_distance (%s meters): %s: %s -> %s",
-                round(distance, 2),
-                round(cutoff_distance, 2),
-                prev.filename.parent,
-                prev.filename.name,
-                cur.filename.name,
-            )
-        return should
-
-    output_sequences = []
-    for sequence in input_sequences:
-        output_sequences.extend(
-            split_sequence_by(sequence, _should_split_by_cutoff_distance)
-        )
-
-    assert sum(len(s) for s in output_sequences) == sum(len(s) for s in input_sequences)
-
-    LOG.info(
-        "Found %s sequences after split by cutoff_distance %d meters",
-        len(output_sequences),
-        cutoff_distance,
-    )
+    LOG.info(f"Created {len(output_sequences)} sequences by folders and cameras")

     return output_sequences

@@ -485,95 +371,218 @@ def _check_sequences_duplication(
             max_duplicate_angle=duplicate_angle,
         )
         assert len(sequence) == len(output_sequence) + len(errors)
-        output_sequences.append(output_sequence)
+        if output_sequence:
+            output_sequences.append(output_sequence)
         output_errors.extend(errors)

+    # All input images should be accounted for either in output sequences or errors
     assert sum(len(s) for s in output_sequences) + len(output_errors) == sum(
         len(s) for s in input_sequences
     )

-    LOG.info(
-        "Found %s sequences and %s errors after duplication check",
-        len(output_sequences),
-        len(output_errors),
-    )
+    if output_errors:
+        LOG.info(
+            f"Duplication check: {len(output_errors)} image duplicates removed (with {duplicate_distance=} and {duplicate_angle=})"
+        )

     return output_sequences, output_errors


+class SplitState(T.TypedDict, total=False):
+    sequence_images: int
+    sequence_file_size: int
+    sequence_pixels: int
+    image: types.ImageMetadata
+
+
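`SplitState` is declared with `total=False`, so every key is optional and each reducer lazily initializes only the counters it owns. A standalone sketch of that pattern (`DemoState` is a stand-in, not from the patch):

```python
import typing as T


class DemoState(T.TypedDict, total=False):
    sequence_images: int
    sequence_pixels: int


state: DemoState = {}  # valid: total=False makes every key optional
state["sequence_images"] = state.get("sequence_images", 0) + 1
assert state == {"sequence_images": 1}
```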
+def _should_split_by_max_sequence_images(
+    state: SplitState,
+    image: types.ImageMetadata,
+    max_sequence_images: int,
+    split: bool = False,
+) -> tuple[SplitState, bool]:
+    if not split:
+        new_sequence_images = state.get("sequence_images", 0) + 1
+        split = max_sequence_images < new_sequence_images
+        if split:
+            LOG.info(
+                f"Split sequence at {image.filename.name}: too many images ({new_sequence_images} > {max_sequence_images})"
+            )
+
+    if split:
+        new_sequence_images = 1
+
+    state["sequence_images"] = new_sequence_images
+
+    return state, split
+
+
+def _should_split_by_cutoff_time(
+    state: SplitState,
+    image: types.ImageMetadata,
+    cutoff_time: float,
+    split: bool = False,
+) -> tuple[SplitState, bool]:
+    if not split:
+        last_image = state.get("image")
+        if last_image is not None:
+            diff = image.time - last_image.time
+            split = cutoff_time < diff
+            if split:
+                LOG.info(
+                    f"Split sequence at {image.filename.name}: time gap too large ({diff:.6g} seconds > {cutoff_time:.6g} seconds)"
+                )
+
+    state["image"] = image
+
+    return state, split
+
+
+def _should_split_by_cutoff_distance(
+    state: SplitState,
+    image: types.ImageMetadata,
+    cutoff_distance: float,
+    split: bool = False,
+) -> tuple[SplitState, bool]:
+    if not split:
+        last_image = state.get("image")
+        if last_image is not None:
+            diff = geo.gps_distance(
+                (last_image.lat, last_image.lon), (image.lat, image.lon)
+            )
+            split = cutoff_distance < diff
+            if split:
+                LOG.info(
+                    f"Split sequence at {image.filename.name}: distance gap too large ({diff:.6g} meters > {cutoff_distance:.6g} meters)"
+                )
+
+    state["image"] = image
+
+    return state, split
+
+
+def _should_split_by_max_sequence_filesize(
+    state: SplitState,
+    image: types.ImageMetadata,
+    max_sequence_filesize_in_bytes: int,
+    split: bool = False,
+) -> tuple[SplitState, bool]:
+    if image.filesize is None:
+        filesize = os.path.getsize(image.filename)
+    else:
+        filesize = image.filesize
+
+    if not split:
+        new_sequence_file_size = state.get("sequence_file_size", 0) + filesize
+        split = max_sequence_filesize_in_bytes < new_sequence_file_size
+        if split:
+            LOG.info(
+                f"Split sequence at {image.filename.name}: filesize too large ({new_sequence_file_size} > {max_sequence_filesize_in_bytes})"
+            )
+
+    if split:
+        new_sequence_file_size = filesize
+
+    state["sequence_file_size"] = new_sequence_file_size
+
+    return state, split
+
+
+def _should_split_by_max_sequence_pixels(
+    state: SplitState,
+    image: types.ImageMetadata,
+    max_sequence_pixels: int,
+    split: bool = False,
+) -> tuple[SplitState, bool]:
+    # Default values if width/height not available
+    width = 1024 if image.width is None else image.width
+    height = 1024 if image.height is None else image.height
+    pixels = width * height
+
+    if not split:
+        new_sequence_pixels = state.get("sequence_pixels", 0) + pixels
+        split = max_sequence_pixels < new_sequence_pixels
+        if split:
+            LOG.info(
+                f"Split sequence at {image.filename.name}: pixels too large ({new_sequence_pixels} > {max_sequence_pixels})"
+            )
+
+    if split:
+        new_sequence_pixels = pixels
+
+    state["sequence_pixels"] = new_sequence_pixels
+
+    return state, split
+
+
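The reducers above all follow the same contract: honor a split already requested earlier in the chain (the `split` flag), otherwise update their own counter and decide. A self-contained toy version with integers, usable with the new `split_sequence_by` (`max_items` is hypothetical):

```python
import functools
import typing as T

State = T.Dict[str, int]


def max_items(
    state: State, item: int, limit: int, split: bool = False
) -> T.Tuple[State, bool]:
    # Shaped like _should_split_by_max_sequence_images: count items per subsequence.
    count = state.get("count", 0) + 1
    if not split:
        split = limit < count
    if split:
        count = 1  # the current item opens the new subsequence
    state["count"] = count
    return state, split


reducer = functools.partial(max_items, limit=2)
# With split_sequence_by from this patch:
#   split_sequence_by([10, 20, 30, 40, 50], reducer, {})
#   -> [[10, 20], [30, 40], [50]]
```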
 def _split_sequences_by_limits(
     input_sequences: T.Sequence[PointSequence],
-    max_sequence_filesize_in_bytes: float,
-    max_sequence_pixels: float,
+    max_sequence_filesize_in_bytes: int | None = None,
+    max_sequence_pixels: int | None = None,
+    max_sequence_images: int | None = None,
+    cutoff_time: float | None = None,
+    cutoff_distance: float | None = None,
 ) -> list[PointSequence]:
-    max_sequence_images = constants.MAX_SEQUENCE_LENGTH
-    max_sequence_filesize = max_sequence_filesize_in_bytes
-
-    def _should_split(image: types.ImageMetadata, sequence_state: dict) -> bool:
-        last_sequence_images = sequence_state.get("last_sequence_images", 0)
-        last_sequence_file_size = sequence_state.get("last_sequence_file_size", 0)
-        last_sequence_pixels = sequence_state.get("last_sequence_pixels", 0)
-
-        # decent default values if width/height not available
-        width = 1024 if image.width is None else image.width
-        height = 1024 if image.height is None else image.height
-        pixels = width * height
-
-        if image.filesize is None:
-            filesize = os.path.getsize(image.filename)
-        else:
-            filesize = image.filesize
-
-        new_sequence_images = last_sequence_images + 1
-        new_sequence_file_size = last_sequence_file_size + filesize
-        new_sequence_pixels = last_sequence_pixels + pixels
-
-        if max_sequence_images < new_sequence_images:
-            LOG.debug(
-                "Split because the current sequence (%s) reaches the max number of images (%s)",
-                new_sequence_images,
-                max_sequence_images,
-            )
-            start_new_sequence = True
-        elif max_sequence_filesize < new_sequence_file_size:
-            LOG.debug(
-                "Split because the current sequence (%s) reaches the max filesize (%s)",
-                new_sequence_file_size,
-                max_sequence_filesize,
-            )
-            start_new_sequence = True
-        elif max_sequence_pixels < new_sequence_pixels:
-            LOG.debug(
-                "Split because the current sequence (%s) reaches the max pixels (%s)",
-                new_sequence_pixels,
-                max_sequence_pixels,
-            )
-            start_new_sequence = True
-        else:
-            start_new_sequence = False
-
-        if not start_new_sequence:
-            sequence_state["last_sequence_images"] = new_sequence_images
-            sequence_state["last_sequence_file_size"] = new_sequence_file_size
-            sequence_state["last_sequence_pixels"] = new_sequence_pixels
-        else:
-            sequence_state["last_sequence_images"] = 1
-            sequence_state["last_sequence_file_size"] = filesize
-            sequence_state["last_sequence_pixels"] = pixels
-
-        return start_new_sequence
+    should_splits = []
+
+    if max_sequence_images is not None:
+        should_splits.append(
+            functools.partial(
+                _should_split_by_max_sequence_images,
+                max_sequence_images=max_sequence_images,
+            )
+        )
+
+    if cutoff_time is not None:
+        should_splits.append(
+            functools.partial(_should_split_by_cutoff_time, cutoff_time=cutoff_time)
+        )
+
+    if cutoff_distance is not None:
+        should_splits.append(
+            functools.partial(
+                _should_split_by_cutoff_distance, cutoff_distance=cutoff_distance
+            )
+        )
+
+    if max_sequence_filesize_in_bytes is not None:
+        should_splits.append(
+            functools.partial(
+                _should_split_by_max_sequence_filesize,
+                max_sequence_filesize_in_bytes=max_sequence_filesize_in_bytes,
+            )
+        )
+
+    if max_sequence_pixels is not None:
+        should_splits.append(
+            functools.partial(
+                _should_split_by_max_sequence_pixels,
+                max_sequence_pixels=max_sequence_pixels,
+            )
+        )
+
+    def _should_split_agg(
+        state: SplitState, image: types.ImageMetadata
+    ) -> tuple[SplitState, bool]:
+        split = False
+
+        for should_split in should_splits:
+            state, split = should_split(state, image, split=split)
+
+        return state, split

     output_sequences = []
     for sequence in input_sequences:
         output_sequences.extend(
-            split_sequence_by_agg(
-                sequence, should_split_with_sequence_state=_should_split
+            split_sequence_by(
+                sequence, _should_split_agg, initial=T.cast(SplitState, {})
             )
         )

     assert sum(len(s) for s in output_sequences) == sum(len(s) for s in input_sequences)

-    LOG.info("Found %s sequences after split by sequence limits", len(output_sequences))
+    if len(input_sequences) != len(output_sequences):
+        LOG.info(f"Split sequences: {len(input_sequences)} -> {len(output_sequences)}")

     return output_sequences

@@ -587,10 +596,8 @@ def process_sequence_properties(
     duplicate_angle: float = constants.DUPLICATE_ANGLE,
     max_avg_speed: float = constants.MAX_AVG_SPEED,
 ) -> list[types.MetadataOrError]:
-    max_sequence_filesize_in_bytes = _parse_filesize_in_bytes(
-        constants.MAX_SEQUENCE_FILESIZE
-    )
-    max_sequence_pixels = _parse_pixels(constants.MAX_SEQUENCE_PIXELS)
+    max_sequence_filesize_in_bytes = constants.MAX_SEQUENCE_FILESIZE
+    max_sequence_pixels = constants.MAX_SEQUENCE_PIXELS

     error_metadatas: list[types.ErrorMetadata] = []
     image_metadatas: list[types.ImageMetadata] = []
@@ -632,9 +639,15 @@
     for sequence in sequences:
         _interpolate_subsecs_for_sorting(sequence)

-    # Split sequences by cutoff time
+    # Split sequences by max number of images, max filesize, max pixels, and cutoff time
     # NOTE: Do not split by distance here because it affects the speed limit check
-    sequences = _split_sequences_by_cutoff_time(sequences, cutoff_time=cutoff_time)
+    sequences = _split_sequences_by_limits(
+        sequences,
+        max_sequence_filesize_in_bytes=max_sequence_filesize_in_bytes,
+        max_sequence_pixels=max_sequence_pixels,
+        max_sequence_images=constants.MAX_SEQUENCE_LENGTH,
+        cutoff_time=cutoff_time,
+    )

     # Duplication check
     sequences, errors = _check_sequences_duplication(
@@ -651,13 +664,6 @@
                 image.angle = None
             geo.interpolate_directions_if_none(sequence)

-    # Split sequences by max number of images, max filesize, and max pixels
-    sequences = _split_sequences_by_limits(
-        sequences,
-        max_sequence_filesize_in_bytes=max_sequence_filesize_in_bytes,
-        max_sequence_pixels=max_sequence_pixels,
-    )
-
     # Check limits for sequences
     sequences, errors = _check_sequences_by_limits(
         sequences,
@@ -667,8 +673,8 @@
     error_metadatas.extend(errors)

     # Split sequences by cutoff distance
-    # NOTE: The speed limit check probably rejects most of anomalies
-    sequences = _split_sequences_by_cutoff_distance(
+    # NOTE: The speed limit check probably rejects most anomalies
+    sequences = _split_sequences_by_limits(
         sequences, cutoff_distance=cutoff_distance
     )

@@ -691,7 +697,7 @@

     results = error_metadatas + image_metadatas + video_metadatas
     assert len(metadatas) == len(results), (
-        f"expected {len(metadatas)} results but got {len(results)}"
+        f"Expected {len(metadatas)} results but got {len(results)}"
     )

     return results
diff --git a/tests/unit/test_sequence_processing.py b/tests/unit/test_sequence_processing.py
index 5ff30639..0034fb90 100644
--- a/tests/unit/test_sequence_processing.py
+++ b/tests/unit/test_sequence_processing.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
 import itertools
 import typing as T
 from pathlib import Path
@@ -6,7 +8,6 @@
 import pytest

 from mapillary_tools import (
-    constants,
     exceptions,
     geo,
     process_geotag_properties as pgp,
@@ -21,22 +22,19 @@
     lng: float,
     lat: float,
     time: float,
-    angle: T.Optional[float] = None,
+    angle: float | None = None,
+    filesize: int = 0,
     **kwargs,
 ) -> types.ImageMetadata:
-    filename = filename.resolve()
-    if not filename.exists():
-        filename.parent.mkdir(parents=True, exist_ok=True)
-        with filename.open("w"):
-            pass
     return types.ImageMetadata(
-        filename=filename,
+        filename=filename.resolve(),
+        filesize=filesize,
         lon=lng,
         lat=lat,
         time=time,
         alt=None,
-        **kwargs,
         angle=angle,
+        **kwargs,
     )
@@ -349,7 +347,6 @@ def test_interpolation(tmpdir: py.path.local):


 def test_subsec_interpolation(tmpdir: py.path.local):
-    constants.MAX_SEQUENCE_LENGTH = 2
     curdir = tmpdir.mkdir("hello222").mkdir("world333")
     sequence: T.List[types.Metadata] = [
         # s1
@@ -449,7 +446,7 @@ def test_process_finalize(setup_data):
             {
                 "filename": str(test_exif),
                 "filetype": "image",
-                "filesize": None,
+                "filesize": 0,
                 "MAPFilename": "test_exif.jpg",
                 "MAPLatitude": 1,
                 "MAPLongitude": 1,
@@ -488,11 +485,10 @@


 def test_cut_by_pixels(tmpdir: py.path.local):
-    curdir = tmpdir.mkdir("hello77").mkdir("world88")
     sequence: T.List[types.Metadata] = [
         # s2
         _make_image_metadata(
-            Path(curdir) / Path("./a.jpg"),
+            Path(tmpdir) / Path("./a.jpg"),
             2,
             2,
             1,
@@ -501,7 +497,7 @@
             height=2,
         ),
         _make_image_metadata(
-            Path(curdir) / Path("./b.jpg"),
+            Path(tmpdir) / Path("./b.jpg"),
             2.00001,
             2.00001,
             20,
@@ -511,7 +507,7 @@
         ),
         # s1
         _make_image_metadata(
-            Path(curdir) / Path("./c.jpg"),
+            Path(tmpdir) / Path("./c.jpg"),
             2.00002,
             2.00002,
             30,
@@ -541,10 +537,9 @@


 def test_video_error(tmpdir: py.path.local):
-    curdir = tmpdir.mkdir("hello222").mkdir("videos")
     sequence: T.List[types.Metadata] = [
         types.VideoMetadata(
-            Path(curdir) / Path("test_video_null_island.mp4"),
+            Path(tmpdir) / Path("test_video_null_island.mp4"),
             types.FileType.VIDEO,
             points=[
                 geo.Point(1, -0.00001, -0.00001, 1, angle=None),
@@ -556,7 +551,7 @@
             filesize=123,
         ),
         types.VideoMetadata(
-            Path(curdir) / Path("test_video_too_fast.mp4"),
+            Path(tmpdir) / Path("test_video_too_fast.mp4"),
             types.FileType.VIDEO,
             points=[
                 geo.Point(1, 1, 1, 1, angle=None),
@@ -568,7 +563,7 @@
             filesize=123,
         ),
         types.VideoMetadata(
-            Path(curdir) / Path("test_video_file_too_large.mp4"),
+            Path(tmpdir) / Path("test_video_file_too_large.mp4"),
             types.FileType.VIDEO,
             points=[
                 geo.Point(1, 1, 1, 1, angle=None),
@@ -579,7 +574,7 @@
             filesize=1024 * 1024 * 1024 * 200,
         ),
         types.VideoMetadata(
-            Path(curdir) / Path("test_good.mp4"),
+            Path(tmpdir) / Path("test_good.mp4"),
             types.FileType.VIDEO,
             points=[
                 geo.Point(1, 1, 1, 1, angle=None),
@@ -620,101 +615,87 @@
     )


-def test_split_sequence_by():
-    """Test split_sequence_by function."""
-    # Create test points
-    p1 = geo.Point(1, 1.00000, 1.00000, 1, angle=0)
-    p2 = geo.Point(2, 1.00001, 1.00001, 2, angle=0)
-    p3 = geo.Point(3, 1.00002, 1.00002, 3, angle=0)
-    p4 = geo.Point(10, 1.00003, 1.00003, 4, angle=0)  # Large time gap
-    p5 = geo.Point(11, 1.00004, 1.00004, 5, angle=0)
-    p6 = geo.Point(12, 1.10000, 1.10000, 6, angle=0)  # Large distance gap
-    p7 = geo.Point(13, 1.10001, 1.10001, 7, angle=0)
-
-    # Create a sequence of points
-    sequence = [p1, p2, p3, p4, p5, p6, p7]
-
-    # Test split by time gaps (> 5 seconds)
-    split_by_time = lambda prev, cur: cur.time - prev.time > 5
-    sequences = psp.split_sequence_by(sequence, split_by_time)
-
-    # Should be split into two sequences [p1,p2,p3], [p4,p5,p6,p7]
-    assert len(sequences) == 2
-    assert sequences[0] == [p1, p2, p3]
-    assert sequences[1] == [p4, p5, p6, p7]
-
-    # Test split by large distance gaps
-    def split_by_distance(prev, cur):
-        distance = geo.gps_distance(
-            (prev.lat, prev.lon),
-            (cur.lat, cur.lon),
-        )
-        should = distance > 1000  # Split if distance > 1000 meters
-        return should
-
-    sequences = psp.split_sequence_by(sequence, split_by_distance)
-
-    # Should be split into two sequences [p1,p2,p3,p4,p5], [p6,p7]
-    assert len(sequences) == 2
-    assert sequences[0] == [p1, p2, p3, p4, p5]
-    assert sequences[1] == [p6, p7]
-
-    # Test empty sequence
-    empty_sequences = psp.split_sequence_by([], split_by_time)
-    assert len(empty_sequences) == 0
-
-    # Test single point sequence
-    single_point = [p1]
-    single_sequences = psp.split_sequence_by(single_point, split_by_time)
-    assert len(single_sequences) == 1
-    assert single_sequences[0] == [p1]
-
-    sequences = psp.split_sequence_by([], split_by_time)
-    assert len(sequences) == 0
-
-
-def test_split_sequence_by_agg(tmpdir):
-    curdir = tmpdir.mkdir("hello77").mkdir("world88")
-    sequence: T.List[types.Metadata] = [
-        # s1
-        _make_image_metadata(
-            Path(curdir) / Path("./a.jpg"),
-            2,
-            2,
-            1,
-            filesize=110 * 1024 * 1024 * 1024,
-        ),
-        # s2
-        _make_image_metadata(
-            Path(curdir) / Path("./b.jpg"),
-            2.00001,
-            2.00001,
-            2,
-            filesize=1,
-        ),
-        # s3
-        _make_image_metadata(
-            Path(curdir) / Path("./c.jpg"),
-            2.00002,
-            2.00002,
-            3,
-            filesize=110 * 1024 * 1024 * 1024 - 1,
-        ),
-        _make_image_metadata(
-            Path(curdir) / Path("./c.jpg"),
-            2.00003,
-            2.00003,
-            4,
-            filesize=1,
-        ),
-    ]
-
-    metadatas = psp.process_sequence_properties(
-        sequence,
-        cutoff_distance=1000000000,
-        cutoff_time=100,
-        interpolate_directions=True,
-        duplicate_distance=0.1,
-        duplicate_angle=0.1,
-    )
-    assert 3 == len({m.MAPSequenceUUID for m in metadatas})  # type: ignore
+def test_split_sequence_by_filesize(tmpdir):
+    sequence: T.List[types.Metadata] = [
+        # s1
+        _make_image_metadata(
+            Path(tmpdir) / Path("./a.jpg"), 2, 2, 1, filesize=110 * 1024 * 1024 * 1024
+        ),
+        # s2
+        _make_image_metadata(
+            Path(tmpdir) / Path("./b.jpg"), 2.00001, 2.00001, 2, filesize=1
+        ),
+        _make_image_metadata(
+            Path(tmpdir) / Path("./c.jpg"), 2.00002, 2.00002, 2, filesize=1
+        ),
+    ]
+
+    metadatas = psp.process_sequence_properties(sequence)
+    assert 2 == len({m.MAPSequenceUUID for m in metadatas})  # type: ignore
+
+
+def test_split_sequence_by_image_count(tmpdir):
+    max_allowed_images = 1000
+
+    sequence = []
+    for i in range(1, max_allowed_images + 1):
+        image = _make_image_metadata(
+            Path(tmpdir) / Path(f"./a{i}.jpg"),
+            1 + i * 0.00001,
+            1 + i * 0.00001,
+            i,
+            filesize=1,
+        )
+        sequence.append(image)
+
+    metadatas = psp.process_sequence_properties(sequence)
+    assert 1 == len({m.MAPSequenceUUID for m in metadatas}), metadatas  # type: ignore
+
+
+def test_split_sequence_by_image_count_split(tmpdir):
+    max_allowed_images = 1000
+
+    sequence = []
+    for i in range(1, max_allowed_images + 2):
+        image = _make_image_metadata(
+            Path(tmpdir) / Path(f"./a{i}.jpg"),
+            1 + i * 0.00001,
+            1 + i * 0.00001,
+            i,
+            filesize=1,
+        )
+        sequence.append(image)
+
+    metadatas = psp.process_sequence_properties(sequence)
+    assert 2 == len({m.MAPSequenceUUID for m in metadatas}), metadatas  # type: ignore
+
+
+def test_split_sequence_by_cutoff_time(tmpdir):
+    sequence: T.List[types.Metadata] = [
+        # s1
+        _make_image_metadata(Path(tmpdir) / Path("./a.jpg"), 1, 1, 1, filesize=1),
+        # s2
+        _make_image_metadata(
+            Path(tmpdir) / Path("./b.jpg"), 1.00001, 1.00001, 600, filesize=1
+        ),
+        _make_image_metadata(
+            Path(tmpdir) / Path("./c.jpg"), 1.00002, 1.00002, 601, filesize=1
+        ),
+    ]
+
+    metadatas = psp.process_sequence_properties(sequence)
+    assert 2 == len({m.MAPSequenceUUID for m in metadatas}), metadatas  # type: ignore
+
+
+def test_split_sequence_no_split(tmpdir):
+    sequence: T.List[types.Metadata] = [
+        # s1
+        _make_image_metadata(Path(tmpdir) / Path("./a.jpg"), 1, 1, 1),
+        # s2
+        _make_image_metadata(Path(tmpdir) / Path("./b.jpg"), 1.00001, 1.00001, 2),
+        # s3
+        _make_image_metadata(Path(tmpdir) / Path("./c.jpg"), 1.00002, 1.00002, 3),
+    ]
+
+    metadatas = psp.process_sequence_properties(sequence)
+    assert 1 == len({m.MAPSequenceUUID for m in metadatas}), metadatas  # type: ignore
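Much of the new behavior in this patch is specified via doctests (`_parse_scaled_integers`, `split_sequence_by`, `duplication_check`, `_interpolate_subsecs_for_sorting`). One way to run them all, assuming the modules import cleanly in your environment:

```python
import doctest

from mapillary_tools import constants, geo, process_sequence_properties

for mod in (constants, geo, process_sequence_properties):
    failed, attempted = doctest.testmod(mod)
    print(f"{mod.__name__}: {attempted} doctests, {failed} failures")
```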