package mediapeer import ( "bytes" "encoding/binary" "fmt" "os/exec" "time" "github.com/pion/webrtc/v4/pkg/media" "github.com/pion/webrtc/v4/pkg/media/ivfreader" opus "gopkg.in/hraban/opus.v2" ) // RawAVSegment is an unencoded video+audio segment ready for VP8 encoding. type RawAVSegment struct { TraceLabel string Epoch uint64 SegmentSeq int64 MediaStartMS int64 DurationMS int64 MarkerID int64 MarkerMediaMS int64 MarkerDurationMS int64 MarkerFrequencyHz int RGB []byte PCM []byte UserFinalAt time.Time SampleRate int Width int Height int FPS int NumFrames int QueuedAt time.Time // set by SendAVSegment for pipeline latency tracking // Fence is a pipeline drain marker. When set (non-nil) and RGB is empty, // the encoder passes it through without encoding. The publisher closes // the channel after finishing all preceding segments, signalling the // producer that the pipeline has been fully drained. Fence chan struct{} } // AVSegment is a pre-encoded video+audio segment ready for paced publishing. type AVSegment struct { TraceLabel string Epoch uint64 SegmentSeq int64 MediaStartMS int64 DurationMS int64 MarkerID int64 MarkerMediaMS int64 MarkerDurationMS int64 MarkerFrequencyHz int VP8Samples []media.Sample PCM []byte UserFinalAt time.Time SampleRate int Width int Height int FPS int NumFrames int QueuedAt time.Time // carried from RawAVSegment for end-to-end latency Fence chan struct{} } const defaultVP8BitrateKbps = 2000 // EncodeRGBChunkToVP8Samples encodes a contiguous RGB24 buffer to VP8 samples. func EncodeRGBChunkToVP8Samples(rgb []byte, width, height, numFrames, fps int) ([]media.Sample, error) { return EncodeRGBChunkToVP8SamplesWithBitrate(rgb, width, height, numFrames, fps, defaultVP8BitrateKbps) } // EncodeRGBChunkToVP8SamplesWithBitrate encodes a contiguous RGB24 buffer to VP8 samples. func EncodeRGBChunkToVP8SamplesWithBitrate(rgb []byte, width, height, numFrames, fps int, bitrateKbps int) ([]media.Sample, error) { if numFrames <= 0 || width <= 0 || height <= 0 { return nil, nil } if fps <= 0 { fps = 25 } if bitrateKbps <= 0 { bitrateKbps = defaultVP8BitrateKbps } want := width * height * 3 * numFrames if len(rgb) < want { return nil, fmt.Errorf("rgb buffer too small: got %d want >= %d", len(rgb), want) } rgb = rgb[:want] frameDur := time.Second / time.Duration(fps) if frameDur <= 0 { frameDur = time.Millisecond * 40 } // IVF VP8 via libvpx; single-process encode per chunk (latency vs simplicity tradeoff). bitrateArg := fmt.Sprintf("%dk", bitrateKbps) bufSizeArg := fmt.Sprintf("%dk", bitrateKbps*2) keyframeInterval := fmt.Sprintf("%d", numFrames) cmd := exec.Command( "ffmpeg", "-loglevel", "error", "-f", "rawvideo", "-pixel_format", "rgb24", "-video_size", fmt.Sprintf("%dx%d", width, height), "-framerate", fmt.Sprintf("%d", fps), "-i", "pipe:0", "-frames:v", fmt.Sprintf("%d", numFrames), "-c:v", "libvpx", "-deadline", "realtime", "-cpu-used", "8", "-row-mt", "1", "-b:v", bitrateArg, "-maxrate", bitrateArg, "-bufsize", bufSizeArg, "-g", keyframeInterval, // Each restarted chunk starts with a keyframe; keep later frames as inter frames. "-keyint_min", keyframeInterval, "-an", "-f", "ivf", "pipe:1", ) cmd.Stdin = bytes.NewReader(rgb) var ivfOut bytes.Buffer cmd.Stdout = &ivfOut var stderr bytes.Buffer cmd.Stderr = &stderr if err := cmd.Run(); err != nil { return nil, fmt.Errorf("ffmpeg vp8: %w: %s", err, stderr.String()) } reader, fh, err := ivfreader.NewWith(&ivfOut) if err != nil { return nil, fmt.Errorf("ivf header: %w", err) } n := int(fh.NumFrames) if n <= 0 { n = numFrames } var samples []media.Sample for i := 0; i < n; i++ { payload, _, err := reader.ParseNextFrame() if err != nil { break } if len(payload) == 0 { continue } frame := make([]byte, len(payload)) copy(frame, payload) samples = append(samples, media.Sample{ Data: frame, Duration: frameDur, }) } if len(samples) == 0 { return nil, fmt.Errorf("ffmpeg produced zero vp8 frames") } return samples, nil } // EncodePCMToOpusSamples encodes an entire 16-bit mono PCM buffer into // 20ms Opus frames returned as media.Samples. Encoding the full buffer // at once avoids the sample-loss caused by splitting PCM into small // slices and encoding each independently. func EncodePCMToOpusSamples(pcm []byte, sampleRate int) ([]media.Sample, error) { if len(pcm) == 0 || sampleRate <= 0 { return nil, nil } enc, err := opus.NewEncoder(sampleRate, 1, opus.AppVoIP) if err != nil { return nil, fmt.Errorf("create opus encoder: %w", err) } // 20ms frame parameters samplesPerFrame := sampleRate / 50 // 20ms = 1/50 second bytesPerFrame := samplesPerFrame * 2 // 16-bit mono if bytesPerFrame <= 0 { return nil, fmt.Errorf("invalid opus frame size for sample_rate=%d", sampleRate) } opusBuf := make([]byte, 4000) // max opus frame size // Pad trailing bytes (often silence from strict segment sizing) so RTP audio // duration matches the PCM buffer length and lip-synced video segments. if rem := len(pcm) % bytesPerFrame; rem != 0 { pcm = append(append([]byte(nil), pcm...), make([]byte, bytesPerFrame-rem)...) } estimatedFrames := len(pcm) / bytesPerFrame samples := make([]media.Sample, 0, estimatedFrames) for offset := 0; offset+bytesPerFrame <= len(pcm); offset += bytesPerFrame { pcmSamples := make([]int16, samplesPerFrame) for i := 0; i < samplesPerFrame; i++ { pcmSamples[i] = int16(binary.LittleEndian.Uint16(pcm[offset+i*2:])) } n, err := enc.Encode(pcmSamples, opusBuf) if err != nil { return nil, fmt.Errorf("opus encode: %w", err) } if n == 0 { continue } frame := make([]byte, n) copy(frame, opusBuf[:n]) samples = append(samples, media.Sample{ Data: frame, Duration: 20 * time.Millisecond, }) } return samples, nil }