diff --git a/pyogg/__init__.py b/pyogg/__init__.py index a97b0d2..3ce0c41 100644 --- a/pyogg/__init__.py +++ b/pyogg/__init__.py @@ -106,3 +106,6 @@ def __init__(*args, **kw): class FlacFileStream: # type: ignore def __init__(*args, **kw): raise PyOggError("The FLAC libraries weren't found or couldn't be loaded (maybe you're trying to use 64bit libraries with 32bit Python?)") + + +from .ogg_opus_wrapper import OggOpusWrapper diff --git a/pyogg/ogg_opus_wrapper.py b/pyogg/ogg_opus_wrapper.py new file mode 100644 index 0000000..f77a9a4 --- /dev/null +++ b/pyogg/ogg_opus_wrapper.py @@ -0,0 +1,407 @@ +import inspect + +import builtins +import copy +import ctypes +import random +import struct +from collections import deque +from io import BytesIO +from typing import ( + Optional, + Union, + BinaryIO +) + +from . import ogg +from . import opus +from .opus_buffered_encoder import OpusBufferedEncoder +#from .opus_encoder import OpusEncoder +from .pyogg_error import PyOggError + +class OggOpusWrapper(): + """Encodes PCM data into an OggOpus file.""" + + def __init__(self, + channels: int = 2, + sampling_freq: int = 48000, + custom_pre_skip: Optional[int] = None) -> None: + """Construct an OggOpusWrapper. + + f may be either a string giving the path to the file, or + an already-opened file handle. + + If f is an already-opened file handle, then it is the + user's responsibility to close the file when they are + finished with it. The file should be opened for writing + in binary (not text) mode. + + The encoder should be a + OpusBufferedEncoder and should be fully configured before the + first call to the `write()` method. + + The Opus encoder requires an amount of "warm up" and when + stored in an Ogg container that warm up can be skipped. When + `custom_pre_skip` is None, the required amount of warm up + silence is automatically calculated and inserted. If a custom + (non-silent) pre-skip is desired, then `custom_pre_skip` + should be specified as the number of samples (per channel). + It is then the user's responsibility to pass the non-silent + pre-skip samples to `encode()`. + + """ + # Store the custom pre skip + self._custom_pre_skip = custom_pre_skip + + # Create a random serial number + self._serial_no = self._create_random_serial_no() + + # Create a new stream state with specified serial number + self._stream_state = self._create_stream_state(self._serial_no) + + # Create a packet (reused for each pass) + self._ogg_packet = ogg.ogg_packet() + self._packet_valid = False + + # Create a page (reused for each pass) + self._ogg_page = ogg.ogg_page() + + # Counter for the number of packets written into Ogg stream + self._count_packets = 0 + + # Counter for the number of samples encoded into Opus + # packets + self._count_samples = 0 + + self._channels = channels + self._sampling_freq = sampling_freq + + # Flag to indicate if the headers have been written + self._headers_written = False + + # Flag to indicate that the stream has been finished (the + # EOS bit was set in a final packet) + self._finished = False + + # Reference to the current encoded packet (written only + # when we know if it the last) + self._current_encoded_packet: Optional[bytes] = None + self._pages = [] + + def set_channels(self, channels: int): + self._channels = channels + + def set_sampling_freq(self, samples_per_second: int) -> None: + self._sampling_freq = samples_per_second + + # + # User visible methods + # + + def read(self, n_pages: Optional[int] = None) -> bytes: + if n_pages is None: + n_pages = len(self._pages) + else: + n_pages = min(n_pages, len(self._pages)) + stream_bytes = b"".join(bytes(p) for p in self._pages[:n_pages]) + self._pages = self._pages[n_pages:] + return stream_bytes + + + def write(self, opus_packet: memoryview) -> None: + """Write the encoded Opus packet out the Ogg Opus stream. + """ + # Check that the stream hasn't already been finished + if self._finished: + raise PyOggError( + "Stream has already ended. Perhaps close() was "+ + "called too early?") + + # If we haven't already written out the headers, do so + # now. Then, write a frame of silence to warm up the + # encoder. + if not self._headers_written: + pre_skip = self._write_headers(self._custom_pre_skip) + if pre_skip > 0: + self._write_silence(pre_skip) + + # Call the internal method to encode the bytes + self._write_to_oggopus(opus_packet) + + + def _write_to_oggopus(self, encoded_packet: memoryview) -> None: + # Cast memoryview to ctypes Array + Buffer = ctypes.c_ubyte * len(encoded_packet) + encoded_packet_ctypes = Buffer.from_buffer(encoded_packet) + + # Obtain a pointer to the encoded packet + encoded_packet_ptr = ctypes.cast( + encoded_packet_ctypes, + ctypes.POINTER(ctypes.c_ubyte) + ) + + # Increase the count of the number of samples written + self._count_samples += len(encoded_packet) + + # Place data into the packet + self._ogg_packet.packet = encoded_packet_ptr + self._ogg_packet.bytes = len(encoded_packet) + self._ogg_packet.b_o_s = 0 + self._ogg_packet.e_o_s = int(self._finished) + self._ogg_packet.granulepos = self._count_samples + self._ogg_packet.packetno = self._count_packets + + # Increase the counter of the number of packets + # in the stream + self._count_packets += 1 + + # Write the packet into the stream + self._write_packet() + + + def close(self) -> None: + # Check we haven't already closed this stream + if self._finished: + # We're attempting to close an already closed stream, + # do nothing more. + return + + # Flush the underlying buffered encoder + self._write_to_oggopus(memoryview(bytearray(b""))) + + # The current packet must be the end of the stream, update + # the packet's details + self._ogg_packet.e_o_s = 1 + + # Write the packet to the stream + if self._packet_valid: + self._write_packet() + + # Flush the stream of any unwritten pages + self._flush() + + # Mark the stream as finished + self._finished = True + + # Clean up the Ogg-related memory + ogg.ogg_stream_clear(self._stream_state) + + # Clean up the reference to the encoded packet (as it must + # now have been written) + del self._current_encoded_packet + + def reset_stream_state(self): + # Create a new stream state with a random serial number + self._stream_state = self._create_stream_state(self._serial_no) + self._count_packets = 0 + self._count_samples = 0 + + # + # Internal methods + # + + def _create_random_serial_no(self) -> ctypes.c_int: + sizeof_c_int = ctypes.sizeof(ctypes.c_int) + min_int = -2**(sizeof_c_int*8-1) + max_int = 2**(sizeof_c_int*8-1)-1 + serial_no = ctypes.c_int(random.randint(min_int, max_int)) + + return serial_no + + def _create_stream_state(self, serial_no: ctypes.c_int) -> ogg.ogg_stream_state: + # Create an ogg_stream_state + ogg_stream_state = ogg.ogg_stream_state() + + # Initialise the stream state + ogg.ogg_stream_init( + ctypes.pointer(ogg_stream_state), + serial_no + ) + + return ogg_stream_state + + def _make_identification_header(self, pre_skip: int, input_sampling_rate: int = 0) -> bytes: + """Make the OggOpus identification header. + + An input_sampling rate may be set to zero to mean 'unspecified'. + + Only channel mapping family 0 is currently supported. + This allows mono and stereo signals. + + See https://tools.ietf.org/html/rfc7845#page-12 for more + details. + + """ + signature = b"OpusHead" + version = 1 + output_channels = self._channels + output_gain = 0 + channel_mapping_family = 0 + data = struct.pack( + " int: + """ Returns pre-skip. """ + pre_skip = custom_pre_skip or 0 + + # Create the identification header + id_header = self._make_identification_header( + pre_skip = pre_skip, + input_sampling_rate = self._sampling_freq + ) + + # Specify the packet containing the identification header + self._ogg_packet.packet = ctypes.cast(id_header, ogg.c_uchar_p) # type: ignore + self._ogg_packet.bytes = len(id_header) + self._ogg_packet.b_o_s = 1 + self._ogg_packet.e_o_s = 0 + self._ogg_packet.granulepos = 0 + self._ogg_packet.packetno = self._count_packets + self._count_packets += 1 + + # Write the identification header + result = ogg.ogg_stream_packetin( + self._stream_state, + self._ogg_packet + ) + + if result != 0: + raise PyOggError( + "Failed to write Opus identification header" + ) + + return pre_skip + + def _make_comment_header(self): + """Make the OggOpus comment header. + + See https://tools.ietf.org/html/rfc7845#page-22 for more + details. + + """ + signature = b"OpusTags" + vendor_string = b"ENCODER=PyOgg" + vendor_string_length = struct.pack("