// Package direct implements a P2P WebRTC media peer using pion/webrtc. // It replaces LiveKit for 1:1 scenarios, eliminating the external SFU dependency. // Signaling (offer/answer/ICE) flows through the existing WebSocket hub. package direct import ( "context" "encoding/binary" "fmt" "log" "math" "sync" "sync/atomic" "time" "github.com/cyberverse/server/internal/mediapeer" "github.com/pion/interceptor/pkg/cc" "github.com/pion/webrtc/v4" "github.com/pion/webrtc/v4/pkg/media" opus "gopkg.in/hraban/opus.v2" ) // Compile-time check: DirectPeer implements mediapeer.MediaPeer. var _ mediapeer.MediaPeer = (*DirectPeer)(nil) func directVoiceTrace(event string, label string, format string, args ...any) { if label == "" { return } sinceUserFinal := "-" if len(args) > 0 { if ts, ok := args[0].(time.Time); ok { if !ts.IsZero() { sinceUserFinal = fmt.Sprintf("%d", time.Since(ts).Milliseconds()) } args = args[1:] } } prefix := fmt.Sprintf( "voice_trace event=%-30s %s since_user_final_ms=%s", event, label, sinceUserFinal, ) if format == "" { log.Print(prefix) return } allArgs := append([]any{prefix}, args...) log.Printf("%s "+format, allArgs...) } // SignalingFunc sends a signaling message to the browser via the WebSocket hub. type SignalingFunc func(sessionID string, msg map[string]any) const ( defaultDirectVideoBitrateKbps = 1800 minDirectVideoBitrateKbps = 500 maxDirectVideoBitrateKbps = 1800 videoBitrateSafetyPercent = 65 maxAudioDelayMS int64 = 800 audioDelayStepMS int64 = 160 audioDelayFeedbackMaxAge = 4 * time.Second ) // DirectPeer is a P2P WebRTC media peer using pion/webrtc directly. 
type DirectPeer struct { sessionID string signalingFn SignalingFunc iceServers []webrtc.ICEServer webrtcAPI *webrtc.API estimatorCh <-chan cc.BandwidthEstimator pc *webrtc.PeerConnection videoTrack *webrtc.TrackLocalStaticSample audioTrack *webrtc.TrackLocalStaticSample userAudioCh chan []byte // Opus encoder for outgoing PCM → Opus opusMu sync.Mutex opusEncoder *opus.Encoder opusEncoderSR int // Connection state connected chan struct{} mu sync.Mutex mediaMu sync.Mutex // AV pipeline (same pattern as Bot) encodeCh chan *mediapeer.RawAVSegment publishCh chan *mediapeer.AVSegment avPipelineCtx context.Context avPipelineCancel context.CancelFunc avPipelineWg sync.WaitGroup // RTP timestamp gap correction: tracks the wall-clock time of the // last WriteSample call so the next segment's first sample can carry // a Duration that advances the RTP timestamp over the idle gap. lastVideoWriteTime time.Time lastAudioWriteTime time.Time playbackEpoch atomic.Uint64 avCalibrationEnabled atomic.Bool avCalibrationMarkerSeq atomic.Int64 targetBitrateBps atomic.Int64 audioDelayMu sync.Mutex audioDelayTargetMS int64 audioDelayCurrentMS int64 audioDelayPCM []byte audioDelaySampleRate int audioDelayEpoch uint64 audioDelayLastFeedback time.Time } // NewDirectPeer creates a new P2P WebRTC peer for the given session. // api should be created via NewWebRTCAPI (with interceptors); estimatorCh receives the GCC bandwidth estimator. func NewDirectPeer(sessionID string, signalingFn SignalingFunc, iceServers []webrtc.ICEServer, api *webrtc.API, estimatorCh <-chan cc.BandwidthEstimator) *DirectPeer { return &DirectPeer{ sessionID: sessionID, signalingFn: signalingFn, iceServers: iceServers, webrtcAPI: api, estimatorCh: estimatorCh, userAudioCh: make(chan []byte, 64), connected: make(chan struct{}), } } // Connect creates the PeerConnection and local tracks. // It does NOT start negotiation — call StartNegotiation() after // the browser signals readiness via "webrtc_ready". 
func (p *DirectPeer) Connect(ctx context.Context) error { config := webrtc.Configuration{ ICEServers: p.iceServers, } var pc *webrtc.PeerConnection var err error if p.webrtcAPI != nil { pc, err = p.webrtcAPI.NewPeerConnection(config) } else { pc, err = webrtc.NewPeerConnection(config) } if err != nil { return fmt.Errorf("create PeerConnection: %w", err) } // Create VP8 video track videoTrack, err := webrtc.NewTrackLocalStaticSample( webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, "assistant-video", "cyberverse", ) if err != nil { pc.Close() return fmt.Errorf("create video track: %w", err) } videoSender, err := pc.AddTrack(videoTrack) if err != nil { pc.Close() return fmt.Errorf("add video track: %w", err) } // Create Opus audio track audioTrack, err := webrtc.NewTrackLocalStaticSample( webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "assistant-audio", "cyberverse", ) if err != nil { pc.Close() return fmt.Errorf("create audio track: %w", err) } audioSender, err := pc.AddTrack(audioTrack) if err != nil { pc.Close() return fmt.Errorf("add audio track: %w", err) } // RTCP reader goroutines — MUST read continuously so that // NACK/TWCC/GCC interceptors receive browser feedback and work correctly. go readRTCP(videoSender) go readRTCP(audioSender) // Add a recvonly audio transceiver so the SDP offer explicitly requests // the browser's microphone audio. Without this, pion's sendrecv transceiver // from AddTrack may not produce an OnTrack callback for the browser's mic. 
if _, err := pc.AddTransceiverFromKind( webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}, ); err != nil { pc.Close() return fmt.Errorf("add mic receive transceiver: %w", err) } // Handle incoming user audio track (mic) pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { if !p.isActivePeerConnection(pc) { return } if track.Kind() != webrtc.RTPCodecTypeAudio { return } log.Printf("[DirectPeer] session=%s subscribed to user audio track codec=%s", p.sessionID, track.Codec().MimeType) go p.readUserAudio(track) }) // Forward ICE candidates to browser via signaling pc.OnICECandidate(func(c *webrtc.ICECandidate) { if !p.isActivePeerConnection(pc) { return } if c == nil { log.Printf("[DirectPeer] session=%s ICE gathering complete", p.sessionID) return } init := c.ToJSON() log.Printf("[DirectPeer] session=%s local ICE candidate: %s | sdp: %s", p.sessionID, c.String(), init.Candidate) msg := map[string]any{ "type": "ice_candidate", "candidate": init.Candidate, } if init.SDPMid != nil { msg["sdp_mid"] = *init.SDPMid } if init.SDPMLineIndex != nil { msg["sdp_mline_index"] = *init.SDPMLineIndex } p.signalingFn(p.sessionID, msg) }) // Track ICE connection state (more granular than PeerConnection state) pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { if !p.isActivePeerConnection(pc) { return } log.Printf("[DirectPeer] session=%s ICE connection state: %s", p.sessionID, state.String()) }) // Track connection state pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { if !p.isActivePeerConnection(pc) { return } log.Printf("[DirectPeer] session=%s connection state: %s", p.sessionID, state.String()) if state == webrtc.PeerConnectionStateConnected { select { case <-p.connected: default: close(p.connected) } } }) p.mu.Lock() p.pc = pc p.videoTrack = videoTrack p.audioTrack = audioTrack p.mu.Unlock() // Monitor GCC bandwidth estimation if p.estimatorCh != nil { go func() { 
select { case estimator, ok := <-p.estimatorCh: if !ok { return } estimator.OnTargetBitrateChange(func(bitrate int) { p.targetBitrateBps.Store(int64(bitrate)) log.Printf("[DirectPeer] session=%s GCC target bitrate: %d kbps", p.sessionID, bitrate/1000) }) case <-time.After(10 * time.Second): log.Printf("[DirectPeer] session=%s GCC estimator unavailable; using default VP8 bitrate", p.sessionID) } }() } return nil } func (p *DirectPeer) isActivePeerConnection(pc *webrtc.PeerConnection) bool { p.mu.Lock() defer p.mu.Unlock() return p.pc == pc } func (p *DirectPeer) resetAudioDelayState() { p.audioDelayMu.Lock() defer p.audioDelayMu.Unlock() p.audioDelayTargetMS = 0 p.audioDelayCurrentMS = 0 p.audioDelayPCM = nil p.audioDelaySampleRate = 0 p.audioDelayEpoch = 0 p.audioDelayLastFeedback = time.Time{} } func (p *DirectPeer) prepareMediaPathReset() *webrtc.PeerConnection { p.mu.Lock() oldPC := p.pc p.pc = nil p.videoTrack = nil p.audioTrack = nil p.connected = make(chan struct{}) p.lastVideoWriteTime = time.Time{} p.lastAudioWriteTime = time.Time{} p.mu.Unlock() p.targetBitrateBps.Store(0) p.opusMu.Lock() p.opusEncoder = nil p.opusEncoderSR = 0 p.opusMu.Unlock() p.resetAudioDelayState() return oldPC } // ResetMediaPath rebuilds the Direct WebRTC PeerConnection without closing // the user audio subscription channel owned by the session. 
func (p *DirectPeer) ResetMediaPath(ctx context.Context) error { p.mediaMu.Lock() defer p.mediaMu.Unlock() log.Printf("[DirectPeer] session=%s Direct media path reset requested", p.sessionID) oldPC := p.prepareMediaPathReset() if oldPC != nil { if err := oldPC.Close(); err != nil { log.Printf("[DirectPeer] session=%s close old PeerConnection during media reset failed: %v", p.sessionID, err) } } if err := p.Connect(ctx); err != nil { return err } if err := p.StartNegotiation(); err != nil { return err } log.Printf("[DirectPeer] session=%s Direct media path reset negotiation started", p.sessionID) return nil } // SetAVCalibrationEnabled toggles explicit in-band AV marker injection for diagnostics. func (p *DirectPeer) SetAVCalibrationEnabled(enabled bool) { p.avCalibrationEnabled.Store(enabled) } // HandleAVSyncFeedback updates the server-side audio delay target from browser diagnostics. func (p *DirectPeer) HandleAVSyncFeedback(turnSeq uint64, excessVideoLagMS, jitterBufferDeltaMS float64, likely string) { if likely != "video_late_audio_leads" { return } currentEpoch := p.playbackEpoch.Load() if turnSeq > 0 && currentEpoch > 0 && turnSeq != currentEpoch { return } if math.IsNaN(excessVideoLagMS) || math.IsInf(excessVideoLagMS, 0) || math.IsNaN(jitterBufferDeltaMS) || math.IsInf(jitterBufferDeltaMS, 0) { return } if jitterBufferDeltaMS <= 0 { return } wanted := int64(math.Round(math.Max(math.Max(excessVideoLagMS, 0), jitterBufferDeltaMS))) if wanted < 0 { wanted = 0 } if wanted > maxAudioDelayMS { wanted = maxAudioDelayMS } p.audioDelayMu.Lock() p.audioDelayTargetMS = wanted p.audioDelayLastFeedback = time.Now() current := p.audioDelayCurrentMS p.audioDelayMu.Unlock() log.Printf( "[DirectPeer] session=%s AV sync feedback turn=%d excess_video_lag=%.1fms jb_delta=%.1fms audio_delay_target=%dms current=%dms", p.sessionID, turnSeq, excessVideoLagMS, jitterBufferDeltaMS, wanted, current, ) } // readRTCP continuously reads RTCP packets from an RTPSender. 
// This is required for NACK/TWCC/GCC interceptors to receive browser feedback. func readRTCP(sender *webrtc.RTPSender) { rtcpBuf := make([]byte, 1500) for { if _, _, err := sender.Read(rtcpBuf); err != nil { return } } } // StartNegotiation creates an SDP offer and sends it to the browser. // Call this after the browser sends "webrtc_ready" via WebSocket. // It waits for ICE gathering to complete so all candidates are embedded // in the SDP offer itself (no trickle needed). func (p *DirectPeer) StartNegotiation() error { p.mu.Lock() pc := p.pc p.mu.Unlock() if pc == nil { return fmt.Errorf("PeerConnection not created") } offer, err := pc.CreateOffer(nil) if err != nil { return fmt.Errorf("create offer: %w", err) } if err := pc.SetLocalDescription(offer); err != nil { return fmt.Errorf("set local description: %w", err) } // Wait for ICE gathering to complete so all candidates are in the SDP. // For ICE-TCP with a single TCPMux listener this is nearly instant. gatherComplete := webrtc.GatheringCompletePromise(pc) select { case <-gatherComplete: case <-time.After(5 * time.Second): log.Printf("[DirectPeer] session=%s ICE gathering timeout, sending partial offer", p.sessionID) } // Send the complete SDP (with candidates embedded) completeOffer := pc.LocalDescription() p.signalingFn(p.sessionID, map[string]any{ "type": "webrtc_offer", "sdp": completeOffer.SDP, }) log.Printf("[DirectPeer] session=%s SDP offer sent (with candidates)", p.sessionID) return nil } // HandleSignaling processes incoming signaling messages from the browser. 
func (p *DirectPeer) HandleSignaling(msgType, sdp, candidate string, sdpMid *string, sdpMLineIndex *uint16) { p.mu.Lock() pc := p.pc p.mu.Unlock() if pc == nil { return } switch msgType { case "webrtc_answer": if err := pc.SetRemoteDescription(webrtc.SessionDescription{ Type: webrtc.SDPTypeAnswer, SDP: sdp, }); err != nil { log.Printf("[DirectPeer] session=%s set remote description failed: %v", p.sessionID, err) } else { log.Printf("[DirectPeer] session=%s SDP answer set", p.sessionID) } case "ice_candidate": if candidate == "" { return } log.Printf("[DirectPeer] session=%s remote ICE candidate: %s", p.sessionID, candidate) init := webrtc.ICECandidateInit{ Candidate: candidate, } if sdpMid != nil { init.SDPMid = sdpMid } if sdpMLineIndex != nil { init.SDPMLineIndex = sdpMLineIndex } if err := pc.AddICECandidate(init); err != nil { log.Printf("[DirectPeer] session=%s add ICE candidate failed: %v", p.sessionID, err) } } } // readUserAudio reads Opus RTP packets from the user's mic track, // decodes to 16kHz mono PCM, and writes to userAudioCh. func (p *DirectPeer) readUserAudio(track *webrtc.TrackRemote) { decoder, err := opus.NewDecoder(16000, 1) if err != nil { log.Printf("[DirectPeer] session=%s opus decoder creation failed: %v", p.sessionID, err) return } pcmBuf := make([]int16, 16000) // up to 1 second buffer for { pkt, _, err := track.ReadRTP() if err != nil { log.Printf("[DirectPeer] session=%s user audio track closed: %v", p.sessionID, err) return } n, err := decoder.Decode(pkt.Payload, pcmBuf) if err != nil { continue } if n == 0 { continue } // Convert int16 → little-endian bytes (matching VoiceLLM expected format) out := make([]byte, n*2) for i := 0; i < n; i++ { binary.LittleEndian.PutUint16(out[i*2:], uint16(pcmBuf[i])) } select { case p.userAudioCh <- out: default: // Drop if consumer is too slow (same backpressure as Bot) } } } // SubscribeUserAudio returns the channel receiving decoded user mic PCM. 
func (p *DirectPeer) SubscribeUserAudio() <-chan []byte {
	return p.userAudioCh
}

// --- AV Pipeline (same pattern as Bot) ---

// StartAVPipeline launches encode and publish goroutines.
// The pipeline context is derived from ctx and both stage channels are
// buffered with capacity one.
func (p *DirectPeer) StartAVPipeline(ctx context.Context) {
	p.avPipelineCtx, p.avPipelineCancel = context.WithCancel(ctx)
	p.encodeCh = make(chan *mediapeer.RawAVSegment, 1)
	p.publishCh = make(chan *mediapeer.AVSegment, 1)
	p.avPipelineWg.Add(2)
	go p.runEncoder()
	go p.runPublisher()
}

// SendAVSegment enqueues a raw segment for encoding and publishing.
//
// Segments from a stale playback epoch are dropped and nil is returned. An
// error is returned for a nil segment, when the pipeline has not been started,
// or when the pipeline context is cancelled before the segment is accepted.
func (p *DirectPeer) SendAVSegment(seg *mediapeer.RawAVSegment) error {
	if seg == nil {
		return fmt.Errorf("av segment is nil")
	}
	seg.QueuedAt = time.Now()
	if p.isPlaybackStale(seg.Epoch) {
		return nil
	}
	// Same guard as WaitAVDrain: before StartAVPipeline the channel and the
	// context are nil, and the select below would panic on the nil context.
	if p.encodeCh == nil || p.avPipelineCtx == nil {
		return fmt.Errorf("av pipeline not started")
	}
	directVoiceTrace(
		"direct_segment_enqueued",
		seg.TraceLabel,
		"",
		seg.UserFinalAt,
	)
	select {
	case p.encodeCh <- seg:
		return nil
	case <-p.avPipelineCtx.Done():
		return fmt.Errorf("av pipeline cancelled")
	}
}

// AdvancePlaybackEpoch raises the playback epoch to epoch. The epoch never
// moves backwards and zero is ignored.
func (p *DirectPeer) AdvancePlaybackEpoch(epoch uint64) {
	if epoch == 0 {
		return
	}
	// Compare-and-swap loop so concurrent callers can only increase the value.
	for {
		current := p.playbackEpoch.Load()
		if epoch <= current {
			return
		}
		if p.playbackEpoch.CompareAndSwap(current, epoch) {
			return
		}
	}
}

// WaitAVDrain blocks until all queued segments are published.
//
// It pushes a fence through the pipeline and waits for it to come out the
// other end. It returns early when the pipeline is not running, is cancelled,
// or timeout elapses; timeout bounds the whole call.
func (p *DirectPeer) WaitAVDrain(timeout time.Duration) {
	if p.encodeCh == nil || p.avPipelineCtx == nil {
		return
	}

	// One timer covers both phases so the total wait never exceeds timeout
	// (two independent timers allowed up to twice that).
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()

	fence := make(chan struct{})
	select {
	case p.encodeCh <- &mediapeer.RawAVSegment{Fence: fence}:
	case <-p.avPipelineCtx.Done():
		return
	case <-deadline.C:
		return
	}
	select {
	case <-fence:
	case <-p.avPipelineCtx.Done():
	case <-deadline.C:
	}
}

// currentVideoBitrateKbps returns the VP8 bitrate to encode with: the default
// when no bandwidth estimate is stored, otherwise videoBitrateSafetyPercent of
// the estimate clamped to the configured min/max.
func (p *DirectPeer) currentVideoBitrateKbps() int {
	targetBps := p.targetBitrateBps.Load()
	if targetBps <= 0 {
		return defaultDirectVideoBitrateKbps
	}
	kbps := int((targetBps * videoBitrateSafetyPercent) / 100 / 1000)
	if kbps < minDirectVideoBitrateKbps {
		return minDirectVideoBitrateKbps
	}
	if kbps > maxDirectVideoBitrateKbps {
		return maxDirectVideoBitrateKbps
	}
	return kbps
}

// StopAVPipeline shuts down the AV pipeline goroutines.
func (p *DirectPeer) StopAVPipeline() { if p.avPipelineCancel != nil { p.avPipelineCancel() } if p.encodeCh != nil { close(p.encodeCh) } p.avPipelineWg.Wait() } func (p *DirectPeer) runEncoder() { defer p.avPipelineWg.Done() defer close(p.publishCh) for raw := range p.encodeCh { if p.avPipelineCtx.Err() != nil { return } // Fence marker: pass through if raw.Fence != nil && len(raw.RGB) == 0 { select { case p.publishCh <- &mediapeer.AVSegment{Fence: raw.Fence}: case <-p.avPipelineCtx.Done(): return } continue } if p.isPlaybackStale(raw.Epoch) { continue } encodeStart := time.Now() videoBitrateKbps := p.currentVideoBitrateKbps() directVoiceTrace( "direct_vp8_encode_started", raw.TraceLabel, "queue_ms=%d bitrate_kbps=%d", raw.UserFinalAt, time.Since(raw.QueuedAt).Milliseconds(), videoBitrateKbps, ) if p.avCalibrationEnabled.Load() { p.injectAVCalibrationMarker(raw) } vp8Samples, err := mediapeer.EncodeRGBChunkToVP8SamplesWithBitrate( raw.RGB, raw.Width, raw.Height, raw.NumFrames, raw.FPS, videoBitrateKbps, ) if err != nil { log.Printf("[DirectPeer] encode failed: %v", err) continue } if len(vp8Samples) == 0 { continue } directVoiceTrace( "direct_vp8_encode_done", raw.TraceLabel, "encode_ms=%d", raw.UserFinalAt, time.Since(encodeStart).Milliseconds(), ) seg := &mediapeer.AVSegment{ TraceLabel: raw.TraceLabel, Epoch: raw.Epoch, SegmentSeq: raw.SegmentSeq, MediaStartMS: raw.MediaStartMS, DurationMS: raw.DurationMS, MarkerID: raw.MarkerID, MarkerMediaMS: raw.MarkerMediaMS, MarkerDurationMS: raw.MarkerDurationMS, MarkerFrequencyHz: raw.MarkerFrequencyHz, VP8Samples: vp8Samples, PCM: raw.PCM, UserFinalAt: raw.UserFinalAt, SampleRate: raw.SampleRate, Width: raw.Width, Height: raw.Height, FPS: raw.FPS, NumFrames: raw.NumFrames, QueuedAt: raw.QueuedAt, } select { case p.publishCh <- seg: case <-p.avPipelineCtx.Done(): return } } } func (p *DirectPeer) injectAVCalibrationMarker(raw *mediapeer.RawAVSegment) { if raw == nil || raw.NumFrames <= 0 || raw.Width <= 0 || raw.Height <= 
0 || raw.FPS <= 0 { return } if len(raw.RGB) < raw.Width*raw.Height*3*raw.NumFrames || len(raw.PCM) == 0 || raw.SampleRate <= 0 { return } frameIndex := 0 if raw.NumFrames > 4 { frameIndex = 2 } markerFrames := 1 if raw.NumFrames-frameIndex > 1 { markerFrames = 2 } markerOffsetMS := int64(math.Round(float64(frameIndex) * 1000 / float64(raw.FPS))) markerDurationMS := int64(math.Round(float64(markerFrames) * 1000 / float64(raw.FPS))) if markerDurationMS < 60 { markerDurationMS = 60 } raw.MarkerID = p.avCalibrationMarkerSeq.Add(1) raw.MarkerMediaMS = raw.MediaStartMS + markerOffsetMS raw.MarkerDurationMS = markerDurationMS raw.MarkerFrequencyHz = 1800 paintAVCalibrationFlash(raw, frameIndex, markerFrames) mixAVCalibrationChirp(raw.PCM, raw.SampleRate, markerOffsetMS, markerDurationMS, raw.MarkerFrequencyHz) } func paintAVCalibrationFlash(raw *mediapeer.RawAVSegment, frameIndex, markerFrames int) { blockW := minInt(raw.Width, maxInt(48, raw.Width/5)) blockH := minInt(raw.Height, maxInt(48, raw.Height/5)) frameSize := raw.Width * raw.Height * 3 for f := frameIndex; f < frameIndex+markerFrames && f < raw.NumFrames; f++ { base := f * frameSize for y := 0; y < blockH; y++ { row := base + y*raw.Width*3 for x := 0; x < blockW; x++ { i := row + x*3 raw.RGB[i] = 255 raw.RGB[i+1] = 0 raw.RGB[i+2] = 255 } } } } func mixAVCalibrationChirp(pcm []byte, sampleRate int, offsetMS, durationMS int64, frequencyHz int) { start := int((offsetMS * int64(sampleRate)) / 1000) count := int((durationMS * int64(sampleRate)) / 1000) if count <= 0 || start < 0 || start >= len(pcm)/2 { return } if start+count > len(pcm)/2 { count = len(pcm)/2 - start } const amplitude = 22000 fadeSamples := maxInt(1, sampleRate/200) for i := 0; i < count; i++ { t := float64(i) / float64(sampleRate) fade := 1.0 if i < fadeSamples { fade = float64(i) / float64(fadeSamples) } else if remain := count - i; remain < fadeSamples { fade = float64(remain) / float64(fadeSamples) } add := 
int(math.Round(math.Sin(2*math.Pi*float64(frequencyHz)*t) * amplitude * fade)) idx := (start + i) * 2 current := int(int16(binary.LittleEndian.Uint16(pcm[idx:]))) next := current + add if next > 32767 { next = 32767 } else if next < -32768 { next = -32768 } binary.LittleEndian.PutUint16(pcm[idx:], uint16(int16(next))) } } func minInt(a, b int) int { if a < b { return a } return b } func maxInt(a, b int) int { if a > b { return a } return b } func audioDelayPCMBytes(delayMS int64, sampleRate int) int { if delayMS <= 0 || sampleRate <= 0 { return 0 } samples := int((delayMS * int64(sampleRate)) / 1000) return samples * 2 } func (p *DirectPeer) applyAudioDelay(epoch uint64, pcm []byte, sampleRate int) []byte { if len(pcm) == 0 || sampleRate <= 0 { return pcm } p.audioDelayMu.Lock() defer p.audioDelayMu.Unlock() if p.audioDelayEpoch != epoch || p.audioDelaySampleRate != sampleRate { p.audioDelayEpoch = epoch p.audioDelaySampleRate = sampleRate p.audioDelayTargetMS = 0 p.audioDelayCurrentMS = 0 p.audioDelayPCM = nil p.audioDelayLastFeedback = time.Time{} } if p.audioDelayTargetMS > 0 && !p.audioDelayLastFeedback.IsZero() && time.Since(p.audioDelayLastFeedback) > audioDelayFeedbackMaxAge { p.audioDelayTargetMS -= audioDelayStepMS if p.audioDelayTargetMS < 0 { p.audioDelayTargetMS = 0 } p.audioDelayLastFeedback = time.Now() } previousDelayMS := p.audioDelayCurrentMS if p.audioDelayTargetMS > p.audioDelayCurrentMS+audioDelayStepMS { p.audioDelayCurrentMS += audioDelayStepMS } else if p.audioDelayTargetMS < p.audioDelayCurrentMS-audioDelayStepMS { p.audioDelayCurrentMS -= audioDelayStepMS } else { p.audioDelayCurrentMS = p.audioDelayTargetMS } if p.audioDelayCurrentMS > 0 || p.audioDelayTargetMS > 0 || previousDelayMS > 0 { log.Printf( "[DirectPeer] session=%s audio delay apply epoch=%d current=%dms target=%dms", p.sessionID, epoch, p.audioDelayCurrentMS, p.audioDelayTargetMS, ) } desiredBytes := audioDelayPCMBytes(p.audioDelayCurrentMS, sampleRate) if desiredBytes <= 0 { 
p.audioDelayPCM = nil return pcm } if len(p.audioDelayPCM) < desiredBytes { p.audioDelayPCM = append(p.audioDelayPCM, make([]byte, desiredBytes-len(p.audioDelayPCM))...) } else if len(p.audioDelayPCM) > desiredBytes { p.audioDelayPCM = p.audioDelayPCM[len(p.audioDelayPCM)-desiredBytes:] } combined := make([]byte, 0, len(p.audioDelayPCM)+len(pcm)) combined = append(combined, p.audioDelayPCM...) combined = append(combined, pcm...) out := make([]byte, len(pcm)) copy(out, combined) if len(combined) > len(pcm) { p.audioDelayPCM = append([]byte(nil), combined[len(pcm):]...) } else { p.audioDelayPCM = nil } return out } func (p *DirectPeer) runPublisher() { defer p.avPipelineWg.Done() for seg := range p.publishCh { if p.avPipelineCtx.Err() != nil { return } // Fence marker if seg.Fence != nil && len(seg.VP8Samples) == 0 { close(seg.Fence) continue } if p.isPlaybackStale(seg.Epoch) { continue } p.publishAVSegment(seg) } } func (p *DirectPeer) waitConnected(timeout time.Duration) bool { select { case <-p.connected: return true case <-p.avPipelineCtx.Done(): return false case <-time.After(timeout): return false } } func (p *DirectPeer) publishAVSegment(seg *mediapeer.AVSegment) { p.mediaMu.Lock() defer p.mediaMu.Unlock() // Wait for connection to be established if !p.waitConnected(10 * time.Second) { if p.avPipelineCtx.Err() == nil { log.Printf("[DirectPeer] session=%s publish timeout waiting for connection", p.sessionID) } return } p.mu.Lock() videoTrack := p.videoTrack audioTrack := p.audioTrack p.mu.Unlock() if videoTrack == nil { log.Printf("[DirectPeer] session=%s publish skipped: video track is not ready", p.sessionID) return } publishStart := time.Now() directVoiceTrace( "direct_publish_started", seg.TraceLabel, "queue_ms=%d", seg.UserFinalAt, time.Since(seg.QueuedAt).Milliseconds(), ) fps := seg.FPS if fps <= 0 { fps = 25 } frameDur := time.Second / time.Duration(fps) // Encode entire PCM buffer into Opus frames up-front to avoid // the sample-loss caused by slicing 
PCM per video frame. var opusFrames []media.Sample if len(seg.PCM) > 0 && seg.SampleRate > 0 { if audioTrack == nil { log.Printf("[DirectPeer] session=%s publish skipped: audio track is not ready", p.sessionID) return } var err error pcm := p.applyAudioDelay(seg.Epoch, seg.PCM, seg.SampleRate) opusFrames, err = mediapeer.EncodePCMToOpusSamples(pcm, seg.SampleRate) if err != nil { log.Printf("[DirectPeer] audio encode error: %v", err) return } } p.sendAVSegmentDiagnostic(seg, publishStart) // --- RTP timestamp gap correction --- // Between segments WriteSample pauses; without advancing the RTP clock the // browser jitter buffer treats the next frame as very late. Pion advances // RTP timestamps after WriteSample, so the idle gap must be skipped with an // empty sample before the first real audio/video sample in this segment. now := time.Now() rtpGap := time.Duration(0) if !p.lastVideoWriteTime.IsZero() { wallGap := now.Sub(p.lastVideoWriteTime) rtpGap = rtpGapToSkip(wallGap, frameDur) if rtpGap > 0 { if rtpGap != wallGap { log.Printf("[DirectPeer] session=%s RTP timestamp gap correction: wall=%v skipped=%v", p.sessionID, wallGap, rtpGap) } else { log.Printf("[DirectPeer] session=%s RTP timestamp gap correction skipped=%v", p.sessionID, rtpGap) } } } if rtpGap > 0 { if len(opusFrames) > 0 { if err := audioTrack.WriteSample(media.Sample{Duration: rtpGap}); err != nil { log.Printf("[DirectPeer] audio RTP gap skip error: %v", err) return } } if len(seg.VP8Samples) > 0 { if err := videoTrack.WriteSample(media.Sample{Duration: rtpGap}); err != nil { log.Printf("[DirectPeer] video RTP gap skip error: %v", err) return } } } nVideo := len(seg.VP8Samples) segmentStartWall := time.Now() segmentDuration := avSegmentWallDuration(seg, frameDur, opusFrames) maxSlip := time.Duration(0) firstVideoWritten := false videoIndex := 0 audioIndex := 0 audioOffset := time.Duration(0) for videoIndex < nVideo || audioIndex < len(opusFrames) { if p.isPlaybackStale(seg.Epoch) { return } 
videoDeadline := segmentStartWall.Add(time.Duration(videoIndex) * frameDur) audioDeadline := segmentStartWall.Add(audioOffset) if videoIndex < nVideo && (audioIndex >= len(opusFrames) || !videoDeadline.After(audioDeadline)) { slip, err := p.sleepUntil(videoDeadline) if err != nil { log.Printf("[DirectPeer] video pacing error: %v", err) return } if slip > maxSlip { maxSlip = slip } if err := videoTrack.WriteSample(seg.VP8Samples[videoIndex]); err != nil { log.Printf("[DirectPeer] video write error: %v", err) return } if !firstVideoWritten { firstVideoWritten = true directVoiceTrace( "direct_first_video_sample_written", seg.TraceLabel, "publish_ms=%d", seg.UserFinalAt, time.Since(publishStart).Milliseconds(), ) } videoIndex++ continue } if audioIndex < len(opusFrames) { slip, err := p.sleepUntil(audioDeadline) if err != nil { log.Printf("[DirectPeer] audio pacing error: %v", err) return } if slip > maxSlip { maxSlip = slip } sample := opusFrames[audioIndex] if err := audioTrack.WriteSample(sample); err != nil { log.Printf("[DirectPeer] audio write error: %v", err) return } audioOffset += mediaSampleDuration(sample, 20*time.Millisecond) audioIndex++ } } if segmentDuration > 0 { slip, err := p.sleepUntil(segmentStartWall.Add(segmentDuration)) if err != nil { log.Printf("[DirectPeer] segment pacing error: %v", err) return } if slip > maxSlip { maxSlip = slip } } if maxSlip >= 20*time.Millisecond { directVoiceTrace( "direct_publish_pacing_slip", seg.TraceLabel, "seg=%d max_slip_ms=%d publish_ms=%d", seg.UserFinalAt, seg.SegmentSeq, maxSlip.Milliseconds(), time.Since(publishStart).Milliseconds(), ) } // Record the last write time for gap correction on the next segment. 
if segmentDuration > 0 { p.lastVideoWriteTime = segmentStartWall.Add(segmentDuration) } else { p.lastVideoWriteTime = time.Now() } p.lastAudioWriteTime = p.lastVideoWriteTime } func mediaSampleDuration(sample media.Sample, fallback time.Duration) time.Duration { if sample.Duration > 0 { return sample.Duration } return fallback } func avSegmentWallDuration(seg *mediapeer.AVSegment, frameDur time.Duration, opusFrames []media.Sample) time.Duration { if seg.DurationMS > 0 { return time.Duration(seg.DurationMS) * time.Millisecond } videoDuration := time.Duration(len(seg.VP8Samples)) * frameDur audioDuration := time.Duration(0) for _, sample := range opusFrames { audioDuration += mediaSampleDuration(sample, 20*time.Millisecond) } if audioDuration > videoDuration { return audioDuration } return videoDuration } func (p *DirectPeer) sendAVSegmentDiagnostic(seg *mediapeer.AVSegment, publishStart time.Time) { if p.signalingFn == nil || seg == nil || seg.SegmentSeq <= 0 { return } fps := seg.FPS if fps <= 0 { fps = 25 } durationMS := seg.DurationMS if durationMS <= 0 && seg.NumFrames > 0 && fps > 0 { durationMS = int64(math.Round(float64(seg.NumFrames) * 1000 / float64(fps))) } p.signalingFn(p.sessionID, map[string]any{ "type": "av_segment_diagnostic", "turn_seq": seg.Epoch, "segment_seq": seg.SegmentSeq, "media_start_ms": seg.MediaStartMS, "duration_ms": durationMS, "video_frames": seg.NumFrames, "fps": fps, "audio_samples": len(seg.PCM) / 2, "sample_rate": seg.SampleRate, "publish_wall_ms": publishStart.UnixMilli(), "marker_id": seg.MarkerID, "marker_media_time_ms": seg.MarkerMediaMS, "marker_duration_ms": seg.MarkerDurationMS, "marker_frequency_hz": seg.MarkerFrequencyHz, }) } func (p *DirectPeer) isPlaybackStale(epoch uint64) bool { if epoch == 0 { return false } current := p.playbackEpoch.Load() return current > 0 && epoch < current } // PublishAudioFrame publishes raw PCM audio (for TTS in standard pipeline). 
func (p *DirectPeer) PublishAudioFrame(pcm []byte, sampleRate int) error { if len(pcm) == 0 || sampleRate <= 0 { return nil } p.mediaMu.Lock() defer p.mediaMu.Unlock() if !p.waitConnected(10 * time.Second) { if p.avPipelineCtx.Err() != nil { return fmt.Errorf("audio publish cancelled") } return fmt.Errorf("audio publish timeout waiting for connection") } return p.writeOpus(pcm, sampleRate) } // writeOpus encodes PCM to Opus and writes to the audio track. // Opus encodes in 20ms frames. func (p *DirectPeer) writeOpus(pcm []byte, sampleRate int) error { p.opusMu.Lock() defer p.opusMu.Unlock() p.mu.Lock() audioTrack := p.audioTrack p.mu.Unlock() if audioTrack == nil { return fmt.Errorf("audio track is not ready") } // Lazily create or recreate encoder if sample rate changed if p.opusEncoder == nil || p.opusEncoderSR != sampleRate { enc, err := opus.NewEncoder(sampleRate, 1, opus.AppVoIP) if err != nil { return fmt.Errorf("create opus encoder: %w", err) } p.opusEncoder = enc p.opusEncoderSR = sampleRate } // Process PCM in 20ms frames samplesPerFrame := sampleRate / 50 // 20ms = 1/50 second bytesPerFrame := samplesPerFrame * 2 // 16-bit mono opusBuf := make([]byte, 4000) // max opus frame size frameDuration := 20 * time.Millisecond for offset := 0; offset+bytesPerFrame <= len(pcm); offset += bytesPerFrame { // Convert bytes to int16 samples samples := make([]int16, samplesPerFrame) for i := 0; i < samplesPerFrame; i++ { samples[i] = int16(binary.LittleEndian.Uint16(pcm[offset+i*2:])) } n, err := p.opusEncoder.Encode(samples, opusBuf) if err != nil { return fmt.Errorf("opus encode: %w", err) } if n == 0 { continue } payload := append([]byte(nil), opusBuf[:n]...) 
if err := audioTrack.WriteSample(media.Sample{ Data: payload, Duration: frameDuration, }); err != nil { return fmt.Errorf("write audio sample: %w", err) } if err := p.sleepAudioFrame(frameDuration); err != nil { return err } } return nil } func (p *DirectPeer) sleepAudioFrame(d time.Duration) error { if d <= 0 { return nil } _, err := p.sleepUntil(time.Now().Add(d)) return err } func (p *DirectPeer) sleepUntil(deadline time.Time) (time.Duration, error) { if deadline.IsZero() { return 0, nil } delay := time.Until(deadline) if delay <= 0 { return -delay, nil } ctx := p.avPipelineCtx if ctx == nil { time.Sleep(delay) if slip := time.Since(deadline); slip > 0 { return slip, nil } return 0, nil } timer := time.NewTimer(delay) defer timer.Stop() select { case <-ctx.Done(): return 0, fmt.Errorf("media publish cancelled") case <-timer.C: if slip := time.Since(deadline); slip > 0 { return slip, nil } return 0, nil } } // Disconnect tears down the peer connection and releases resources. func (p *DirectPeer) Disconnect() error { p.mu.Lock() defer p.mu.Unlock() if p.pc != nil { err := p.pc.Close() p.pc = nil close(p.userAudioCh) return err } return nil }