diff --git a/mapillary_tools/ffmpeg.py b/mapillary_tools/ffmpeg.py index f00db1934..3efad87ee 100644 --- a/mapillary_tools/ffmpeg.py +++ b/mapillary_tools/ffmpeg.py @@ -13,8 +13,7 @@ from pathlib import Path LOG = logging.getLogger(__name__) -FRAME_EXT = ".jpg" -NA_STREAM_IDX = "NA" +_MAX_STDERR_LENGTH = 2048 class StreamTag(T.TypedDict): @@ -31,6 +30,9 @@ class Stream(T.TypedDict): index: int tags: StreamTag width: int + r_frame_rate: str + avg_frame_rate: str + nb_frames: str class ProbeOutput(T.TypedDict): @@ -41,9 +43,6 @@ class FFmpegNotFoundError(Exception): pass -_MAX_STDERR_LENGTH = 2048 - - def _truncate_begin(s: str) -> str: if _MAX_STDERR_LENGTH < len(s): return "..." + s[-_MAX_STDERR_LENGTH:] @@ -74,6 +73,8 @@ def __str__(self) -> str: class FFMPEG: + FRAME_EXT = ".jpg" + def __init__( self, ffmpeg_path: str = "ffmpeg", @@ -81,71 +82,33 @@ def __init__( stderr: int | None = None, ) -> None: """ - ffmpeg_path: path to ffmpeg binary - ffprobe_path: path to ffprobe binary - stderr: param passed to subprocess.run to control whether to capture stderr + Initialize FFMPEG wrapper with paths to ffmpeg and ffprobe binaries. + + Args: + ffmpeg_path: Path to ffmpeg binary executable + ffprobe_path: Path to ffprobe binary executable + stderr: Parameter passed to subprocess.run to control stderr capture. + Use subprocess.PIPE to capture stderr, None to inherit from parent """ self.ffmpeg_path = ffmpeg_path self.ffprobe_path = ffprobe_path self.stderr = stderr - def _run_ffprobe_json(self, cmd: list[str]) -> dict: - full_cmd: list[str] = [self.ffprobe_path, "-print_format", "json", *cmd] - LOG.info(f"Extracting video information: {' '.join(full_cmd)}") - try: - completed = subprocess.run( - full_cmd, - check=True, - stdout=subprocess.PIPE, - stderr=self.stderr, - ) - except FileNotFoundError: - raise FFmpegNotFoundError( - f'The ffprobe command "{self.ffprobe_path}" not found' - ) - except subprocess.CalledProcessError as ex: - raise FFmpegCalledProcessError(ex) from ex - - try: - stdout = completed.stdout.decode("utf-8") - except UnicodeDecodeError: - raise RuntimeError( - f"Error decoding ffprobe output as unicode: {_truncate_end(str(completed.stdout))}" - ) - - try: - output = json.loads(stdout) - except json.JSONDecodeError: - raise RuntimeError( - f"Error JSON decoding ffprobe output: {_truncate_end(stdout)}" - ) + def probe_format_and_streams(self, video_path: Path) -> ProbeOutput: + """ + Probe video file to extract format and stream information using ffprobe. - # This check is for macOS: - # ffprobe -hide_banner -print_format json not_exists - # you will get exit code == 0 with the following stdout and stderr: - # { - # } - # not_exists: No such file or directory - if not output: - raise RuntimeError( - f"Empty JSON ffprobe output with STDERR: {_truncate_begin(str(completed.stderr))}" - ) + Args: + video_path: Path to the video file to probe - return output + Returns: + Dictionary containing streams and format information from ffprobe - def _run_ffmpeg(self, cmd: list[str]) -> None: - full_cmd: list[str] = [self.ffmpeg_path, *cmd] - LOG.info(f"Extracting frames: {' '.join(full_cmd)}") - try: - subprocess.run(full_cmd, check=True, stderr=self.stderr) - except FileNotFoundError: - raise FFmpegNotFoundError( - f'The ffmpeg command "{self.ffmpeg_path}" not found' - ) - except subprocess.CalledProcessError as ex: - raise FFmpegCalledProcessError(ex) from ex - - def probe_format_and_streams(self, video_path: Path) -> ProbeOutput: + Raises: + FFmpegNotFoundError: If ffprobe binary is not found + FFmpegCalledProcessError: If ffprobe command fails + RuntimeError: If output cannot be decoded or parsed as JSON + """ cmd: list[str] = [ "-hide_banner", "-show_format", @@ -154,47 +117,79 @@ def probe_format_and_streams(self, video_path: Path) -> ProbeOutput: ] return T.cast(ProbeOutput, self._run_ffprobe_json(cmd)) - def extract_frames( + def extract_frames_by_interval( self, video_path: Path, sample_dir: Path, sample_interval: float, - stream_idx: int | None = None, + stream_specifier: int | str = "v", ) -> None: """ - Extract frames by the sample interval from the specified video stream. - - stream_idx: the stream_index specifier to a **video stream**. If it's None, defaults to "v". See http://ffmpeg.org/ffmpeg.html#Stream-specifiers-1 + Extract frames from video at regular time intervals using fps filter. + + Args: + video_path: Path to input video file + sample_dir: Directory where extracted frame images will be saved + sample_interval: Time interval between extracted frames in seconds + stream_specifier: Stream specifier to target specific stream(s). + Can be an integer (stream index) or "v" (all video streams) + See https://ffmpeg.org/ffmpeg.html#Stream-specifiers-1 + + Raises: + FFmpegNotFoundError: If ffmpeg binary is not found + FFmpegCalledProcessError: If ffmpeg command fails """ + self._validate_stream_specifier(stream_specifier) + sample_prefix = sample_dir.joinpath(video_path.stem) - if stream_idx is not None: - stream_selector = ["-map", f"0:{stream_idx}"] - output_template = f"{sample_prefix}_{stream_idx}_%06d{FRAME_EXT}" - else: - stream_selector = [] - output_template = f"{sample_prefix}_{NA_STREAM_IDX}_%06d{FRAME_EXT}" + stream_selector = ["-map", f"0:{stream_specifier}"] + output_template = f"{sample_prefix}_{stream_specifier}_%06d{self.FRAME_EXT}" cmd: list[str] = [ - # global options should be specified first - *["-hide_banner", "-nostdin"], - # input 0 + # Global options should be specified first + *["-hide_banner"], + # Input 0 *["-i", str(video_path)], - # select stream + # Select stream *stream_selector, - # filter videos + # Filter videos *["-vf", f"fps=1/{sample_interval}"], - # video quality level (or the alias -q:v) - *["-qscale:v", "2"], + # Video quality level (or the alias -q:v) # -q:v=1 is the best quality but larger image sizes # see https://stackoverflow.com/a/10234065 # *["-qscale:v", "1", "-qmin", "1"], - # output + *["-qscale:v", "2"], + # Output output_template, ] - self._run_ffmpeg(cmd) + self.run_ffmpeg_non_interactive(cmd) + + @classmethod + def generate_binary_search(cls, sorted_frame_indices: list[int]) -> str: + """ + Generate a binary search expression for ffmpeg select filter. + + Creates an optimized filter expression that uses binary search logic + to efficiently select specific frame numbers from a video stream. + + Args: + sorted_frame_indices: List of frame numbers to select, must be sorted in ascending order + + Returns: + FFmpeg filter expression string using binary search logic + + Examples: + >>> FFMPEG.generate_binary_search([]) + '0' + >>> FFMPEG.generate_binary_search([1]) + 'eq(n\\\\,1)' + >>> FFMPEG.generate_binary_search([1, 2]) + 'if(lt(n\\\\,2)\\\\,eq(n\\\\,1)\\\\,eq(n\\\\,2))' + >>> FFMPEG.generate_binary_search([1, 2, 3]) + 'if(lt(n\\\\,2)\\\\,eq(n\\\\,1)\\\\,if(lt(n\\\\,3)\\\\,eq(n\\\\,2)\\\\,eq(n\\\\,3)))' + """ - def generate_binary_search(self, sorted_frame_indices: list[int]) -> str: length = len(sorted_frame_indices) if length == 0: @@ -203,37 +198,50 @@ def generate_binary_search(self, sorted_frame_indices: list[int]) -> str: if length == 1: return f"eq(n\\,{sorted_frame_indices[0]})" - middle = length // 2 - return f"if(lt(n\\,{sorted_frame_indices[middle]})\\,{self.generate_binary_search(sorted_frame_indices[:middle])}\\,{self.generate_binary_search(sorted_frame_indices[middle:])})" + middle_idx = length // 2 + left = cls.generate_binary_search(sorted_frame_indices[:middle_idx]) + right = cls.generate_binary_search(sorted_frame_indices[middle_idx:]) + + return f"if(lt(n\\,{sorted_frame_indices[middle_idx]})\\,{left}\\,{right})" def extract_specified_frames( self, video_path: Path, sample_dir: Path, frame_indices: set[int], - stream_idx: int | None = None, + stream_specifier: int | str = "v", ) -> None: """ - Extract specified frames from the specified video stream. - - stream_idx: the stream_index specifier to a **video stream**. If it's None, defaults to "v". See http://ffmpeg.org/ffmpeg.html#Stream-specifiers-1 + Extract specific frames from video by frame number using select filter. + + Uses a binary search filter expression to efficiently select only the + specified frame numbers from the video stream. + + Args: + video_path: Path to input video file + sample_dir: Directory where extracted frame images will be saved + frame_indices: Set of specific frame numbers to extract (0-based) + stream_specifier: Stream specifier to target specific stream(s). + Can be an integer (stream index) or "v" (all video streams) + See https://ffmpeg.org/ffmpeg.html#Stream-specifiers-1 + + Raises: + FFmpegNotFoundError: If ffmpeg binary is not found + FFmpegCalledProcessError: If ffmpeg command fails + + Note: + Frame indices are 0-based but ffmpeg output files are numbered starting from 1. + Creates temporary filter script file on Windows to avoid command line length limits. """ + self._validate_stream_specifier(stream_specifier) + if not frame_indices: return sample_prefix = sample_dir.joinpath(video_path.stem) - if stream_idx is not None: - stream_selector = ["-map", f"0:{stream_idx}"] - output_template = f"{sample_prefix}_{stream_idx}_%06d{FRAME_EXT}" - else: - stream_selector = [] - output_template = f"{sample_prefix}_{NA_STREAM_IDX}_%06d{FRAME_EXT}" - - # Write the select filter to a temp file because: - # The select filter could be large and - # the maximum command line length for the CreateProcess function is 32767 characters - # https://devblogs.microsoft.com/oldnewthing/20031210-00/?p=41553 + stream_selector = ["-map", f"0:{stream_specifier}"] + output_template = f"{sample_prefix}_{stream_specifier}_%06d{self.FRAME_EXT}" eqs = self.generate_binary_search(sorted(frame_indices)) @@ -243,6 +251,10 @@ def extract_specified_frames( else: delete = True + # Write the select filter to a temp file because: + # The select filter could be large and + # the maximum command line length for the CreateProcess function is 32767 characters + # https://devblogs.microsoft.com/oldnewthing/20031210-00/?p=41553 with tempfile.NamedTemporaryFile(mode="w+", delete=delete) as select_file: try: select_file.write(f"select={eqs}") @@ -251,13 +263,13 @@ def extract_specified_frames( if not delete: select_file.close() cmd: list[str] = [ - # global options should be specified first - *["-hide_banner", "-nostdin"], - # input 0 + # Global options should be specified first + *["-hide_banner"], + # Input 0 *["-i", str(video_path)], - # select stream + # Select stream *stream_selector, - # filter videos + # Filter videos *[ *["-filter_script:v", select_file.name], # Each frame is passed with its timestamp from the demuxer to the muxer @@ -274,15 +286,15 @@ def extract_specified_frames( # If set to 1, expand the filename with pts from pkt->pts. Default value is 0. # *["-frame_pts", "1"], ], - # video quality level (or the alias -q:v) - *["-qscale:v", "2"], + # Video quality level (or the alias -q:v) # -q:v=1 is the best quality but larger image sizes # see https://stackoverflow.com/a/10234065 # *["-qscale:v", "1", "-qmin", "1"], + *["-qscale:v", "2"], # output output_template, ] - self._run_ffmpeg(cmd) + self.run_ffmpeg_non_interactive(cmd) finally: if not delete: try: @@ -290,45 +302,286 @@ def extract_specified_frames( except FileNotFoundError: pass + @classmethod + def sort_selected_samples( + cls, + sample_dir: Path, + video_path: Path, + selected_stream_specifiers: list[int | str] | None = None, + ) -> list[tuple[int, list[Path | None]]]: + """ + Group extracted frame samples by frame index across multiple streams. + + Groups frames so that the Nth group contains all frames from the selected + streams at frame index N, allowing synchronized access to multi-stream frames. + + Args: + sample_dir: Directory containing extracted frame files + video_path: Original video file path (used to match frame filenames) + selected_stream_specifiers: List of stream specifiers to include in output. + Can contain integers (stream indices) or "v" (all video streams). + If None, defaults to ["v"] + + Returns: + List of tuples where each tuple contains: + - frame_idx (int): The frame index + - sample_paths (list[Path | None]): Paths to frame files from each selected stream, + or None if no frame exists for that stream at this index + + Note: + Output is sorted by frame index in ascending order. + """ + if selected_stream_specifiers is None: + selected_stream_specifiers = ["v"] + + for stream_specifier in selected_stream_specifiers: + cls._validate_stream_specifier(stream_specifier) + + stream_samples: dict[int, list[tuple[str, Path]]] = {} + for stream_specifier, frame_idx, sample_path in cls.iterate_samples( + sample_dir, video_path + ): + stream_samples.setdefault(frame_idx, []).append( + (str(stream_specifier), sample_path) + ) + + selected: list[tuple[int, list[Path | None]]] = [] + for frame_idx in sorted(stream_samples.keys()): + indexed_by_specifier = { + specifier: sample_path + for specifier, sample_path in stream_samples[frame_idx] + } + selected_sample_paths = [ + indexed_by_specifier.get(str(specifier)) + for specifier in selected_stream_specifiers + ] + selected.append((frame_idx, selected_sample_paths)) + return selected + + @classmethod + def iterate_samples( + cls, sample_dir: Path, video_path: Path + ) -> T.Generator[tuple[str, int, Path], None, None]: + """ + Iterate over all extracted frame samples in a directory. + + Searches for frame files matching the expected naming pattern and yields + information about each frame including stream specifier, frame index, and file path. + + Args: + sample_dir: Directory containing extracted frame files + video_path: Original video file path (used to match frame filenames) + + Yields: + Tuple containing: + - stream_specifier (str): Stream specifier (number or "v") + - frame_idx (int): Frame index (0-based or 1-based depending on extraction method) + - sample_path (Path): Path to the frame image file + + Note: + Expected filename pattern: {video_stem}_{stream_specifier}_{frame_idx:06d}.jpg + where stream_specifier can be a number or "v" for video streams. + """ + sample_basename_pattern = re.compile( + rf""" + ^{re.escape(video_path.stem)} # Match the video stem + _(?P\d+|v) # Stream specifier can be a number or "v" + _(?P\d+)$ # Frame index, can be 0-padded + """, + re.X, + ) + for sample_path in sample_dir.iterdir(): + result = cls._extract_stream_frame_idx( + sample_path.name, sample_basename_pattern + ) + if result is not None: + stream_specifier, frame_idx = result + yield (stream_specifier, frame_idx, sample_path) + + def run_ffmpeg_non_interactive(self, cmd: list[str]) -> None: + """ + Execute ffmpeg command in non-interactive mode. + + Runs ffmpeg with the given command arguments, automatically adding + the -nostdin flag to prevent interactive prompts. + + Args: + cmd: List of command line arguments to pass to ffmpeg + + Raises: + FFmpegNotFoundError: If ffmpeg binary is not found + FFmpegCalledProcessError: If ffmpeg command fails + """ + full_cmd: list[str] = [self.ffmpeg_path, "-nostdin", *cmd] + LOG.info(f"Running ffmpeg: {' '.join(full_cmd)}") + try: + subprocess.run(full_cmd, check=True, stderr=self.stderr) + except FileNotFoundError: + raise FFmpegNotFoundError( + f'The ffmpeg command "{self.ffmpeg_path}" not found' + ) + except subprocess.CalledProcessError as ex: + raise FFmpegCalledProcessError(ex) from ex + + @classmethod + def _extract_stream_frame_idx( + cls, sample_basename: str, pattern: T.Pattern[str] + ) -> tuple[str, int] | None: + """ + Extract stream specifier and frame index from sample basename + + Returns: + If returning None, it means the basename does not match the pattern + + Examples: + * basename GX010001_v_000000.jpg will extract ("v", 0) + * basename GX010001_1_000002.jpg will extract ("1", 2) + """ + image_no_ext, ext = os.path.splitext(sample_basename) + if ext.lower() != cls.FRAME_EXT.lower(): + return None + + match = pattern.match(image_no_ext) + if not match: + return None + + stream_specifier = match.group("stream_specifier") + + # Convert 0-padded numbers to int + # e.g. 000000 -> 0 + # e.g. 000001 -> 1 + frame_idx_str = match.group("frame_idx") + frame_idx_str = frame_idx_str.lstrip("0") or "0" + + try: + frame_idx = int(frame_idx_str) + except ValueError: + return None + + return stream_specifier, frame_idx + + def _run_ffprobe_json(self, cmd: list[str]) -> dict: + full_cmd: list[str] = [self.ffprobe_path, "-print_format", "json", *cmd] + LOG.info(f"Extracting video information: {' '.join(full_cmd)}") + try: + completed = subprocess.run( + full_cmd, check=True, stdout=subprocess.PIPE, stderr=self.stderr + ) + except FileNotFoundError: + raise FFmpegNotFoundError( + f'The ffprobe command "{self.ffprobe_path}" not found' + ) + except subprocess.CalledProcessError as ex: + raise FFmpegCalledProcessError(ex) from ex + + try: + stdout = completed.stdout.decode("utf-8") + except UnicodeDecodeError: + raise RuntimeError( + f"Error decoding ffprobe output as unicode: {_truncate_end(str(completed.stdout))}" + ) + + try: + output = json.loads(stdout) + except json.JSONDecodeError: + raise RuntimeError( + f"Error JSON decoding ffprobe output: {_truncate_end(stdout)}" + ) + + # This check is for macOS: + # ffprobe -hide_banner -print_format json not_exists + # you will get exit code == 0 with the following stdout and stderr: + # { + # } + # not_exists: No such file or directory + if not output: + raise RuntimeError( + f"Empty JSON ffprobe output with STDERR: {_truncate_begin(str(completed.stderr))}" + ) + + return output + + @classmethod + def _validate_stream_specifier(cls, stream_specifier: int | str) -> None: + if isinstance(stream_specifier, str): + if stream_specifier in ["v"]: + pass + else: + try: + int(stream_specifier) + except ValueError: + raise ValueError(f"Invalid stream specifier: {stream_specifier}") + class Probe: - probe: ProbeOutput + probe_output: ProbeOutput + + def __init__(self, probe_output: ProbeOutput) -> None: + """ + Initialize Probe with ffprobe output data. - def __init__(self, probe: ProbeOutput) -> None: - self.probe = probe + Args: + probe_output: Dictionary containing streams and format information from ffprobe + """ + self.probe_output = probe_output def probe_video_start_time(self) -> datetime.datetime | None: """ - Find video start time of the given video. - It searches video creation time and duration in video streams first and then the other streams. - Once found, return stream creation time - stream duration as the video start time. + Determine the start time of the video by analyzing stream metadata. + + Searches for creation time and duration information in video streams first, + then falls back to other stream types. Calculates start time as: + creation_time - duration + + Returns: + Video start time as datetime object, or None if cannot be determined + + Note: + Prioritizes video streams with highest resolution when multiple exist. """ - streams = self.probe.get("streams", []) + streams = self.probe_output.get("streams", []) - # search start time from video streams + # Search start time from video streams video_streams = self.probe_video_streams() video_streams.sort( key=lambda s: s.get("width", 0) * s.get("height", 0), reverse=True ) for stream in video_streams: - start_time = extract_stream_start_time(stream) + start_time = self.extract_stream_start_time(stream) if start_time is not None: return start_time - # search start time from the other streams + # Search start time from the other streams for stream in streams: if stream.get("codec_type") != "video": - start_time = extract_stream_start_time(stream) + start_time = self.extract_stream_start_time(stream) if start_time is not None: return start_time return None def probe_video_streams(self) -> list[Stream]: - streams = self.probe.get("streams", []) + """ + Extract all video streams from the probe output. + + Returns: + List of video stream dictionaries containing metadata like codec, + dimensions, frame rate, etc. + """ + streams = self.probe_output.get("streams", []) return [stream for stream in streams if stream.get("codec_type") == "video"] def probe_video_with_max_resolution(self) -> Stream | None: + """ + Find the video stream with the highest resolution. + + Sorts all video streams by width × height and returns the one with + the largest resolution. + + Returns: + Stream dictionary for the highest resolution video stream, + or None if no video streams exist + """ video_streams = self.probe_video_streams() video_streams.sort( key=lambda s: s.get("width", 0) * s.get("height", 0), reverse=True @@ -337,112 +590,37 @@ def probe_video_with_max_resolution(self) -> Stream | None: return None return video_streams[0] + @classmethod + def extract_stream_start_time(cls, stream: Stream) -> datetime.datetime | None: + """ + Calculate the start time of a specific stream. -def extract_stream_start_time(stream: Stream) -> datetime.datetime | None: - """ - Find the start time of the given stream. - Start time is the creation time of the stream minus the duration of the stream. - """ - duration_str = stream.get("duration") - LOG.debug("Extracted video duration: %s", duration_str) - if duration_str is None: - return None - duration = float(duration_str) - - creation_time_str = stream.get("tags", {}).get("creation_time") - LOG.debug("Extracted video creation time: %s", creation_time_str) - if creation_time_str is None: - return None - try: - creation_time = datetime.datetime.fromisoformat(creation_time_str) - except ValueError: - creation_time = datetime.datetime.strptime( - creation_time_str, "%Y-%m-%dT%H:%M:%S.%f%z" - ) - return creation_time - datetime.timedelta(seconds=duration) - - -def _extract_stream_frame_idx( - sample_basename: str, - sample_basename_pattern: T.Pattern[str], -) -> tuple[int | None, int] | None: - """ - extract stream id and frame index from sample basename - e.g. basename GX010001_NA_000000.jpg will extract (None, 0) - e.g. basename GX010001_1_000002.jpg will extract (1, 2) - If returning None, it means the basename does not match the pattern - """ - image_no_ext, ext = os.path.splitext(sample_basename) - if ext.lower() != FRAME_EXT.lower(): - return None - - match = sample_basename_pattern.match(image_no_ext) - if not match: - return None + Determines start time by subtracting stream duration from creation time: + start_time = creation_time - duration - g1 = match.group("stream_idx") - try: - if g1 == NA_STREAM_IDX: - stream_idx = None - else: - stream_idx = int(g1) - except ValueError: - return None + Args: + stream: Stream dictionary containing metadata including tags and duration - # convert 0-padded numbers to int - # e.g. 000000 -> 0 - # e.g. 000001 -> 1 - g2 = match.group("frame_idx") - g2 = g2.lstrip("0") or "0" + Returns: + Stream start time as datetime object, or None if required metadata is missing - try: - frame_idx = int(g2) - except ValueError: - return None + Note: + Handles multiple datetime formats including ISO format and custom patterns. + """ + duration_str = stream.get("duration") + LOG.debug("Extracted video duration: %s", duration_str) + if duration_str is None: + return None + duration = float(duration_str) - return stream_idx, frame_idx - - -def iterate_samples( - sample_dir: Path, video_path: Path -) -> T.Generator[tuple[int | None, int, Path], None, None]: - """ - Search all samples in the sample_dir, - and return a generator of the tuple: (stream ID, frame index, sample path). - The frame index could be 0-based or 1-based depending on how it's sampled. - """ - sample_basename_pattern = re.compile( - rf"^{re.escape(video_path.stem)}_(?P\d+|{re.escape(NA_STREAM_IDX)})_(?P\d+)$" - ) - for sample_path in sample_dir.iterdir(): - stream_frame_idx = _extract_stream_frame_idx( - sample_path.name, - sample_basename_pattern, - ) - if stream_frame_idx is not None: - stream_idx, frame_idx = stream_frame_idx - yield (stream_idx, frame_idx, sample_path) - - -def sort_selected_samples( - sample_dir: Path, video_path: Path, selected_stream_indices: list[int | None] -) -> list[tuple[int, list[Path | None]]]: - """ - Group frames by frame index, so that - the Nth group contains all the frames from the selected streams at frame index N. - """ - stream_samples: dict[int, list[tuple[int | None, Path]]] = {} - for stream_idx, frame_idx, sample_path in iterate_samples(sample_dir, video_path): - stream_samples.setdefault(frame_idx, []).append((stream_idx, sample_path)) - - selected: list[tuple[int, list[Path | None]]] = [] - for frame_idx in sorted(stream_samples.keys()): - indexed = { - stream_idx: sample_path - for stream_idx, sample_path in stream_samples[frame_idx] - } - selected_sample_paths = [ - indexed.get(stream_idx) for stream_idx in selected_stream_indices - ] - selected.append((frame_idx, selected_sample_paths)) - return selected + creation_time_str = stream.get("tags", {}).get("creation_time") + LOG.debug("Extracted video creation time: %s", creation_time_str) + if creation_time_str is None: + return None + try: + creation_time = datetime.datetime.fromisoformat(creation_time_str) + except ValueError: + creation_time = datetime.datetime.strptime( + creation_time_str, "%Y-%m-%dT%H:%M:%S.%f%z" + ) + return creation_time - datetime.timedelta(seconds=duration) diff --git a/mapillary_tools/sample_video.py b/mapillary_tools/sample_video.py index 414f60a69..3053e8bea 100644 --- a/mapillary_tools/sample_video.py +++ b/mapillary_tools/sample_video.py @@ -194,8 +194,8 @@ def _sample_single_video_by_interval( ) with wip_dir_context(wip_sample_dir(sample_dir), sample_dir) as wip_dir: - ffmpeg.extract_frames(video_path, wip_dir, sample_interval) - frame_samples = ffmpeglib.sort_selected_samples(wip_dir, video_path, [None]) + ffmpeg.extract_frames_by_interval(video_path, wip_dir, sample_interval) + frame_samples = ffmpeglib.FFMPEG.sort_selected_samples(wip_dir, video_path) for frame_idx_1based, sample_paths in frame_samples: assert len(sample_paths) == 1 if sample_paths[0] is None: @@ -322,11 +322,11 @@ def _sample_single_video_by_distance( video_path, wip_dir, frame_indices=set(sorted_sample_indices), - stream_idx=video_stream_idx, + stream_specifier=str(video_stream_idx), ) - frame_samples = ffmpeglib.sort_selected_samples( - wip_dir, video_path, [video_stream_idx] + frame_samples = ffmpeglib.FFMPEG.sort_selected_samples( + wip_dir, video_path, selected_stream_specifiers=[str(video_stream_idx)] ) if len(frame_samples) != len(sorted_sample_indices): raise exceptions.MapillaryVideoError( diff --git a/pyproject.toml b/pyproject.toml index 63311422e..110fcb4ec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -81,4 +81,4 @@ module = [ "construct.*", "py.*", ] -ignore_missing_imports = true \ No newline at end of file +ignore_missing_imports = true diff --git a/tests/integration/test_gopro.py b/tests/integration/test_gopro.py index 8fcbbf121..a7609717f 100644 --- a/tests/integration/test_gopro.py +++ b/tests/integration/test_gopro.py @@ -26,7 +26,7 @@ } EXPECTED_DESCS: T.List[T.Any] = [ { - "filename": "hero8.mp4/hero8_NA_000001.jpg", + "filename": "hero8.mp4/hero8_v_000001.jpg", "filetype": "image", "MAPAltitude": 9540.24, "MAPCaptureTime": "2019_11_18_15_41_12_354", @@ -38,10 +38,10 @@ "MAPLongitude": -129.2943386, "MAPDeviceMake": "GoPro", "MAPDeviceModel": "HERO8 Black", - "MAPFilename": "hero8_NA_000001.jpg", + "MAPFilename": "hero8_v_000001.jpg", }, { - "filename": "hero8.mp4/hero8_NA_000002.jpg", + "filename": "hero8.mp4/hero8_v_000002.jpg", "filetype": "image", "MAPAltitude": 7112.573717404068, "MAPCaptureTime": "2019_11_18_15_41_14_354", @@ -53,10 +53,10 @@ "MAPLongitude": -126.85929159704702, "MAPDeviceMake": "GoPro", "MAPDeviceModel": "HERO8 Black", - "MAPFilename": "hero8_NA_000002.jpg", + "MAPFilename": "hero8_v_000002.jpg", }, { - "filename": "hero8.mp4/hero8_NA_000003.jpg", + "filename": "hero8.mp4/hero8_v_000003.jpg", "filetype": "image", "MAPAltitude": 7463.642846094319, "MAPCaptureTime": "2019_11_18_15_41_16_354", @@ -68,10 +68,10 @@ "MAPLongitude": -127.18475264566939, "MAPDeviceMake": "GoPro", "MAPDeviceModel": "HERO8 Black", - "MAPFilename": "hero8_NA_000003.jpg", + "MAPFilename": "hero8_v_000003.jpg", }, { - "filename": "hero8.mp4/hero8_NA_000004.jpg", + "filename": "hero8.mp4/hero8_v_000004.jpg", "filetype": "image", "MAPAltitude": 6909.8168472111465, "MAPCaptureTime": "2019_11_18_15_41_18_354", @@ -83,10 +83,10 @@ "MAPLongitude": -126.65905680405231, "MAPDeviceMake": "GoPro", "MAPDeviceModel": "HERO8 Black", - "MAPFilename": "hero8_NA_000004.jpg", + "MAPFilename": "hero8_v_000004.jpg", }, { - "filename": "hero8.mp4/hero8_NA_000005.jpg", + "filename": "hero8.mp4/hero8_v_000005.jpg", "filetype": "image", "MAPAltitude": 7212.594480737465, "MAPCaptureTime": "2019_11_18_15_41_20_354", @@ -98,10 +98,10 @@ "MAPLongitude": -126.93688762007304, "MAPDeviceMake": "GoPro", "MAPDeviceModel": "HERO8 Black", - "MAPFilename": "hero8_NA_000005.jpg", + "MAPFilename": "hero8_v_000005.jpg", }, { - "filename": "hero8.mp4/hero8_NA_000006.jpg", + "filename": "hero8.mp4/hero8_v_000006.jpg", "filetype": "image", "MAPAltitude": 7274.361994963208, "MAPCaptureTime": "2019_11_18_15_41_22_354", @@ -113,7 +113,7 @@ "MAPLongitude": -126.98833423074615, "MAPDeviceMake": "GoPro", "MAPDeviceModel": "HERO8 Black", - "MAPFilename": "hero8_NA_000006.jpg", + "MAPFilename": "hero8_v_000006.jpg", }, ] diff --git a/tests/integration/test_process_and_upload.py b/tests/integration/test_process_and_upload.py index 8b244eb04..c3d93c5ca 100644 --- a/tests/integration/test_process_and_upload.py +++ b/tests/integration/test_process_and_upload.py @@ -60,7 +60,7 @@ }, }, "gopro": { - "mly_tools_724084a74a44eebd025d0d97a1d5aa30_NA_000001.jpg": { + "mly_tools_724084a74a44eebd025d0d97a1d5aa30_v_000001.jpg": { "MAPAltitude": -22.18, "MAPCaptureTime": "2019_11_18_15_44_47_862", "MAPCompassHeading": {"MagneticHeading": 313.689, "TrueHeading": 313.689}, @@ -71,7 +71,7 @@ "MAPOrientation": 1, "filetype": "image", }, - "mly_tools_724084a74a44eebd025d0d97a1d5aa30_NA_000002.jpg": { + "mly_tools_724084a74a44eebd025d0d97a1d5aa30_v_000002.jpg": { "MAPAltitude": -21.62, "MAPCaptureTime": "2019_11_18_15_44_49_862", "MAPCompassHeading": {"MagneticHeading": 326.179, "TrueHeading": 326.179}, @@ -82,7 +82,7 @@ "MAPOrientation": 1, "filetype": "image", }, - "mly_tools_724084a74a44eebd025d0d97a1d5aa30_NA_000003.jpg": { + "mly_tools_724084a74a44eebd025d0d97a1d5aa30_v_000003.jpg": { "MAPAltitude": -21.896, "MAPCaptureTime": "2019_11_18_15_44_51_862", "MAPCompassHeading": {"MagneticHeading": 353.178, "TrueHeading": 353.178}, @@ -93,7 +93,7 @@ "MAPOrientation": 1, "filetype": "image", }, - "mly_tools_724084a74a44eebd025d0d97a1d5aa30_NA_000004.jpg": { + "mly_tools_724084a74a44eebd025d0d97a1d5aa30_v_000004.jpg": { "MAPAltitude": -21.997, "MAPCaptureTime": "2019_11_18_15_44_53_862", "MAPCompassHeading": {"MagneticHeading": 334.427, "TrueHeading": 334.427}, @@ -104,7 +104,7 @@ "MAPOrientation": 1, "filetype": "image", }, - "mly_tools_724084a74a44eebd025d0d97a1d5aa30_NA_000005.jpg": { + "mly_tools_724084a74a44eebd025d0d97a1d5aa30_v_000005.jpg": { "MAPAltitude": -22.364, "MAPCaptureTime": "2019_11_18_15_44_55_862", "MAPCompassHeading": {"MagneticHeading": 325.089, "TrueHeading": 325.089}, @@ -115,7 +115,7 @@ "MAPOrientation": 1, "filetype": "image", }, - "mly_tools_724084a74a44eebd025d0d97a1d5aa30_NA_000006.jpg": { + "mly_tools_724084a74a44eebd025d0d97a1d5aa30_v_000006.jpg": { "MAPAltitude": -22.539, "MAPCaptureTime": "2019_11_18_15_44_57_862", "MAPCompassHeading": {"MagneticHeading": 327.867, "TrueHeading": 327.867}, @@ -198,9 +198,9 @@ def test_video_process_and_upload( check=True, ) expected = { - "sample-5s_NA_000001.jpg": { - "filename": "sample-5s_NA_000001.jpg", - "MAPFilename": "sample-5s_NA_000001.jpg", + "sample-5s_v_000001.jpg": { + "filename": "sample-5s_v_000001.jpg", + "MAPFilename": "sample-5s_v_000001.jpg", "MAPAltitude": 94.75, "MAPCaptureTime": "2025_03_14_07_00_00_000", "MAPCompassHeading": { @@ -212,9 +212,9 @@ def test_video_process_and_upload( "MAPOrientation": 1, "filetype": "image", }, - "sample-5s_NA_000002.jpg": { - "filename": "sample-5s_NA_000002.jpg", - "MAPFilename": "sample-5s_NA_000002.jpg", + "sample-5s_v_000002.jpg": { + "filename": "sample-5s_v_000002.jpg", + "MAPFilename": "sample-5s_v_000002.jpg", "MAPAltitude": 93.347, "MAPCaptureTime": "2025_03_14_07_00_02_000", "MAPCompassHeading": { @@ -226,9 +226,9 @@ def test_video_process_and_upload( "MAPOrientation": 1, "filetype": "image", }, - "sample-5s_NA_000003.jpg": { - "filename": "sample-5s_NA_000003.jpg", - "MAPFilename": "sample-5s_NA_000003.jpg", + "sample-5s_v_000003.jpg": { + "filename": "sample-5s_v_000003.jpg", + "MAPFilename": "sample-5s_v_000003.jpg", "MAPAltitude": 92.492, "MAPCaptureTime": "2025_03_14_07_00_04_000", "MAPCompassHeading": { diff --git a/tests/unit/test_ffmpeg.py b/tests/unit/test_ffmpeg.py index db56d3168..3079e7eac 100644 --- a/tests/unit/test_ffmpeg.py +++ b/tests/unit/test_ffmpeg.py @@ -1,30 +1,171 @@ import datetime -import os import subprocess from pathlib import Path +import py.path + import pytest from mapillary_tools import ffmpeg +from ..integration.fixtures import IS_FFMPEG_INSTALLED, setup_data -def _ffmpeg_installed(): - ffmpeg_path = os.getenv("MAPILLARY_TOOLS_FFMPEG_PATH", "ffmpeg") - ffprobe_path = os.getenv("MAPILLARY_TOOLS_FFPROBE_PATH", "ffprobe") - try: - subprocess.run( - [ffmpeg_path, "-version"], stderr=subprocess.PIPE, stdout=subprocess.PIPE - ) - # In Windows, ffmpeg is installed but ffprobe is not? - subprocess.run( - [ffprobe_path, "-version"], stderr=subprocess.PIPE, stdout=subprocess.PIPE - ) - except FileNotFoundError: - return False - return True +def test_ffmpeg_run_ok(): + if not IS_FFMPEG_INSTALLED: + pytest.skip("ffmpeg not installed") + ff = ffmpeg.FFMPEG() + ff.run_ffmpeg_non_interactive(["-version"]) + + +@pytest.mark.xfail( + reason="ffmpeg run_ffmpeg_non_interactive should raise FFmpegCalledProcessError", + raises=ffmpeg.FFmpegCalledProcessError, +) +def test_ffmpeg_run_raise(): + if not IS_FFMPEG_INSTALLED: + pytest.skip("ffmpeg not installed") + ff = ffmpeg.FFMPEG() + ff.run_ffmpeg_non_interactive(["foo"]) + + +def test_ffmpeg_extract_frames_ok(setup_data: py.path.local): + if not IS_FFMPEG_INSTALLED: + pytest.skip("ffmpeg not installed") + + ff = ffmpeg.FFMPEG() + + video_path = Path(setup_data.join("videos/sample-5s.mp4")) -IS_FFMPEG_INSTALLED = _ffmpeg_installed() + sample_dir = Path(setup_data.join("videos/samples")) + sample_dir.mkdir() + + ff.extract_frames_by_interval(video_path, sample_dir, sample_interval=1) + + results = list(ff.sort_selected_samples(sample_dir, video_path)) + assert len(results) == 6 + for idx, (file_idx, frame_paths) in enumerate(results): + assert idx + 1 == file_idx + assert 1 == len(frame_paths) + assert frame_paths[0] is not None + assert frame_paths[0].exists() + + results = list(ff.sort_selected_samples(sample_dir, video_path, ["0"])) + assert len(results) == 6 + for idx, (file_idx, frame_paths) in enumerate(results): + assert idx + 1 == file_idx + assert 1 == len(frame_paths) + assert frame_paths[0] is None + + +def test_ffmpeg_extract_frames_with_specifier_ok(setup_data: py.path.local): + if not IS_FFMPEG_INSTALLED: + pytest.skip("ffmpeg not installed") + + ff = ffmpeg.FFMPEG() + + video_path = Path(setup_data.join("videos/sample-5s.mp4")) + + sample_dir = Path(setup_data.join("videos/samples")) + sample_dir.mkdir() + + ff.extract_frames_by_interval( + video_path, + sample_dir, + sample_interval=1, + stream_specifier=0, + ) + + results = list(ff.sort_selected_samples(sample_dir, video_path, [0])) + assert len(results) == 6 + for idx, (file_idx, frame_paths) in enumerate(results): + assert idx + 1 == file_idx + assert 1 == len(frame_paths) + assert frame_paths[0] is not None + assert frame_paths[0].exists() + + results = list(ff.sort_selected_samples(sample_dir, video_path, [1])) + assert len(results) == 6 + for idx, (file_idx, frame_paths) in enumerate(results): + assert idx + 1 == file_idx + assert 1 == len(frame_paths) + assert frame_paths[0] is None + + +def test_ffmpeg_extract_specified_frames_ok(setup_data: py.path.local): + if not IS_FFMPEG_INSTALLED: + pytest.skip("ffmpeg not installed") + + ff = ffmpeg.FFMPEG() + + video_path = Path(setup_data.join("videos/sample-5s.mp4")) + + sample_dir = Path(setup_data.join("videos/samples")) + sample_dir.mkdir() + + ff.extract_specified_frames(video_path, sample_dir, frame_indices={2, 9}) + + results = list(ff.sort_selected_samples(sample_dir, video_path)) + assert len(results) == 2 + + for idx, (file_idx, frame_paths) in enumerate(results): + assert idx + 1 == file_idx + assert frame_paths[0] is not None + assert frame_paths[0].exists() + + +def test_ffmpeg_extract_specified_frames_empty_ok(setup_data: py.path.local): + if not IS_FFMPEG_INSTALLED: + pytest.skip("ffmpeg not installed") + + ff = ffmpeg.FFMPEG() + + video_path = Path(setup_data.join("videos/sample-5s.mp4")) + + sample_dir = Path(setup_data.join("videos/samples")) + sample_dir.mkdir() + + ff.extract_specified_frames(video_path, sample_dir, frame_indices=set()) + + results = list(ff.sort_selected_samples(sample_dir, video_path)) + assert len(results) == 0 + + +def test_probe_format_and_streams_ok(setup_data: py.path.local): + if not IS_FFMPEG_INSTALLED: + pytest.skip("ffmpeg not installed") + + video_path = Path(setup_data.join("videos/sample-5s.mp4")) + + ff = ffmpeg.FFMPEG() + probe_output = ff.probe_format_and_streams(video_path) + probe = ffmpeg.Probe(probe_output) + + start_time = probe.probe_video_start_time() + assert start_time is None + max_stream = probe.probe_video_with_max_resolution() + assert max_stream is not None + assert max_stream["index"] == 0 + assert max_stream["codec_type"] == "video" + + +def test_probe_format_and_streams_gopro_ok(setup_data: py.path.local): + if not IS_FFMPEG_INSTALLED: + pytest.skip("ffmpeg not installed") + + video_path = Path(setup_data.join("gopro_data/hero8.mp4")) + + ff = ffmpeg.FFMPEG() + probe_output = ff.probe_format_and_streams(video_path) + probe = ffmpeg.Probe(probe_output) + + start_time = probe.probe_video_start_time() + assert start_time is not None + assert datetime.datetime.isoformat(start_time) == "2019-11-18T15:41:12.354033+00:00" + max_stream = probe.probe_video_with_max_resolution() + assert max_stream is not None + assert max_stream["index"] == 0 + assert max_stream["codec_type"] == "video" def test_ffmpeg_not_exists(): @@ -33,7 +174,9 @@ def test_ffmpeg_not_exists(): ff = ffmpeg.FFMPEG() try: - ff.extract_frames(Path("not_exist_a"), Path("not_exist_b"), sample_interval=2) + ff.extract_frames_by_interval( + Path("not_exist_a"), Path("not_exist_b"), sample_interval=2 + ) except ffmpeg.FFmpegCalledProcessError as ex: assert "STDERR:" not in str(ex) else: @@ -41,7 +184,9 @@ def test_ffmpeg_not_exists(): ff = ffmpeg.FFMPEG(stderr=subprocess.PIPE) try: - ff.extract_frames(Path("not_exist_a"), Path("not_exist_b"), sample_interval=2) + ff.extract_frames_by_interval( + Path("not_exist_a"), Path("not_exist_b"), sample_interval=2 + ) except ffmpeg.FFmpegCalledProcessError as ex: assert "STDERR:" in str(ex) else: diff --git a/tests/unit/test_sample_video.py b/tests/unit/test_sample_video.py index 6e0d416d1..c575c3ec5 100644 --- a/tests/unit/test_sample_video.py +++ b/tests/unit/test_sample_video.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import datetime import json import os @@ -15,12 +17,12 @@ class MOCK_FFMPEG(ffmpeg.FFMPEG): - def extract_frames( + def extract_frames_by_interval( self, video_path: Path, sample_path: Path, video_sample_interval: float, - stream_idx: T.Optional[int] = None, + stream_specifier: int | str = "v", ): probe = self.probe_format_and_streams(video_path) video_streams = [ @@ -31,10 +33,7 @@ def extract_frames( frame_path_prefix = os.path.join(sample_path, video_basename_no_ext) src = os.path.join(_PWD, "data/test_exif.jpg") for idx in range(0, int(duration / video_sample_interval)): - if stream_idx is None: - sample = f"{frame_path_prefix}_NA_{idx + 1:06d}.jpg" - else: - sample = f"{frame_path_prefix}_{stream_idx}_{idx + 1:06d}.jpg" + sample = f"{frame_path_prefix}_{stream_specifier}_{idx + 1:06d}.jpg" shutil.copyfile(src, sample) def probe_format_and_streams(self, video_path: Path) -> ffmpeg.ProbeOutput: @@ -50,7 +49,7 @@ def setup_mock(monkeypatch): def _validate_interval(samples: T.Sequence[Path], video_start_time): assert len(samples), "expect samples but got none" for idx, sample in enumerate(sorted(samples)): - assert sample.name == f"hello_NA_{idx + 1:06d}.jpg" + assert sample.name == f"hello_v_{idx + 1:06d}.jpg" exif = exif_read.ExifRead(sample) expected_dt = video_start_time + datetime.timedelta(seconds=2 * idx) assert exif.extract_capture_time() == expected_dt