Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 207 additions & 30 deletions pkg/sip/media_port.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ type MediaOptions struct {
Stats *PortStats
EnableJitterBuffer bool
NoInputResample bool
EnableGapFilling bool // Generate silence for DTX/silence suppression gaps
}

func NewMediaPort(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor, opts *MediaOptions, sampleRate int) (*MediaPort, error) {
Expand Down Expand Up @@ -253,19 +254,31 @@ func NewMediaPortWith(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor,
inSampleRate = -1 // set only after SDP is accepted
}
p := &MediaPort{
tid: tid,
log: log,
opts: opts,
mon: mon,
externalIP: opts.IP,
mediaTimeout: mediaTimeout,
timeoutResetTick: make(chan time.Duration, 1),
jitterEnabled: opts.EnableJitterBuffer,
port: newUDPConn(log, conn),
audioOut: msdk.NewSwitchWriter(sampleRate),
audioIn: msdk.NewSwitchWriter(inSampleRate),
stats: opts.Stats,
}
tid: tid,
log: log,
opts: opts,
mon: mon,
externalIP: opts.IP,
mediaTimeout: mediaTimeout,
timeoutResetTick: make(chan time.Duration, 1),
jitterEnabled: opts.EnableJitterBuffer,
port: newUDPConn(log, conn),
audioOut: msdk.NewSwitchWriter(sampleRate),
audioIn: msdk.NewSwitchWriter(inSampleRate),
stats: opts.Stats,
gapFillingEnabled: opts.EnableGapFilling,
}

// Initialize stats baseline so the first update has a sane starting point.
p.stats.mu.Lock()
if p.stats.last.Time.IsZero() {
now := time.Now()
p.stats.last.Time = now
p.stats.last.AudioInSamples = p.stats.AudioInSamples.Load()
p.stats.last.AudioOutSamples = p.stats.AudioOutSamples.Load()
}
p.stats.mu.Unlock()

p.timeoutInitial.Store(&opts.MediaTimeoutInitial)
p.timeoutGeneral.Store(&opts.MediaTimeout)
go p.timeoutLoop(tid, func() {
Expand All @@ -277,23 +290,24 @@ func NewMediaPortWith(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor,

// MediaPort combines all functionality related to sending and accepting SIP media.
type MediaPort struct {
tid traceid.ID
log logger.Logger
opts *MediaOptions
mon *stats.CallMonitor
externalIP netip.Addr
port *udpConn
mediaReceived core.Fuse
packetCount atomic.Uint64
mediaTimeout <-chan struct{}
timeoutStart atomic.Pointer[time.Time]
timeoutResetTick chan time.Duration
timeoutInitial atomic.Pointer[time.Duration]
timeoutGeneral atomic.Pointer[time.Duration]
closed core.Fuse
stats *PortStats
dtmfAudioEnabled bool
jitterEnabled bool
tid traceid.ID
log logger.Logger
opts *MediaOptions
mon *stats.CallMonitor
externalIP netip.Addr
port *udpConn
mediaReceived core.Fuse
packetCount atomic.Uint64
mediaTimeout <-chan struct{}
timeoutStart atomic.Pointer[time.Time]
timeoutResetTick chan time.Duration
timeoutInitial atomic.Pointer[time.Duration]
timeoutGeneral atomic.Pointer[time.Duration]
closed core.Fuse
stats *PortStats
dtmfAudioEnabled bool
jitterEnabled bool
gapFillingEnabled bool

mu sync.Mutex
conf *MediaConf
Expand Down Expand Up @@ -723,6 +737,148 @@ func (p *MediaPort) setupOutput(tid traceid.ID) error {
return nil
}

// newGapFillerHandler creates an RTP handler that detects timestamp gaps (DTX/silence suppression)
// and generates silence PCM samples to fill those gaps.
// It also implements PCM16Writer to track actual frame durations from decoded samples.
func newGapFillerHandler(
log logger.Logger,
next rtp.Handler,
audioWriter msdk.PCM16Writer,
clockRate uint32,
) rtp.Handler {
return &gapFillerHandler{
log: log,
next: next,
underlyingWriter: audioWriter,
clockRate: clockRate,
sampleRate: audioWriter.SampleRate(),
expectedTS: 0,
initialized: false,
}
}

type gapFillerHandler struct {
log logger.Logger
next rtp.Handler
underlyingWriter msdk.PCM16Writer // unwrapped writer for gap filling and forwarding decoded samples
clockRate uint32
sampleRate int
lastTS uint32 // RTP timestamp of last processed packet
currentTS uint32 // RTP timestamp of packet currently being decoded
expectedTS uint32 // Expected next RTP timestamp based on actual frame durations
initialized bool
}

func (g *gapFillerHandler) String() string {
return "GapFiller -> " + g.next.String()
}

// Implement PCM16Writer interface so we can track frame durations directly
func (g *gapFillerHandler) SampleRate() int {
return g.sampleRate
}

func (g *gapFillerHandler) Close() error {
return g.underlyingWriter.Close()
}

func (g *gapFillerHandler) WriteSample(sample msdk.PCM16Sample) error {
ts := g.currentTS

// Calculate frame duration in RTP timestamp units from actual samples
if len(sample) > 0 {
// frameDurationTS = (samples / sampleRate) * clockRate
frameDurTS := uint32(uint64(len(sample)) * uint64(g.clockRate) / uint64(g.sampleRate))

// Update expected next timestamp based on actual frame duration
// Only update if this is the frame we just processed (ts == g.lastTS)
// This ensures we're updating expectedTS for the correct packet
// Note: ts can be 0 for the first packet, so we don't check ts > 0
if ts == g.lastTS {
g.expectedTS = ts + frameDurTS
}
}

return g.underlyingWriter.WriteSample(sample)
}

func (g *gapFillerHandler) HandleRTP(h *rtp.Header, payload []byte) error {
ts := h.Timestamp

// Set current timestamp before decoding so WriteSample can track it
g.currentTS = ts

if !g.initialized {
g.lastTS = ts
g.expectedTS = 0 // Will be set after first frame is decoded
g.initialized = true
return g.next.HandleRTP(h, payload)
}

// Only detect gaps if we have learned the expected timestamp from actual frame durations
// Without knowing the actual frame duration, we can't accurately detect gaps
if g.expectedTS == 0 {
// Haven't learned frame duration yet - just update lastTS and forward
g.lastTS = ts
return g.next.HandleRTP(h, payload)
}

// Calculate gap based on expected timestamp (from actual frame durations)
var gapTS int64
// Compare to expected timestamp, handling wrap-around
if ts >= g.expectedTS {
gapTS = int64(ts - g.expectedTS)
} else {
// Handle wrap-around
gapTS = int64(ts) + int64(^uint32(0)) - int64(g.expectedTS) + 1
}

// Check if there's a gap (more than tolerance)
// Use a small tolerance (e.g., 5ms worth of timestamps) to account for jitter
toleranceTS := uint32(uint64(g.clockRate) * 5 / 1000) // 5ms
if gapTS > int64(toleranceTS) {
// Calculate samples for the gap based on actual timestamp difference
// Convert timestamp gap to samples: gapTS / clockRate * sampleRate
gapSamples := int64(g.sampleRate) * gapTS / int64(g.clockRate)

// Limit gap filling to reasonable amounts (e.g., max 1 second)
maxSamples := int64(g.sampleRate)
if gapSamples > maxSamples {
gapSamples = maxSamples
}

if gapSamples > 0 {
// Generate silence samples for the exact gap
// Split into reasonable chunks (e.g., 20ms frames) for compatibility
const chunkSamples = 160 // 20ms at 8kHz, will scale with sample rate
chunkSize := int64(g.sampleRate) * 20 / 1000 // 20ms worth of samples
if chunkSize == 0 {
chunkSize = 1
}

// Use underlying writer directly
underlying := g.underlyingWriter
for remaining := gapSamples; remaining > 0; {
chunk := remaining
if chunk > chunkSize {
chunk = chunkSize
}
silence := make(msdk.PCM16Sample, chunk)
if err := underlying.WriteSample(silence); err != nil {
g.log.Debugw("failed to write silence sample", "error", err)
// Continue even if one fails
}
remaining -= chunk
}
}
}

g.lastTS = ts
// expectedTS will be updated by WriteSample after decoding

return g.next.HandleRTP(h, payload)
}

func (p *MediaPort) setupInput() {
// Decoding pipeline (SIP RTP -> LK PCM)
codec := p.conf.Audio.Codec
Expand All @@ -735,7 +891,28 @@ func (p *MediaPort) setupInput() {
if p.stats != nil {
audioWriter = newMediaWriterCount(audioWriter, &p.stats.AudioInFrames, &p.stats.AudioInSamples)
}

// Create gap filler handler if enabled (it implements both RTP Handler and PCM16Writer)
var gapFiller *gapFillerHandler
if p.gapFillingEnabled {
gapFiller = newGapFillerHandler(
p.log,
nil, // will be set below
audioWriter,
uint32(codecInfo.RTPClockRate),
).(*gapFillerHandler)
// Use gap filler as the writer so it can track frame durations
audioWriter = gapFiller
}

// Create codec decoder that writes to audioWriter (which may be the gap filler)
audioHandler := p.conf.Audio.Codec.DecodeRTP(audioWriter, p.conf.Audio.Type)

// Set the next handler in gap filler
if gapFiller != nil {
gapFiller.next = audioHandler
audioHandler = gapFiller
}
p.audioInHandler = audioHandler

mux := rtp.NewMux(nil)
Expand Down
Loading
Loading