From 4807a60d6b026180a205112ddf55d24c8e0e96af Mon Sep 17 00:00:00 2001
From: Milos Pesic
Date: Thu, 25 Dec 2025 15:24:19 +0100
Subject: [PATCH] Handle RTP silence suppression

---
Review notes (text between "---" and the diffstat is ignored by git am):

- RTP timestamps are now compared with a signed 32-bit difference. Packets
  behind the expected timestamp (late, duplicated or reordered) used to be
  taken for a wrap-around and produced one second of silence each.
- The expected timestamp never moves backwards, so a late packet no longer
  makes the following in-order packet look like a gap. A packet that is more
  than one second behind is treated as a restarted stream.
- "expectedTS == 0" is no longer used as a "not learned yet" marker, since 0
  is a legal RTP timestamp.
- Audio of one packet written in several WriteSample calls is accumulated.
- Zero clock rate and non-positive sample rate no longer divide by zero.
- The stats baseline is only initialized when stats are present.
- Whitespace was reconstructed following gofmt rules. Alignment of context
  lines is a best guess, use "git apply --ignore-whitespace" if needed. Blob
  ids in the index lines are the ones of the original patch.

 pkg/sip/media_port.go      | 212 ++++++++++++++++++++----
 pkg/sip/media_port_test.go | 289 +++++++++++++++++++++++++++++++++
 2 files changed, 471 insertions(+), 30 deletions(-)

diff --git a/pkg/sip/media_port.go b/pkg/sip/media_port.go
index aea93cc6..759e2747 100644
--- a/pkg/sip/media_port.go
+++ b/pkg/sip/media_port.go
@@ -221,6 +221,7 @@ type MediaOptions struct {
 	Stats               *PortStats
 	EnableJitterBuffer  bool
 	NoInputResample     bool
+	EnableGapFilling    bool // Generate silence for DTX/silence suppression gaps
 }
 
 func NewMediaPort(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor, opts *MediaOptions, sampleRate int) (*MediaPort, error) {
@@ -253,19 +254,33 @@ func NewMediaPortWith(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor,
 		inSampleRate = -1 // set only after SDP is accepted
 	}
 	p := &MediaPort{
-		tid:              tid,
-		log:              log,
-		opts:             opts,
-		mon:              mon,
-		externalIP:       opts.IP,
-		mediaTimeout:     mediaTimeout,
-		timeoutResetTick: make(chan time.Duration, 1),
-		jitterEnabled:    opts.EnableJitterBuffer,
-		port:             newUDPConn(log, conn),
-		audioOut:         msdk.NewSwitchWriter(sampleRate),
-		audioIn:          msdk.NewSwitchWriter(inSampleRate),
-		stats:            opts.Stats,
-	}
+		tid:               tid,
+		log:               log,
+		opts:              opts,
+		mon:               mon,
+		externalIP:        opts.IP,
+		mediaTimeout:      mediaTimeout,
+		timeoutResetTick:  make(chan time.Duration, 1),
+		jitterEnabled:     opts.EnableJitterBuffer,
+		port:              newUDPConn(log, conn),
+		audioOut:          msdk.NewSwitchWriter(sampleRate),
+		audioIn:           msdk.NewSwitchWriter(inSampleRate),
+		stats:             opts.Stats,
+		gapFillingEnabled: opts.EnableGapFilling,
+	}
+
+	// Initialize stats baseline so the first update has a sane starting point.
+	// Stats are treated as optional here, the same way setupInput checks them for nil.
+	if p.stats != nil {
+		p.stats.mu.Lock()
+		if p.stats.last.Time.IsZero() {
+			p.stats.last.Time = time.Now()
+			p.stats.last.AudioInSamples = p.stats.AudioInSamples.Load()
+			p.stats.last.AudioOutSamples = p.stats.AudioOutSamples.Load()
+		}
+		p.stats.mu.Unlock()
+	}
+
 	p.timeoutInitial.Store(&opts.MediaTimeoutInitial)
 	p.timeoutGeneral.Store(&opts.MediaTimeout)
 	go p.timeoutLoop(tid, func() {
@@ -277,23 +292,24 @@ func NewMediaPortWith(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor,
 
 // MediaPort combines all functionality related to sending and accepting SIP media.
 type MediaPort struct {
-	tid              traceid.ID
-	log              logger.Logger
-	opts             *MediaOptions
-	mon              *stats.CallMonitor
-	externalIP       netip.Addr
-	port             *udpConn
-	mediaReceived    core.Fuse
-	packetCount      atomic.Uint64
-	mediaTimeout     <-chan struct{}
-	timeoutStart     atomic.Pointer[time.Time]
-	timeoutResetTick chan time.Duration
-	timeoutInitial   atomic.Pointer[time.Duration]
-	timeoutGeneral   atomic.Pointer[time.Duration]
-	closed           core.Fuse
-	stats            *PortStats
-	dtmfAudioEnabled bool
-	jitterEnabled    bool
+	tid               traceid.ID
+	log               logger.Logger
+	opts              *MediaOptions
+	mon               *stats.CallMonitor
+	externalIP        netip.Addr
+	port              *udpConn
+	mediaReceived     core.Fuse
+	packetCount       atomic.Uint64
+	mediaTimeout      <-chan struct{}
+	timeoutStart      atomic.Pointer[time.Time]
+	timeoutResetTick  chan time.Duration
+	timeoutInitial    atomic.Pointer[time.Duration]
+	timeoutGeneral    atomic.Pointer[time.Duration]
+	closed            core.Fuse
+	stats             *PortStats
+	dtmfAudioEnabled  bool
+	jitterEnabled     bool
+	gapFillingEnabled bool
 
 	mu               sync.Mutex
 	conf             *MediaConf
@@ -723,6 +739,121 @@ func (p *MediaPort) setupOutput(tid traceid.ID) error {
 	return nil
 }
 
+const (
+	// gapFillToleranceMs is the timestamp jitter, in milliseconds, that is not treated as a gap.
+	gapFillToleranceMs = 5
+
+	// gapFillChunkMs is the duration, in milliseconds, of each generated silence frame.
+	gapFillChunkMs = 20
+)
+
+// newGapFillerHandler creates an RTP handler that detects timestamp gaps (DTX/silence suppression)
+// and writes silence PCM samples to audioWriter to fill them before forwarding the packet to next.
+//
+// The returned handler also implements msdk.PCM16Writer. The decoder behind next must write its
+// output to it: this is how the handler learns how much audio each packet actually carried.
+func newGapFillerHandler(
+	log logger.Logger,
+	next rtp.Handler,
+	audioWriter msdk.PCM16Writer,
+	clockRate uint32,
+) rtp.Handler {
+	return &gapFillerHandler{
+		log:              log,
+		next:             next,
+		underlyingWriter: audioWriter,
+		clockRate:        clockRate,
+		sampleRate:       audioWriter.SampleRate(),
+	}
+}
+
+// gapFillerHandler sits on both sides of the decoder: as an rtp.Handler in front of it and as
+// the msdk.PCM16Writer behind it. It does no locking, so HandleRTP and WriteSample must not be
+// called concurrently.
+type gapFillerHandler struct {
+	log              logger.Logger
+	next             rtp.Handler
+	underlyingWriter msdk.PCM16Writer // receives decoded audio and generated silence
+	clockRate        uint32           // RTP timestamp units per second
+	sampleRate       int              // PCM sample rate of underlyingWriter
+	currentTS        uint32           // RTP timestamp of the packet currently being decoded
+	decodedTS        uint32           // audio written for the current packet, in timestamp units
+	expectedTS       uint32           // timestamp right after the newest audio written so far
+	hasExpected      bool             // expectedTS is valid; needed because 0 is a legal timestamp
+}
+
+func (g *gapFillerHandler) String() string {
+	return "GapFiller -> " + g.next.String()
+}
+
+// SampleRate implements msdk.PCM16Writer.
+func (g *gapFillerHandler) SampleRate() int {
+	return g.sampleRate
+}
+
+// Close implements msdk.PCM16Writer by closing the underlying writer.
+func (g *gapFillerHandler) Close() error {
+	return g.underlyingWriter.Close()
+}
+
+// WriteSample implements msdk.PCM16Writer. It forwards decoded audio unchanged and uses its
+// length to track where the stream should continue.
+func (g *gapFillerHandler) WriteSample(sample msdk.PCM16Sample) error {
+	if len(sample) > 0 && g.sampleRate > 0 {
+		// Accumulate, since a decoder may emit the audio of one packet in several writes.
+		g.decodedTS += uint32(uint64(len(sample)) * uint64(g.clockRate) / uint64(g.sampleRate))
+		end := g.currentTS + g.decodedTS
+		// Never move backwards: a late packet must not make the packets that already
+		// arrived look like a gap. The signed difference is correct across wrap-around.
+		if !g.hasExpected || int32(end-g.expectedTS) > 0 {
+			g.expectedTS = end
+			g.hasExpected = true
+		}
+	}
+	return g.underlyingWriter.WriteSample(sample)
+}
+
+// HandleRTP fills the gap between the audio received so far and the packet timestamp with
+// silence, then forwards the packet to the next handler.
+func (g *gapFillerHandler) HandleRTP(h *rtp.Header, payload []byte) error {
+	ts := h.Timestamp
+	// Set before decoding, so that WriteSample attributes the audio to this packet.
+	g.currentTS, g.decodedTS = ts, 0
+
+	// Gaps cannot be measured until the first frame is decoded.
+	if g.hasExpected && g.clockRate > 0 && g.sampleRate > 0 {
+		// RTP timestamps wrap, so compare with a signed 32-bit difference: positive means
+		// the packet is ahead of the expected position, negative means it is late, a
+		// duplicate or out of order. Late packets must never be treated as wrap-around.
+		gapTS := int64(int32(ts - g.expectedTS))
+		switch {
+		case gapTS > int64(g.clockRate)*gapFillToleranceMs/1000:
+			g.fillGap(gapTS)
+		case gapTS < -int64(g.clockRate):
+			// More than a second behind: treat it as a restarted stream.
+			// Forget the old position and learn it again from this packet.
+			g.hasExpected = false
+		}
+	}
+	return g.next.HandleRTP(h, payload)
+}
+
+// fillGap writes silence for gapTS timestamp units to the underlying writer, split into
+// frames of gapFillChunkMs and capped at one second in total.
+func (g *gapFillerHandler) fillGap(gapTS int64) {
+	remaining := min(int64(g.sampleRate)*gapTS/int64(g.clockRate), int64(g.sampleRate))
+	chunkSize := max(int64(g.sampleRate)*gapFillChunkMs/1000, 1)
+	for remaining > 0 {
+		n := min(remaining, chunkSize)
+		// A fresh buffer per frame, in case the writer keeps a reference to it.
+		if err := g.underlyingWriter.WriteSample(make(msdk.PCM16Sample, n)); err != nil {
+			// Best effort: keep going, the real packet still has to be delivered.
+			g.log.Debugw("failed to write silence sample", "error", err)
+		}
+		remaining -= n
+	}
+}
+
 func (p *MediaPort) setupInput() {
 	// Decoding pipeline (SIP RTP -> LK PCM)
 	codec := p.conf.Audio.Codec
@@ -735,7 +866,28 @@ func (p *MediaPort) setupInput() {
 	if p.stats != nil {
 		audioWriter = newMediaWriterCount(audioWriter, &p.stats.AudioInFrames, &p.stats.AudioInSamples)
 	}
+
+	// Create gap filler handler if enabled (it implements both RTP Handler and PCM16Writer)
+	var gapFiller *gapFillerHandler
+	if p.gapFillingEnabled {
+		gapFiller = newGapFillerHandler(
+			p.log,
+			nil, // will be set below
+			audioWriter,
+			uint32(codecInfo.RTPClockRate),
+		).(*gapFillerHandler)
+		// Use gap filler as the writer so it can track frame durations
+		audioWriter = gapFiller
+	}
+
+	// Create codec decoder that writes to audioWriter (which may be the gap filler)
 	audioHandler := p.conf.Audio.Codec.DecodeRTP(audioWriter, p.conf.Audio.Type)
+
+	// Set the next handler in gap filler
+	if gapFiller != nil {
+		gapFiller.next = audioHandler
+		audioHandler = gapFiller
+	}
 	p.audioInHandler = audioHandler
 
 	mux := rtp.NewMux(nil)
diff --git a/pkg/sip/media_port_test.go b/pkg/sip/media_port_test.go
index 246eea60..5ec11ef9 100644
--- a/pkg/sip/media_port_test.go
+++ b/pkg/sip/media_port_test.go
@@ -23,6 +23,7 @@ import (
 	"slices"
 	"strconv"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -542,3 +543,291 @@ func TestMediaTimeout(t *testing.T) {
 		}
 	})
 }
+
+// testPCM16Writer is a mock PCM16Writer that captures all written samples.
+type testPCM16Writer struct {
+	mu          sync.Mutex
+	rate        int
+	samples     []msdk.PCM16Sample
+	silenceOnly []msdk.PCM16Sample // non-empty samples that contain only zeros
+}
+
+func newTestPCM16Writer(rate int) *testPCM16Writer {
+	return &testPCM16Writer{rate: rate}
+}
+
+func (w *testPCM16Writer) String() string {
+	return "testPCM16Writer"
+}
+
+func (w *testPCM16Writer) SampleRate() int {
+	return w.rate
+}
+
+func (w *testPCM16Writer) WriteSample(sample msdk.PCM16Sample) error {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	sample = slices.Clone(sample)
+	w.samples = append(w.samples, sample)
+	// Track silence (non-empty, all zeros) separately.
+	if len(sample) > 0 && !slices.ContainsFunc(sample, func(v int16) bool { return v != 0 }) {
+		w.silenceOnly = append(w.silenceOnly, sample)
+	}
+	return nil
+}
+
+func (w *testPCM16Writer) Close() error {
+	return nil
+}
+
+// Samples returns a copy of the list of all samples written so far, in order.
+func (w *testPCM16Writer) Samples() []msdk.PCM16Sample {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	return slices.Clone(w.samples)
+}
+
+// SilenceSamples returns a copy of the list of written samples that were pure silence.
+func (w *testPCM16Writer) SilenceSamples() []msdk.PCM16Sample {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	return slices.Clone(w.silenceOnly)
+}
+
+// testRTPCall is a single packet recorded by testRTPHandler.
+type testRTPCall struct {
+	ts      uint32
+	payload []byte
+}
+
+// testRTPHandler is a mock RTP handler that records the packets it receives.
+type testRTPHandler struct {
+	mu    sync.Mutex
+	calls []testRTPCall
+}
+
+func newTestRTPHandler() *testRTPHandler {
+	return &testRTPHandler{}
+}
+
+func (h *testRTPHandler) String() string {
+	return "testRTPHandler"
+}
+
+func (h *testRTPHandler) HandleRTP(header *rtp.Header, payload []byte) error {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	h.calls = append(h.calls, testRTPCall{ts: header.Timestamp, payload: slices.Clone(payload)})
+	return nil
+}
+
+// Calls returns a copy of the list of recorded packets.
+func (h *testRTPHandler) Calls() []testRTPCall {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	return slices.Clone(h.calls)
+}
+
+// simulateDecoding plays the role of the decoder for the packet most recently passed to
+// HandleRTP, by writing one non-silent frame of the given size to the gap filler.
+// The mock next handler does not decode anything; in real usage the decoder writes its
+// output synchronously during HandleRTP.
+func simulateDecoding(handler rtp.Handler, samplesPerFrame int) {
+	gf, ok := handler.(*gapFillerHandler)
+	if !ok {
+		return
+	}
+	sample := make(msdk.PCM16Sample, samplesPerFrame)
+	// Fill with non-zero values to distinguish from silence.
+	for j := range sample {
+		sample[j] = 1000
+	}
+	_ = gf.WriteSample(sample)
+}
+
+// gapFillerFixture is a gap filler at 8kHz wired to the capturing mocks.
+type gapFillerFixture struct {
+	t       *testing.T
+	handler rtp.Handler
+	audio   *testPCM16Writer
+	next    *testRTPHandler
+	frameTS uint32 // default frame duration in RTP timestamp units
+	frame   int    // default frame duration in PCM samples
+}
+
+func newGapFillerFixture(t *testing.T) *gapFillerFixture {
+	const (
+		sampleRate = 8000
+		clockRate  = 8000
+	)
+	f := &gapFillerFixture{
+		t:       t,
+		audio:   newTestPCM16Writer(sampleRate),
+		next:    newTestRTPHandler(),
+		frameTS: uint32(uint64(clockRate) * uint64(rtp.DefFrameDur) / uint64(time.Second)),
+		frame:   sampleRate / int(time.Second/rtp.DefFrameDur),
+	}
+	f.handler = newGapFillerHandler(logger.GetLogger(), f.next, f.audio, clockRate)
+	return f
+}
+
+// send passes a packet with the given timestamp through the gap filler and then simulates
+// the decoder writing one frame for it.
+func (f *gapFillerFixture) send(ts uint32) {
+	f.t.Helper()
+	h := &rtp.Header{Timestamp: ts, PayloadType: 0}
+	require.NoError(f.t, f.handler.HandleRTP(h, []byte{0x01}))
+	simulateDecoding(f.handler, f.frame)
+}
+
+func TestGapFillerHandler(t *testing.T) {
+	t.Run("no gap - normal packets", func(t *testing.T) {
+		f := newGapFillerFixture(t)
+		for i := uint32(0); i < 5; i++ {
+			f.send(i * f.frameTS)
+		}
+
+		require.Empty(t, f.audio.SilenceSamples())
+		require.Len(t, f.audio.Samples(), 5)
+		require.Len(t, f.next.Calls(), 5)
+	})
+
+	t.Run("single frame gap", func(t *testing.T) {
+		f := newGapFillerFixture(t)
+		f.send(0)
+		f.send(2 * f.frameTS) // one frame is missing
+
+		silence := f.audio.SilenceSamples()
+		require.Len(t, silence, 1)
+		require.Len(t, silence[0], f.frame)
+		// The silence sits between the two decoded frames.
+		all := f.audio.Samples()
+		require.Len(t, all, 3)
+		require.Equal(t, silence[0], all[1])
+		require.Len(t, f.next.Calls(), 2)
+	})
+
+	t.Run("multiple frame gap", func(t *testing.T) {
+		f := newGapFillerFixture(t)
+		f.send(0)
+		f.send(6 * f.frameTS) // five frames are missing
+
+		silence := f.audio.SilenceSamples()
+		require.Len(t, silence, 5)
+		for _, s := range silence {
+			require.Len(t, s, f.frame)
+		}
+		require.Len(t, f.next.Calls(), 2)
+	})
+
+	t.Run("timestamp wrap-around", func(t *testing.T) {
+		f := newGapFillerFixture(t)
+		// The first packet ends two frames before the timestamp wraps.
+		f.send(uint32(0) - 3*f.frameTS)
+		// Two frames before and one frame after the wrap are missing.
+		f.send(f.frameTS)
+
+		require.Len(t, f.audio.SilenceSamples(), 3)
+		require.Len(t, f.next.Calls(), 2)
+	})
+
+	t.Run("large gap capped at 1 second", func(t *testing.T) {
+		f := newGapFillerFixture(t)
+		framesPerSecond := int(time.Second / rtp.DefFrameDur)
+		f.send(0)
+		f.send(uint32(2*framesPerSecond) * f.frameTS) // two seconds later
+
+		require.Len(t, f.audio.SilenceSamples(), framesPerSecond)
+		require.Len(t, f.next.Calls(), 2)
+	})
+
+	t.Run("small gap within tolerance", func(t *testing.T) {
+		f := newGapFillerFixture(t)
+		f.send(0)
+		f.send(f.frameTS + 24) // 3ms late at 8kHz, below the 5ms tolerance
+
+		require.Empty(t, f.audio.SilenceSamples())
+		require.Len(t, f.next.Calls(), 2)
+	})
+
+	t.Run("duplicate and late packets", func(t *testing.T) {
+		f := newGapFillerFixture(t)
+		f.send(0)
+		f.send(2 * f.frameTS) // frame 1 is missing: one frame of silence
+		f.send(2 * f.frameTS) // duplicate
+		f.send(1 * f.frameTS) // frame 1 arrives late
+		f.send(3 * f.frameTS) // continues right after frame 2
+
+		// Only the original gap is filled: a packet behind the expected timestamp is
+		// neither a wrap-around nor a reason to rewind the expected position.
+		require.Len(t, f.audio.SilenceSamples(), 1)
+		require.Len(t, f.next.Calls(), 5)
+	})
+
+	t.Run("timestamp restart", func(t *testing.T) {
+		f := newGapFillerFixture(t)
+		f.send(500 * f.frameTS)
+		f.send(0) // ten seconds behind: a restarted stream, not a late packet
+		f.send(1 * f.frameTS)
+		require.Empty(t, f.audio.SilenceSamples())
+
+		f.send(3 * f.frameTS) // gaps are detected relative to the restarted stream
+		require.Len(t, f.audio.SilenceSamples(), 1)
+		require.Len(t, f.next.Calls(), 4)
+	})
+
+	t.Run("packet decoded in several writes", func(t *testing.T) {
+		f := newGapFillerFixture(t)
+		f.send(0)
+		simulateDecoding(f.handler, f.frame) // the same packet carries a second frame
+		f.send(2 * f.frameTS)
+
+		require.Empty(t, f.audio.SilenceSamples())
+		require.Len(t, f.next.Calls(), 2)
+	})
+}
+
+func TestGapFillingIntegration(t *testing.T) {
+	t.Run("gap filling disabled", func(t *testing.T) {
+		m1, _ := newMediaPair(t, &MediaOptions{
+			EnableGapFilling: false,
+		}, nil)
+
+		// Verify gap filling is not in the chain
+		chain := PrintAudioInWriter(m1)
+		require.NotContains(t, chain, "GapFiller")
+	})
+
+	t.Run("gap filling enabled", func(t *testing.T) {
+		m1, m2 := newMediaPair(t, &MediaOptions{
+			EnableGapFilling: true,
+		}, nil)
+
+		// Verify gap filling is in the chain
+		chain := PrintAudioInWriter(m1)
+		require.Contains(t, chain, "GapFiller")
+
+		// Send a packet to initialize
+		w2 := m2.GetAudioWriter()
+		err := w2.WriteSample(msdk.PCM16Sample{100, 200})
+		require.NoError(t, err)
+
+		// Wait for packet to be received
+		select {
+		case <-time.After(time.Second / 4):
+			t.Fatal("no media received")
+		case <-m1.Received():
+		}
+
+		// Send another packet after a delay to simulate gap
+		time.Sleep(rtp.DefFrameDur * 3)
+		err = w2.WriteSample(msdk.PCM16Sample{300, 400})
+		require.NoError(t, err)
+
+		// Give time for processing
+		time.Sleep(time.Second / 4)
+
+		// Verify m2 is functional by checking its audio writer
+		require.NotNil(t, m2.GetAudioWriter())
+	})
+}