From 2af644ae00936647206954bb5b4c726c5b32e3b8 Mon Sep 17 00:00:00 2001 From: "Cain, Daniel" Date: Wed, 3 Jul 2024 17:05:28 -0700 Subject: [PATCH 1/5] Added ogg opus wrapper class --- pyogg/ogg_opus_wrapper.py | 404 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 404 insertions(+) create mode 100644 pyogg/ogg_opus_wrapper.py diff --git a/pyogg/ogg_opus_wrapper.py b/pyogg/ogg_opus_wrapper.py new file mode 100644 index 0000000..b7eef3e --- /dev/null +++ b/pyogg/ogg_opus_wrapper.py @@ -0,0 +1,404 @@ +import builtins +import copy +import ctypes +import random +import struct +from io import BytesIO +from typing import ( + Optional, + Union, + BinaryIO +) + +from . import ogg +from . import opus +from .opus_buffered_encoder import OpusBufferedEncoder +#from .opus_encoder import OpusEncoder +from .pyogg_error import PyOggError + +class OggOpusWrapper(): + """Encodes PCM data into an OggOpus file.""" + + def __init__(self, + custom_pre_skip: Optional[int] = None) -> None: + """Construct an OggOpusWrapper. + + f may be either a string giving the path to the file, or + an already-opened file handle. + + If f is an already-opened file handle, then it is the + user's responsibility to close the file when they are + finished with it. The file should be opened for writing + in binary (not text) mode. + + The encoder should be a + OpusBufferedEncoder and should be fully configured before the + first call to the `write()` method. + + The Opus encoder requires an amount of "warm up" and when + stored in an Ogg container that warm up can be skipped. When + `custom_pre_skip` is None, the required amount of warm up + silence is automatically calculated and inserted. If a custom + (non-silent) pre-skip is desired, then `custom_pre_skip` + should be specified as the number of samples (per channel). + It is then the user's responsibility to pass the non-silent + pre-skip samples to `encode()`. + + """ + # Store the custom pre skip + self._custom_pre_skip = custom_pre_skip + + # Create a new stream state with a random serial number + self._stream_state = self._create_stream_state() + + # Create a packet (reused for each pass) + self._ogg_packet = ogg.ogg_packet() + self._packet_valid = False + + # Create a page (reused for each pass) + self._ogg_page = ogg.ogg_page() + + # Counter for the number of packets written into Ogg stream + self._count_packets = 0 + + # Counter for the number of samples encoded into Opus + # packets + self._count_samples = 0 + + self._channels = 2 + + # Flag to indicate if the headers have been written + self._headers_written = False + + # Flag to indicate that the stream has been finished (the + # EOS bit was set in a final packet) + self._finished = False + + # Reference to the current encoded packet (written only + # when we know if it the last) + self._current_encoded_packet: Optional[bytes] = None + self._file = BytesIO() + + def __del__(self) -> None: + if not self._finished: + self.close() + + def set_channels(self, channels: int): + self._channels = channels + + # + # User visible methods + # + + def read(self, n_bytes: Optional[int] = None) -> bytes: + return self._file.read(n_bytes) + + def write(self, pcm: memoryview) -> None: + """Encode the PCM and write out the Ogg Opus stream. + + Encoders the PCM using the provided encoder. + + """ + # Check that the stream hasn't already been finished + if self._finished: + raise PyOggError( + "Stream has already ended. Perhaps close() was "+ + "called too early?") + + # If we haven't already written out the headers, do so + # now. Then, write a frame of silence to warm up the + # encoder. + if not self._headers_written: + pre_skip = self._write_headers(self._custom_pre_skip) + if self._custom_pre_skip is None: + self._write_silence(pre_skip) + + # Call the internal method to encode the bytes + self._write_to_oggopus(pcm) + + + def _write_to_oggopus(self, encoded_packet: memoryview) -> None: + # Cast memoryview to ctypes Array + Buffer = ctypes.c_ubyte * len(encoded_packet) + encoded_packet_ctypes = Buffer.from_buffer(encoded_packet) + + # Obtain a pointer to the encoded packet + encoded_packet_ptr = ctypes.cast( + encoded_packet_ctypes, + ctypes.POINTER(ctypes.c_ubyte) + ) + + # Increase the count of the number of samples written + self._count_samples += len(encoded_packet) + + # Place data into the packet + self._ogg_packet.packet = encoded_packet_ptr + self._ogg_packet.bytes = len(encoded_packet) + self._ogg_packet.b_o_s = int(not self._headers_written) + self._ogg_packet.e_o_s = int(self._finished) + self._ogg_packet.granulepos = self._count_samples + self._ogg_packet.packetno = self._count_packets + + # Increase the counter of the number of packets + # in the stream + self._count_packets += 1 + + # Write the packet into the stream + self._write_packet() + + + def close(self) -> None: + # Check we haven't already closed this stream + if self._finished: + # We're attempting to close an already closed stream, + # do nothing more. + return + + # Flush the underlying buffered encoder + self._write_to_oggopus(memoryview(bytearray(b""))) + + # The current packet must be the end of the stream, update + # the packet's details + self._ogg_packet.e_o_s = 1 + + # Write the packet to the stream + if self._packet_valid: + self._write_packet() + + # Flush the stream of any unwritten pages + self._flush() + + # Mark the stream as finished + self._finished = True + + # Close the file if we opened it + if self._i_opened_the_file: + self._file.close() + self._i_opened_the_file = False + + # Clean up the Ogg-related memory + ogg.ogg_stream_clear(self._stream_state) + + # Clean up the reference to the encoded packet (as it must + # now have been written) + del self._current_encoded_packet + + # + # Internal methods + # + + def _create_random_serial_no(self) -> ctypes.c_int: + sizeof_c_int = ctypes.sizeof(ctypes.c_int) + min_int = -2**(sizeof_c_int*8-1) + max_int = 2**(sizeof_c_int*8-1)-1 + serial_no = ctypes.c_int(random.randint(min_int, max_int)) + + return serial_no + + def _create_stream_state(self) -> ogg.ogg_stream_state: + # Create a random serial number + serial_no = self._create_random_serial_no() + + # Create an ogg_stream_state + ogg_stream_state = ogg.ogg_stream_state() + + # Initialise the stream state + ogg.ogg_stream_init( + ctypes.pointer(ogg_stream_state), + serial_no + ) + + return ogg_stream_state + + def _make_identification_header(self, pre_skip: int, input_sampling_rate: int = 0) -> bytes: + """Make the OggOpus identification header. + + An input_sampling rate may be set to zero to mean 'unspecified'. + + Only channel mapping family 0 is currently supported. + This allows mono and stereo signals. + + See https://tools.ietf.org/html/rfc7845#page-12 for more + details. + + """ + signature = b"OpusHead" + version = 1 + output_channels = self._channels + output_gain = 0 + channel_mapping_family = 0 + data = struct.pack( + " int: + """ Returns pre-skip. """ + if custom_pre_skip is not None: + # Use the user-specified amount of pre-skip + pre_skip = custom_pre_skip + else: + # Obtain the algorithmic delay of the Opus encoder. See + # https://tools.ietf.org/html/rfc7845#page-27 + delay_samples = self._encoder.get_algorithmic_delay() + + # Extra samples are recommended. See + # https://tools.ietf.org/html/rfc7845#page-27 + extra_samples = 120 + + # We will just fill a whole frame with silence. Calculate + # the minimum frame length, which we'll use as the + # pre-skip. + frame_durations = [2.5, 5, 10, 20, 40, 60] # milliseconds + frame_lengths = [ + x * self._encoder._samples_per_second // 1000 + for x in frame_durations + ] + for frame_length in frame_lengths: + if frame_length > delay_samples + extra_samples: + pre_skip = frame_length + break + + # Create the identification header + id_header = self._make_identification_header( + pre_skip = pre_skip + ) + + # Specify the packet containing the identification header + self._ogg_packet.packet = ctypes.cast(id_header, ogg.c_uchar_p) # type: ignore + self._ogg_packet.bytes = len(id_header) + self._ogg_packet.b_o_s = 1 + self._ogg_packet.e_o_s = 0 + self._ogg_packet.granulepos = 0 + self._ogg_packet.packetno = self._count_packets + self._count_packets += 1 + + # Write the identification header + result = ogg.ogg_stream_packetin( + self._stream_state, + self._ogg_packet + ) + + if result != 0: + raise PyOggError( + "Failed to write Opus identification header" + ) + + return pre_skip + + def _make_comment_header(self): + """Make the OggOpus comment header. + + See https://tools.ietf.org/html/rfc7845#page-22 for more + details. + + """ + signature = b"OpusTags" + vendor_string = b"ENCODER=PyOgg" + vendor_string_length = struct.pack(" Date: Fri, 5 Jul 2024 08:50:34 -0700 Subject: [PATCH 2/5] It looks like this is working based on FFMPEG output --- pyogg/__init__.py | 3 ++ pyogg/ogg_opus_wrapper.py | 71 +++++++++++++++------------------------ 2 files changed, 31 insertions(+), 43 deletions(-) diff --git a/pyogg/__init__.py b/pyogg/__init__.py index a97b0d2..3ce0c41 100644 --- a/pyogg/__init__.py +++ b/pyogg/__init__.py @@ -106,3 +106,6 @@ def __init__(*args, **kw): class FlacFileStream: # type: ignore def __init__(*args, **kw): raise PyOggError("The FLAC libraries weren't found or couldn't be loaded (maybe you're trying to use 64bit libraries with 32bit Python?)") + + +from .ogg_opus_wrapper import OggOpusWrapper diff --git a/pyogg/ogg_opus_wrapper.py b/pyogg/ogg_opus_wrapper.py index b7eef3e..bfeb9b3 100644 --- a/pyogg/ogg_opus_wrapper.py +++ b/pyogg/ogg_opus_wrapper.py @@ -3,6 +3,7 @@ import ctypes import random import struct +from collections import deque from io import BytesIO from typing import ( Optional, @@ -20,6 +21,8 @@ class OggOpusWrapper(): """Encodes PCM data into an OggOpus file.""" def __init__(self, + channels: int = 2, + sampling_freq: int = 48000, custom_pre_skip: Optional[int] = None) -> None: """Construct an OggOpusWrapper. @@ -65,7 +68,8 @@ def __init__(self, # packets self._count_samples = 0 - self._channels = 2 + self._channels = channels + self._sampling_freq = sampling_freq # Flag to indicate if the headers have been written self._headers_written = False @@ -77,21 +81,27 @@ def __init__(self, # Reference to the current encoded packet (written only # when we know if it the last) self._current_encoded_packet: Optional[bytes] = None - self._file = BytesIO() - - def __del__(self) -> None: - if not self._finished: - self.close() + self._pages = [] def set_channels(self, channels: int): self._channels = channels + def set_sampling_freq(self, samples_per_second: int) -> None: + self._sampling_freq = samples_per_second + # # User visible methods # - def read(self, n_bytes: Optional[int] = None) -> bytes: - return self._file.read(n_bytes) + def read(self, n_pages: Optional[int] = None) -> bytes: + if n_pages is None: + n_pages = len(self._pages) + else: + n_pages = min(n_pages, len(self._pages)) + stream_bytes = b"".join(bytes(p) for p in self._pages[:n_pages]) + self._pages = self._pages[n_pages:] + return stream_bytes + def write(self, pcm: memoryview) -> None: """Encode the PCM and write out the Ogg Opus stream. @@ -123,7 +133,7 @@ def _write_to_oggopus(self, encoded_packet: memoryview) -> None: encoded_packet_ctypes = Buffer.from_buffer(encoded_packet) # Obtain a pointer to the encoded packet - encoded_packet_ptr = ctypes.cast( + encoded_packet_ptr = ctypes.cast( encoded_packet_ctypes, ctypes.POINTER(ctypes.c_ubyte) ) @@ -171,11 +181,6 @@ def close(self) -> None: # Mark the stream as finished self._finished = True - # Close the file if we opened it - if self._i_opened_the_file: - self._file.close() - self._i_opened_the_file = False - # Clean up the Ogg-related memory ogg.ogg_stream_clear(self._stream_state) @@ -239,36 +244,14 @@ def _make_identification_header(self, pre_skip: int, input_sampling_rate: int = return signature+data - def _write_identification_header_packet(self, custom_pre_skip: int) -> int: + def _write_identification_header_packet(self, custom_pre_skip: Optional[int] = None) -> int: """ Returns pre-skip. """ - if custom_pre_skip is not None: - # Use the user-specified amount of pre-skip - pre_skip = custom_pre_skip - else: - # Obtain the algorithmic delay of the Opus encoder. See - # https://tools.ietf.org/html/rfc7845#page-27 - delay_samples = self._encoder.get_algorithmic_delay() - - # Extra samples are recommended. See - # https://tools.ietf.org/html/rfc7845#page-27 - extra_samples = 120 - - # We will just fill a whole frame with silence. Calculate - # the minimum frame length, which we'll use as the - # pre-skip. - frame_durations = [2.5, 5, 10, 20, 40, 60] # milliseconds - frame_lengths = [ - x * self._encoder._samples_per_second // 1000 - for x in frame_durations - ] - for frame_length in frame_lengths: - if frame_length > delay_samples + extra_samples: - pre_skip = frame_length - break + pre_skip = custom_pre_skip or 0 # Create the identification header id_header = self._make_identification_header( - pre_skip = pre_skip + pre_skip = pre_skip, + input_sampling_rate = self._sampling_freq ) # Specify the packet containing the identification header @@ -317,7 +300,7 @@ def _write_comment_header_packet(self): comment_header = self._make_comment_header() # Specify the packet containing the identification header - self._ogg_packet.packet = ctypes.cast(comment_header, ogg.c_uchar_p) + self._ogg_packet.packet = ctypes.cast(comment_header, ogg.c_uchar_p) #type: ignore self._ogg_packet.bytes = len(comment_header) self._ogg_packet.b_o_s = 0 self._ogg_packet.e_o_s = 0 @@ -342,11 +325,13 @@ def _write_page(self): # write without issues. HeaderBufferPtr = ctypes.POINTER(ctypes.c_ubyte * self._ogg_page.header_len) header = HeaderBufferPtr(self._ogg_page.header.contents)[0] - self._file.write(header) + # Copy memory since not writing to a file + self._pages.append(bytes((header[i] for i in range(self._ogg_page.header_len)))) BodyBufferPtr = ctypes.POINTER(ctypes.c_ubyte * self._ogg_page.body_len) body = BodyBufferPtr(self._ogg_page.body.contents)[0] - self._file.write(body) + # Copy memory since not writing to a file + self._pages.append(bytes((body[i] for i in range(self._ogg_page.body_len)))) def _flush(self): """ Flush all pages to the file. """ From b6d2d907551a19b4a0b42d0eb3c37061dece769a Mon Sep 17 00:00:00 2001 From: "Cain, Daniel" Date: Fri, 5 Jul 2024 10:03:42 -0700 Subject: [PATCH 3/5] Fixed bug with user comments section --- pyogg/ogg_opus_wrapper.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/pyogg/ogg_opus_wrapper.py b/pyogg/ogg_opus_wrapper.py index bfeb9b3..bbeb7a9 100644 --- a/pyogg/ogg_opus_wrapper.py +++ b/pyogg/ogg_opus_wrapper.py @@ -1,3 +1,5 @@ +import inspect + import builtins import copy import ctypes @@ -103,11 +105,8 @@ def read(self, n_pages: Optional[int] = None) -> bytes: return stream_bytes - def write(self, pcm: memoryview) -> None: - """Encode the PCM and write out the Ogg Opus stream. - - Encoders the PCM using the provided encoder. - + def write(self, opus_packet: memoryview) -> None: + """Write the encoded Opus packet out the Ogg Opus stream. """ # Check that the stream hasn't already been finished if self._finished: @@ -120,11 +119,11 @@ def write(self, pcm: memoryview) -> None: # encoder. if not self._headers_written: pre_skip = self._write_headers(self._custom_pre_skip) - if self._custom_pre_skip is None: + if pre_skip > 0: self._write_silence(pre_skip) # Call the internal method to encode the bytes - self._write_to_oggopus(pcm) + self._write_to_oggopus(opus_packet) def _write_to_oggopus(self, encoded_packet: memoryview) -> None: @@ -286,13 +285,20 @@ def _make_comment_header(self): signature = b"OpusTags" vendor_string = b"ENCODER=PyOgg" vendor_string_length = struct.pack(" Date: Mon, 8 Jul 2024 08:26:10 -0700 Subject: [PATCH 4/5] Got stream reset working with ffprobe --- pyogg/ogg_opus_wrapper.py | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/pyogg/ogg_opus_wrapper.py b/pyogg/ogg_opus_wrapper.py index bbeb7a9..4d0dd1e 100644 --- a/pyogg/ogg_opus_wrapper.py +++ b/pyogg/ogg_opus_wrapper.py @@ -53,8 +53,11 @@ def __init__(self, # Store the custom pre skip self._custom_pre_skip = custom_pre_skip - # Create a new stream state with a random serial number - self._stream_state = self._create_stream_state() + # Create a random serial number + self._serial_no = self._create_random_serial_no() + + # Create a new stream state with specified serial number + self._stream_state = self._create_stream_state(self._serial_no) # Create a packet (reused for each pass) self._ogg_packet = ogg.ogg_packet() @@ -143,7 +146,7 @@ def _write_to_oggopus(self, encoded_packet: memoryview) -> None: # Place data into the packet self._ogg_packet.packet = encoded_packet_ptr self._ogg_packet.bytes = len(encoded_packet) - self._ogg_packet.b_o_s = int(not self._headers_written) + self._ogg_packet.b_o_s = 0 self._ogg_packet.e_o_s = int(self._finished) self._ogg_packet.granulepos = self._count_samples self._ogg_packet.packetno = self._count_packets @@ -187,6 +190,12 @@ def close(self) -> None: # now have been written) del self._current_encoded_packet + def reset_stream_state(self): + # Create a new stream state with a random serial number + self._stream_state = self._create_stream_state(self._serial_no) + self._count_packets = 0 + self._count_samples = 0 + # # Internal methods # @@ -199,10 +208,7 @@ def _create_random_serial_no(self) -> ctypes.c_int: return serial_no - def _create_stream_state(self) -> ogg.ogg_stream_state: - # Create a random serial number - serial_no = self._create_random_serial_no() - + def _create_stream_state(self, serial_no: ctypes.c_int) -> ogg.ogg_stream_state: # Create an ogg_stream_state ogg_stream_state = ogg.ogg_stream_state() @@ -344,18 +350,24 @@ def _flush(self): while ogg.ogg_stream_flush( ctypes.pointer(self._stream_state), ctypes.pointer(self._ogg_page)) != 0: + print(f"Flushing packet, BOS {self._ogg_packet.b_o_s}") self._write_page() def _write_headers(self, custom_pre_skip): """ Write the two Opus header packets.""" + print("Writing ID header packet") pre_skip = self._write_identification_header_packet( custom_pre_skip ) + self._flush() + + print("Writing comment header packet") self._write_comment_header_packet() # Store that the headers have been written self._headers_written = True + print("Flusing header packets") # Write out pages to file to ensure that the headers are # the only packets to appear on the first page. If this # is not done, the file cannot be read by the library From d7da27dc8c28771ced4349e77ae81dac343f4ab9 Mon Sep 17 00:00:00 2001 From: "Cain, Daniel" Date: Mon, 8 Jul 2024 11:03:56 -0700 Subject: [PATCH 5/5] Removed inspect debugging. Was unnecessary and too slow --- pyogg/ogg_opus_wrapper.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/pyogg/ogg_opus_wrapper.py b/pyogg/ogg_opus_wrapper.py index 4d0dd1e..f77a9a4 100644 --- a/pyogg/ogg_opus_wrapper.py +++ b/pyogg/ogg_opus_wrapper.py @@ -350,24 +350,24 @@ def _flush(self): while ogg.ogg_stream_flush( ctypes.pointer(self._stream_state), ctypes.pointer(self._ogg_page)) != 0: - print(f"Flushing packet, BOS {self._ogg_packet.b_o_s}") + # print(f"Flushing packet, BOS {self._ogg_packet.b_o_s}") self._write_page() def _write_headers(self, custom_pre_skip): """ Write the two Opus header packets.""" - print("Writing ID header packet") + # print("Writing ID header packet") pre_skip = self._write_identification_header_packet( custom_pre_skip ) self._flush() - print("Writing comment header packet") + # print("Writing comment header packet") self._write_comment_header_packet() # Store that the headers have been written self._headers_written = True - print("Flusing header packets") + # print("Flusing header packets") # Write out pages to file to ensure that the headers are # the only packets to appear on the first page. If this # is not done, the file cannot be read by the library @@ -377,9 +377,6 @@ def _write_headers(self, custom_pre_skip): return pre_skip def _write_packet(self): - current_frame = inspect.currentframe() - call_frames = inspect.getouterframes(current_frame, 2) - parent_funcs = [cf[3] for cf in call_frames] # Place the packet into the stream result = ogg.ogg_stream_packetin( self._stream_state,