package orchestrator import ( "bytes" "context" "encoding/base64" "encoding/binary" "encoding/json" "errors" "fmt" "image" "image/color" "image/draw" "image/png" "log" "math" "math/rand" "os" "path/filepath" "strconv" "strings" "sync" "time" "github.com/cyberverse/server/internal/agenttask" "github.com/cyberverse/server/internal/character" "github.com/cyberverse/server/internal/config" "github.com/cyberverse/server/internal/direct" "github.com/cyberverse/server/internal/inference" "github.com/cyberverse/server/internal/kanshan" "github.com/cyberverse/server/internal/livekit" "github.com/cyberverse/server/internal/mediapeer" "github.com/cyberverse/server/internal/pb" ragstore "github.com/cyberverse/server/internal/rag" "github.com/cyberverse/server/internal/recording" "github.com/cyberverse/server/internal/ws" "github.com/pion/interceptor/pkg/cc" "github.com/pion/webrtc/v4" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" ) // stdChunksPerSegment is how many avatar video chunks to batch before publishing // in the standard (ASR/text→LLM→TTS→Avatar) pipeline. Qwen TTS produces audio // before avatar video, so each avatar chunk is sent as its own paced AV segment // with matching PCM instead of publishing audio ahead of video. const stdChunksPerSegment = 1 // No hard cap on the assistant PCM buffer: long responses (>20s) were // previously truncated, causing the first N seconds of audio to be dropped // and all video segments to play with misaligned (or silent) audio. // Set to 0 to disable the overflow guard entirely. const voiceMaxPCMBufferSamples = 0 const avatarImageMaxUploadHint = "角色头像图片超过当前 10MB 上传限制,已使用默认头像;待机视频也不会生成。请压缩或缩放角色图片到 10MB 以内后重试。" const ( doubaoDialogContextMaxPairs = 20 doubaoDialogContextLoadLimit = doubaoDialogContextMaxPairs * 4 startupGreetingHistoryItems = 6 qwenOmniMaxVisualFrameBytes = 500 * 1024 ) const standardGlobalSystemPrompt = `你需要遵守以下通用回复规范,这些规范优先于角色设定。 回复长度: - 默认简洁,先直接回答用户真正问的事。 - 闲聊、确认、简单提问时,用 1-3 句话回答。 - 推荐、方案、分析、教程可以展开,但先给结论,再给细节。 - 用户明确要求详细、完整、深入时,才使用较长结构化回答。 表达风格: - 像真实的人聊天,直接、自然、有分寸。 - 不使用固定开场白,不写舞台动作或心理活动。 - 不堆叠 emoji、感叹号、波浪号或无信息量的亲密称呼。 - 避免 AI 味套话,例如“作为一个 AI”“希望这对你有帮助”“我将从以下几个方面”。 内容质量: - 不说空话,不绕圈子,不为了亲切而尬夸或过度共情。 - 不确定的事情要说明不确定,不要编造。 - 需要澄清时,只问一个最关键的问题。` var ( ErrVisualInputUnsupported = errors.New("visual input is only supported in standard sessions or qwen_omni voice sessions") ErrVisualInputDisabled = errors.New("visual input is disabled") ) type voiceAVSyncBuffer struct { mu sync.Mutex pcmBytes []byte sampleRate int totalAudioIn int64 totalAudioOut int64 maxBufferSamples int // Carries fractional samples from frames*sampleRate/fps to avoid // long-session drift caused by per-segment integer rounding. sampleCarryNumer int64 } func newVoiceAVSyncBuffer(maxBufferSamples int) *voiceAVSyncBuffer { if maxBufferSamples <= 0 { maxBufferSamples = voiceMaxPCMBufferSamples } return &voiceAVSyncBuffer{maxBufferSamples: maxBufferSamples} } func (b *voiceAVSyncBuffer) appendPCM(pcm []byte, sampleRate int) (droppedBytes int) { b.mu.Lock() defer b.mu.Unlock() if len(pcm) == 0 || sampleRate <= 0 { return 0 } if b.sampleRate == 0 { b.sampleRate = sampleRate } if b.sampleRate != sampleRate { b.sampleRate = sampleRate b.pcmBytes = nil } if len(pcm)%2 != 0 { pcm = pcm[:len(pcm)-1] } if len(pcm) == 0 { return 0 } b.pcmBytes = append(b.pcmBytes, pcm...) b.totalAudioIn += int64(len(pcm) / 2) maxBytes := b.maxBufferSamples * 2 if maxBytes > 0 && len(b.pcmBytes) > maxBytes { droppedBytes = len(b.pcmBytes) - maxBytes if droppedBytes%2 != 0 { droppedBytes++ } b.pcmBytes = b.pcmBytes[droppedBytes:] } return droppedBytes } func desiredSamplesForVideo(frames, fps, sampleRate int) int { if frames <= 0 || fps <= 0 || sampleRate <= 0 { return 0 } // Rounded target samples for segment duration = frames / fps seconds. return (frames*sampleRate + fps/2) / fps } func durationMSForVideo(frames, fps int) int64 { if frames <= 0 || fps <= 0 { return 0 } return int64(math.Round(float64(frames) * 1000 / float64(fps))) } func (b *voiceAVSyncBuffer) takeSegmentPCM(frames, fps int, isFinal bool) ([]byte, int, int, int) { b.mu.Lock() defer b.mu.Unlock() if frames <= 0 || fps <= 0 || b.sampleRate <= 0 { return nil, 0, 0, 0 } // Exact target with carry: // want = floor((frames*sampleRate + carry)/fps), carry = modulo part. numer := int64(frames*b.sampleRate) + b.sampleCarryNumer wantSamples := int(numer / int64(fps)) b.sampleCarryNumer = numer % int64(fps) if wantSamples <= 0 { wantSamples = desiredSamplesForVideo(frames, fps, b.sampleRate) } if wantSamples <= 0 { return nil, 0, 0, len(b.pcmBytes) / 2 } wantBytes := wantSamples * 2 availableBytes := len(b.pcmBytes) takeBytes := wantBytes if takeBytes > availableBytes { takeBytes = availableBytes } if takeBytes%2 != 0 { takeBytes-- } out := make([]byte, wantBytes) // strict lip-sync: always return exact segment duration if takeBytes > 0 { copy(out, b.pcmBytes[:takeBytes]) b.pcmBytes = b.pcmBytes[takeBytes:] } outSamples := takeBytes / 2 b.totalAudioOut += int64(outSamples) if isFinal { // Final close-loop: strict mode prefers exact A/V alignment over // carrying remaining tail audio into post-video silence. b.pcmBytes = nil } return out, outSamples, wantSamples, len(b.pcmBytes) / 2 } func (b *voiceAVSyncBuffer) snapshot() (bufferedSamples int, totalIn int64, totalOut int64, sampleRate int) { b.mu.Lock() defer b.mu.Unlock() return len(b.pcmBytes) / 2, b.totalAudioIn, b.totalAudioOut, b.sampleRate } type voicePipelineTurn struct { seq uint64 key string questionID string replyID string assistantText string recTurnID string recAudioBuf []byte recAudioSR int historySaved bool conversationSaved bool transcriptSaved bool rawAudioSaved bool sessionDir string turnStart time.Time userFinalAt time.Time firstAudioAt time.Time audioFinalAt time.Time avatarWorkerAt time.Time firstAvatarAudioAt time.Time avatarInputClosedAt time.Time firstVideoAt time.Time syncBuf *voiceAVSyncBuffer avatarStarted bool audioOnlyStarted bool avatarInputClosed bool avatarAudioCh chan *pb.AudioChunk avatarCtx context.Context avatarCancel context.CancelFunc doneCh chan voicePipelineTurnResult aborted bool } type voicePipelineTurnResult struct { turn *voicePipelineTurn err error } func voiceOutputTurnKey(output *pb.VoiceLLMOutput) string { if output == nil { return "" } if replyID := strings.TrimSpace(output.GetReplyId()); replyID != "" { return "reply:" + replyID } if questionID := strings.TrimSpace(output.GetQuestionId()); questionID != "" { return "question:" + questionID } return "" } func voiceOutputHasAssistantContent(output *pb.VoiceLLMOutput) bool { if output == nil { return false } if output.GetTranscript() != "" { return true } audio := output.GetAudio() return audio != nil && len(audio.GetData()) > 0 } func voiceOutputIsFinal(output *pb.VoiceLLMOutput) bool { if output == nil { return false } if output.GetIsFinal() { return true } audio := output.GetAudio() return audio != nil && audio.GetIsFinal() } type dialogContextMessage struct { sessionID string role string text string timestamp time.Time } func stringValue(v any) string { if s, ok := v.(string); ok { return strings.TrimSpace(s) } return "" } func mapValue(v any) map[string]any { if m, ok := v.(map[string]any); ok { return m } return nil } func intValue(v any, fallback int) int { switch n := v.(type) { case int: return n case int64: return int(n) case float64: return int(n) case json.Number: if parsed, err := n.Int64(); err == nil { return int(parsed) } case string: if parsed, err := strconv.Atoi(strings.TrimSpace(n)); err == nil { return parsed } } return fallback } func taskStatusValue(v any, fallback agenttask.Status) agenttask.Status { status := agenttask.Status(stringValue(v)) switch status { case agenttask.StatusQueued, agenttask.StatusRunning, agenttask.StatusWaitingUser, agenttask.StatusCompleted, agenttask.StatusFailed, agenttask.StatusCancelled: return status default: return fallback } } func unixTimeFromNumber(n int64) time.Time { if n <= 0 { return time.Time{} } if n > 1_000_000_000_000 { return time.UnixMilli(n).UTC() } return time.Unix(n, 0).UTC() } func parseConversationTimestamp(v any) time.Time { switch t := v.(type) { case string: t = strings.TrimSpace(t) if t == "" { return time.Time{} } if parsed, err := time.Parse(time.RFC3339Nano, t); err == nil { return parsed.UTC() } if parsed, err := time.Parse(time.RFC3339, t); err == nil { return parsed.UTC() } if n, err := strconv.ParseInt(t, 10, 64); err == nil { return unixTimeFromNumber(n) } case float64: return unixTimeFromNumber(int64(t)) case int: return unixTimeFromNumber(int64(t)) case int64: return unixTimeFromNumber(t) case json.Number: if n, err := t.Int64(); err == nil { return unixTimeFromNumber(n) } } return time.Time{} } func buildDoubaoDialogContext(messages []map[string]any, maxPairs int, now time.Time) []DialogContextItem { if maxPairs <= 0 { maxPairs = doubaoDialogContextMaxPairs } if now.IsZero() { now = time.Now().UTC() } else { now = now.UTC() } filtered := make([]dialogContextMessage, 0, len(messages)) for _, msg := range messages { role := strings.ToLower(stringValue(msg["role"])) if role != "user" && role != "assistant" { continue } text := stringValue(msg["content"]) if text == "" { text = stringValue(msg["text"]) } if text == "" { continue } sessionID := stringValue(msg["session_id"]) if sessionID == "" { continue } filtered = append(filtered, dialogContextMessage{ sessionID: sessionID, role: role, text: text, timestamp: parseConversationTimestamp(msg["timestamp"]), }) } paired := make([]dialogContextMessage, 0, len(filtered)) var pendingUsers []dialogContextMessage pendingSessionID := "" for _, msg := range filtered { if len(pendingUsers) > 0 && msg.sessionID != pendingSessionID { pendingUsers = nil pendingSessionID = "" } if msg.role == "user" { if len(pendingUsers) == 0 { pendingSessionID = msg.sessionID } pendingUsers = append(pendingUsers, msg) continue } if len(pendingUsers) == 0 || msg.sessionID != pendingSessionID { continue } var merged strings.Builder for i, userMsg := range pendingUsers { if i > 0 { merged.WriteString("\n") } merged.WriteString(userMsg.text) } paired = append(paired, dialogContextMessage{ sessionID: pendingSessionID, role: "user", text: merged.String(), timestamp: pendingUsers[0].timestamp, }, msg) pendingUsers = nil pendingSessionID = "" } maxItems := maxPairs * 2 if len(paired) > maxItems { paired = paired[len(paired)-maxItems:] } if len(paired) == 0 { return nil } items := make([]DialogContextItem, len(paired)) for i, msg := range paired { items[i] = DialogContextItem{ Role: msg.role, Text: msg.text, Timestamp: msg.timestamp.UnixMilli(), } } return normalizeDialogContextTimestamps(items, now) } func normalizeDialogContextTimestamps(items []DialogContextItem, now time.Time) []DialogContextItem { if len(items) == 0 { return nil } if now.IsZero() { now = time.Now().UTC() } else { now = now.UTC() } fallbackStart := now.Add(-time.Duration(len(items)) * time.Millisecond) for i := range items { if items[i].Timestamp <= 0 || time.UnixMilli(items[i].Timestamp).After(now) { items[i].Timestamp = fallbackStart.Add(time.Duration(i) * time.Millisecond).UnixMilli() } } var last int64 for i := range items { if items[i].Timestamp <= last { items[i].Timestamp = last + 1 } last = items[i].Timestamp } if nowMS := now.UnixMilli(); last > nowMS { delta := last - nowMS for i := range items { items[i].Timestamp -= delta } } return items } func buildVoiceDialogContextFromSession(session *Session, excludeTurnSeq uint64, maxPairs int, now time.Time) []DialogContextItem { if session == nil { return nil } if maxPairs <= 0 { maxPairs = doubaoDialogContextMaxPairs } items := session.DialogContextSnapshot() for _, msg := range session.HistorySnapshot() { if excludeTurnSeq > 0 && msg.TurnSeq == excludeTurnSeq { continue } role := strings.ToLower(strings.TrimSpace(msg.Role)) if role != "user" && role != "assistant" { continue } text := strings.TrimSpace(msg.Content) if text == "" { continue } timestamp := int64(0) if !msg.Timestamp.IsZero() { timestamp = msg.Timestamp.UTC().UnixMilli() } items = append(items, DialogContextItem{ Role: role, Text: text, Timestamp: timestamp, }) } maxItems := maxPairs * 2 if len(items) > maxItems { items = items[len(items)-maxItems:] } return normalizeDialogContextTimestamps(items, now) } func startupGreetingHistory(items []DialogContextItem, maxItems int) []DialogContextItem { if maxItems <= 0 { maxItems = startupGreetingHistoryItems } if len(items) > maxItems { items = items[len(items)-maxItems:] } return append([]DialogContextItem(nil), items...) } func safeTraceValue(value string) string { value = strings.TrimSpace(value) if value == "" { return "-" } replacer := strings.NewReplacer(" ", "_", "\t", "_", "\n", "_", "\r", "_") return replacer.Replace(value) } func voiceTraceLabel(sessionID string, turnSeq uint64, replyID, questionID string, segSeq int64) string { parts := []string{ "sid=" + safeTraceValue(sessionID), "turn=" + strconv.FormatUint(turnSeq, 10), "reply=" + safeTraceValue(replyID), } if questionID != "" { parts = append(parts, "qid="+safeTraceValue(questionID)) } if segSeq > 0 { parts = append(parts, "seg="+strconv.FormatInt(segSeq, 10)) } return strings.Join(parts, " ") } func characterSystemPrompt(char *character.Character, includeName bool, includeSpeakingStyle bool) string { if char == nil { return "" } var parts []string appendField := func(label, value string) { value = strings.TrimSpace(value) if value != "" { parts = append(parts, label+":"+value) } } appendField("角色提示", char.SystemPrompt) if includeName { appendField("角色名称", char.Name) } appendField("角色描述", char.Description) appendField("角色性格", char.Personality) if includeSpeakingStyle { appendField("说话风格", char.SpeakingStyle) } return strings.Join(parts, "\n") } func composeSystemPrompt(globalPrompt string, rolePrompt string) string { globalPrompt = strings.TrimSpace(globalPrompt) rolePrompt = strings.TrimSpace(rolePrompt) switch { case globalPrompt == "" && rolePrompt == "": return "" case globalPrompt == "": return "【角色设定】\n" + rolePrompt case rolePrompt == "": return "【全局输出规范】\n" + globalPrompt default: return "【全局输出规范】\n" + globalPrompt + "\n\n【角色设定】\n" + rolePrompt } } func formatSinceUserFinal(start time.Time) string { if start.IsZero() { return "-" } return strconv.FormatInt(time.Since(start).Milliseconds(), 10) } func logVoiceTrace(event, sessionID string, turnSeq uint64, replyID, questionID string, since time.Time, fields ...string) { parts := []string{ fmt.Sprintf("voice_trace event=%-30s", event), "sid=" + safeTraceValue(sessionID), "turn=" + strconv.FormatUint(turnSeq, 10), "reply=" + safeTraceValue(replyID), } parts = append(parts, "qid="+safeTraceValue(questionID)) parts = append(parts, "since_user_final_ms="+formatSinceUserFinal(since)) parts = append(parts, fields...) log.Print(strings.Join(parts, " ")) } // Orchestrator manages the inference pipeline for each session, // coordinating between the gRPC inference client, media peers, // and WebSocket hub for real-time updates. type Orchestrator struct { inference inference.InferenceService wsHub *ws.Hub sessionMgr *SessionManager charStore *character.Store peers map[string]mediapeer.MediaPeer // sessionID → media peer (Bot or DirectPeer) directPeers map[string]*direct.DirectPeer // sessionID → DirectPeer (for signaling dispatch) recorder *recording.VideoRecorder streamingMode string pipelineCfg config.PipelineConfig turnServer *direct.TURNServer webrtcAPI *webrtc.API estimatorCh <-chan cc.BandwidthEstimator taskService *agenttask.Service avatarMu sync.Mutex mu sync.RWMutex } // New creates a new Orchestrator. func New(inferenceClient inference.InferenceService, hub *ws.Hub, sessionMgr *SessionManager, recorder *recording.VideoRecorder, charStore *character.Store, pipelineCfg ...config.PipelineConfig) *Orchestrator { o := &Orchestrator{ inference: inferenceClient, wsHub: hub, sessionMgr: sessionMgr, charStore: charStore, peers: make(map[string]mediapeer.MediaPeer), directPeers: make(map[string]*direct.DirectPeer), recorder: recorder, } if len(pipelineCfg) > 0 { o.pipelineCfg = pipelineCfg[0] o.streamingMode = pipelineCfg[0].StreamingMode } if o.streamingMode == "" { o.streamingMode = "direct" } return o } // HandleSignaling dispatches WebRTC signaling messages to the DirectPeer. func (o *Orchestrator) HandleSignaling(sessionID string, msg ws.WSMessage) { o.mu.RLock() dp := o.directPeers[sessionID] o.mu.RUnlock() if dp == nil { return } switch msg.Type { case "av_calibration_config": dp.SetAVCalibrationEnabled(msg.Enabled) case "av_sync_feedback": dp.HandleAVSyncFeedback(msg.TurnSeq, msg.ExcessVideoLagMS, msg.JitterBufferDeltaMS, msg.Likely) case "webrtc_ready": o.sendDirectWebRTCConfig(sessionID) if err := dp.StartNegotiation(); err != nil { log.Printf("[Orchestrator] session=%s StartNegotiation failed: %v", sessionID, err) } case "direct_media_reset_request": o.sendDirectWebRTCConfig(sessionID) if err := dp.ResetMediaPath(context.Background()); err != nil { log.Printf("[Orchestrator] session=%s ResetMediaPath failed: %v", sessionID, err) } case "webrtc_answer", "ice_candidate": var sdpMid *string if msg.SDPMid != "" { sdpMid = &msg.SDPMid } dp.HandleSignaling(msg.Type, msg.SDP, msg.Candidate, sdpMid, msg.SDPMLine) } } func (o *Orchestrator) sendDirectWebRTCConfig(sessionID string) { // Send TURN ICE server config before the SDP offer. if o.turnServer == nil { return } host := o.pipelineCfg.ICEPublicIP if host == "" { host = "127.0.0.1" } o.broadcastJSON(sessionID, map[string]any{ "type": "webrtc_config", "ice_servers": []any{o.turnServer.ICEServerConfig(host)}, }) } // SetTURNServer sets the embedded TURN server for NAT traversal. func (o *Orchestrator) SetTURNServer(ts *direct.TURNServer) { o.turnServer = ts } // SetWebRTCAPI sets the shared webrtc.API with interceptors (NACK, TWCC, GCC). func (o *Orchestrator) SetWebRTCAPI(api *webrtc.API, estimatorCh <-chan cc.BandwidthEstimator) { o.webrtcAPI = api o.estimatorCh = estimatorCh } func (o *Orchestrator) SetTaskService(taskService *agenttask.Service) { o.taskService = taskService } // StreamingMode returns the current streaming mode. func (o *Orchestrator) StreamingMode() string { return o.streamingMode } func (o *Orchestrator) AvatarEnabled() bool { if o == nil || o.pipelineCfg.AvatarEnabled == nil { return true } return *o.pipelineCfg.AvatarEnabled } func (o *Orchestrator) HealthCheck(ctx context.Context) error { if o == nil || o.inference == nil { return errors.New("inference service is not configured") } return o.inference.HealthCheck(ctx) } func (o *Orchestrator) ragService() (inference.RAGService, bool) { if o == nil || o.inference == nil { return nil, false } svc, ok := o.inference.(inference.RAGService) return svc, ok } func (o *Orchestrator) ragConfig() config.RAGConfig { if o == nil { return config.RAGConfig{} } cfg := o.pipelineCfg.RAG if cfg.TopK == 0 { cfg.TopK = 5 } if cfg.MaxContextChars == 0 { cfg.MaxContextChars = 4500 } if cfg.MinScore == 0 { cfg.MinScore = 0.25 } return cfg } func (o *Orchestrator) IndexKnowledgeSource(ctx context.Context, characterID, characterDir string, source *ragstore.Source, sourcePath string) (int, error) { svc, ok := o.ragService() if !ok { return 0, errors.New("RAG service is not configured") } if source == nil { return 0, errors.New("knowledge source is nil") } return svc.IndexRAGSource(ctx, inference.RAGIndexSourceRequest{ CharacterID: characterID, CharacterDir: characterDir, SourceID: source.ID, Title: source.Title, Filename: source.Filename, MimeType: source.MimeType, SourcePath: sourcePath, }) } func (o *Orchestrator) DeleteKnowledgeSource(ctx context.Context, characterID, characterDir, sourceID string) error { svc, ok := o.ragService() if !ok { return errors.New("RAG service is not configured") } return svc.DeleteRAGSource(ctx, characterID, characterDir, sourceID) } func (o *Orchestrator) searchKnowledge(ctx context.Context, characterID, query string) ([]inference.RAGSearchResult, error) { cfg := o.ragConfig() if !cfg.IsEnabled() || strings.TrimSpace(characterID) == "" || strings.TrimSpace(query) == "" { return nil, nil } svc, ok := o.ragService() if !ok || o.charStore == nil { return nil, nil } charDir := o.charStore.CharDir(characterID) if charDir == "" { return nil, nil } return svc.SearchRAG(ctx, inference.RAGSearchRequest{ CharacterID: characterID, CharacterDir: charDir, Query: query, TopK: cfg.TopK, MaxContextChars: cfg.MaxContextChars, MinScore: cfg.MinScore, }) } func (o *Orchestrator) SearchKnowledge(ctx context.Context, characterID, query string) ([]inference.RAGSearchResult, error) { return o.searchKnowledge(ctx, characterID, query) } func normalizedVisualInputConfig(cfg config.VisualInputConfig) config.VisualInputConfig { if cfg.FrameIntervalMS == 0 { cfg.FrameIntervalMS = 1000 } if cfg.MaxWidth == 0 { cfg.MaxWidth = 1280 } if cfg.MaxHeight == 0 { cfg.MaxHeight = 720 } if cfg.MaxFrameBytes == 0 { cfg.MaxFrameBytes = 512 * 1024 } if cfg.MaxRecentFrames == 0 { cfg.MaxRecentFrames = 2 } if cfg.FrameTTLMS == 0 { cfg.FrameTTLMS = 10000 } return cfg } func (o *Orchestrator) visualInputConfig() config.VisualInputConfig { if o == nil { return normalizedVisualInputConfig(config.VisualInputConfig{}) } return normalizedVisualInputConfig(o.pipelineCfg.VisualInput) } func qwenOmniVisualInputConfig(cfg config.VisualInputConfig) config.VisualInputConfig { cfg = normalizedVisualInputConfig(cfg) if cfg.FrameIntervalMS < 1000 { cfg.FrameIntervalMS = 1000 } if cfg.MaxFrameBytes <= 0 || cfg.MaxFrameBytes > qwenOmniMaxVisualFrameBytes { cfg.MaxFrameBytes = qwenOmniMaxVisualFrameBytes } return cfg } func (o *Orchestrator) voiceLLMProviderForSession(session *Session) string { if o.personaAgentEnabled(session) { return "persona" } return o.characterVoiceLLMProviderForSession(session) } func (o *Orchestrator) characterVoiceLLMProviderForSession(session *Session) string { if session == nil || session.Mode != ModeOmni { return "" } if session.CharacterID == "" || o == nil || o.charStore == nil { return voiceLLMProviderOrDefault("") } char, err := o.charStore.Get(session.CharacterID) if err != nil { log.Printf("voiceLLMProviderForSession: could not fetch character %s: %v", session.CharacterID, err) return voiceLLMProviderOrDefault("") } return voiceLLMProviderOrDefault(char.VoiceProvider) } func (o *Orchestrator) personaAgentEnabled(session *Session) bool { return o != nil && session != nil && session.Mode == ModeOmni } func (o *Orchestrator) sessionSupportsVisualInput(session *Session) bool { if session == nil { return false } if session.Mode == ModeStandard { return true } return session.Mode == ModeOmni && o.characterVoiceLLMProviderForSession(session) == "qwen_omni" } func (o *Orchestrator) VisualInputConfigForSession(session *Session) (config.VisualInputConfig, bool) { if !o.sessionSupportsVisualInput(session) { return config.VisualInputConfig{}, false } cfg := o.visualInputConfig() if session.Mode == ModeOmni { cfg = qwenOmniVisualInputConfig(cfg) } return cfg, true } func (o *Orchestrator) globalSystemPrompt() string { return strings.TrimSpace(standardGlobalSystemPrompt) } func validateVisualSource(source string) error { switch source { case "camera", "screen": return nil default: return fmt.Errorf("invalid visual source") } } func (o *Orchestrator) visualSession(sessionID string) (*Session, config.VisualInputConfig, error) { session, err := o.sessionMgr.Get(sessionID) if err != nil { return nil, config.VisualInputConfig{}, err } cfg, supported := o.VisualInputConfigForSession(session) if !supported { return nil, config.VisualInputConfig{}, ErrVisualInputUnsupported } if !cfg.IsEnabled() { return nil, cfg, ErrVisualInputDisabled } return session, cfg, nil } func (o *Orchestrator) HandleVisualInputStart(sessionID string, source string) error { if err := validateVisualSource(source); err != nil { return err } session, _, err := o.visualSession(sessionID) if err != nil { return err } session.StartVisualInput(source) return nil } func (o *Orchestrator) HandleVisualInputStop(sessionID string, source string) error { if source != "" { if err := validateVisualSource(source); err != nil { return err } } session, _, err := o.visualSession(sessionID) if err != nil { return err } session.StopVisualInput(source) return nil } func (o *Orchestrator) HandleVisualFrame(sessionID string, msg ws.WSMessage) error { if err := validateVisualSource(msg.Source); err != nil { return err } session, cfg, err := o.visualSession(sessionID) if err != nil { return err } if msg.Mime != "image/jpeg" { return fmt.Errorf("invalid visual frame mime") } if msg.Width <= 0 || msg.Height <= 0 || int(msg.Width) > cfg.MaxWidth || int(msg.Height) > cfg.MaxHeight { return fmt.Errorf("invalid visual frame dimensions") } encoded := strings.TrimSpace(msg.Data) if encoded == "" { return fmt.Errorf("visual frame data is required") } decoded, err := base64.StdEncoding.DecodeString(encoded) if err != nil { return fmt.Errorf("invalid visual frame data") } if len(decoded) == 0 || len(decoded) > cfg.MaxFrameBytes { return fmt.Errorf("visual frame exceeds size limit") } if len(decoded) < 3 || decoded[0] != 0xff || decoded[1] != 0xd8 || decoded[2] != 0xff { return fmt.Errorf("invalid visual frame jpeg data") } now := time.Now() frame := VisualFrame{ Data: decoded, MimeType: msg.Mime, Width: msg.Width, Height: msg.Height, Source: msg.Source, TimestampMS: msg.TimestampMS, FrameSeq: msg.FrameSeq, } minInterval := time.Duration(cfg.FrameIntervalMS) * time.Millisecond session.StoreVisualFrame(frame, cfg.MaxRecentFrames, minInterval, now) return nil } func (o *Orchestrator) CheckVoice(ctx context.Context, provider string, voiceType string) (string, error) { if o == nil || o.inference == nil { return "", errors.New("inference service is not configured") } return o.inference.CheckVoice(ctx, inference.VoiceLLMSessionConfig{ Provider: voiceLLMProviderOrDefault(provider), Voice: voiceType, }) } func (o *Orchestrator) AvatarInfo(ctx context.Context) (*pb.AvatarInfo, error) { if o == nil || o.inference == nil { return nil, errors.New("inference service is not configured") } return o.inference.AvatarInfo(ctx) } func (o *Orchestrator) idleVideoProfile() string { return character.DefaultIdleVideoProfile } func (o *Orchestrator) idleVideoOutputSize(ctx context.Context) (int, int, error) { info, err := o.AvatarInfo(ctx) if err != nil { return 0, 0, fmt.Errorf("get avatar info for idle video: %w", err) } width := int(info.GetOutputWidth()) height := int(info.GetOutputHeight()) if width <= 0 || height <= 0 { return 0, 0, fmt.Errorf("invalid idle video output size: %dx%d", width, height) } return width, height, nil } func (o *Orchestrator) activeCharacterImage(characterID string) (*character.Character, string, error) { if o == nil || o.charStore == nil { return nil, "", errors.New("character store is not configured") } char, err := o.charStore.Get(characterID) if err != nil { return nil, "", err } if char.ActiveImage == "" { return char, "", nil } return char, char.ActiveImage, nil } func normalizeImageFormat(imageFilename string) string { ext := strings.TrimPrefix(strings.ToLower(filepath.Ext(imageFilename)), ".") if ext == "" { return "png" } if ext == "jpg" { return "jpeg" } return ext } func buildDefaultAvatarPNG(width, height int) ([]byte, error) { if width <= 0 { width = 512 } if height <= 0 { height = 512 } img := image.NewRGBA(image.Rect(0, 0, width, height)) draw.Draw(img, img.Bounds(), &image.Uniform{C: color.RGBA{128, 128, 128, 255}}, image.Point{}, draw.Src) var buf bytes.Buffer if err := png.Encode(&buf, img); err != nil { return nil, err } return buf.Bytes(), nil } // AvatarSetupWarning converts avatar setup failures into a concise message // suitable for browser console diagnostics. func AvatarSetupWarning(err error) string { if err == nil { return "" } if warning, ok := AvatarImageTooLargeWarning(err); ok { return warning } return fmt.Sprintf("角色头像设置失败,已使用默认头像:%v", err) } // AvatarImageTooLargeWarning reports whether a gRPC message-size failure was // caused by an oversized avatar image. func AvatarImageTooLargeWarning(err error) (string, bool) { if err == nil { return "", false } msg := strings.ToLower(err.Error()) if status.Code(err) == codes.ResourceExhausted && strings.Contains(msg, "message larger than max") { return avatarImageMaxUploadHint, true } if strings.Contains(msg, "trying to send message larger than max") { return avatarImageMaxUploadHint, true } return "", false } func (o *Orchestrator) setDefaultAvatarLocked(ctx context.Context, sessionID string) error { if o == nil || o.inference == nil { return errors.New("inference service is not configured") } width, height := 512, 512 if info, err := o.inference.AvatarInfo(ctx); err == nil && info != nil { if int(info.GetOutputWidth()) > 0 { width = int(info.GetOutputWidth()) } if int(info.GetOutputHeight()) > 0 { height = int(info.GetOutputHeight()) } } imageData, err := buildDefaultAvatarPNG(width, height) if err != nil { return fmt.Errorf("build default avatar image: %w", err) } if err := o.inference.SetAvatar(ctx, sessionID, imageData, "png"); err != nil { return fmt.Errorf("set default avatar: %w", err) } return nil } func (o *Orchestrator) loadCharacterImage(characterID, imageFilename string) ([]byte, string, error) { if o == nil || o.charStore == nil { return nil, "", errors.New("character store is not configured") } if imageFilename == "" { return nil, "", errors.New("active image is empty") } imgDir := o.charStore.ImagesDir(characterID) if imgDir == "" { return nil, "", fmt.Errorf("character images dir not found: %s", characterID) } path := filepath.Join(imgDir, filepath.Base(imageFilename)) data, err := os.ReadFile(path) if err != nil { return nil, "", fmt.Errorf("read character image %s: %w", path, err) } return data, normalizeImageFormat(imageFilename), nil } // buildTrailingSilence creates a 1.5-second silent PCM chunk (s16le mono) // appended after TTS audio so the avatar can close its mouth before the idle switch. func buildTrailingSilence(sampleRate int) *pb.AudioChunk { if sampleRate <= 0 { sampleRate = 16000 } numSamples := sampleRate * 3 / 2 // 1.5 seconds return &pb.AudioChunk{ Data: make([]byte, numSamples*2), // s16le: 2 bytes per sample SampleRate: int32(sampleRate), Channels: 1, Format: "pcm_s16le", IsFinal: true, } } func buildIdleBreathingPCM(duration time.Duration, sampleRate int) []byte { if sampleRate <= 0 { sampleRate = 16000 } totalSamples := int(math.Round(duration.Seconds() * float64(sampleRate))) if totalSamples <= 0 { return nil } out := make([]byte, totalSamples*2) rng := rand.New(rand.NewSource(42)) fadeSamples := int(0.25 * float64(sampleRate)) for i := 0; i < totalSamples; i++ { t := float64(i) / float64(sampleRate) cyclePos := math.Mod(t, 3.8) var env float64 switch { case cyclePos < 1.1: p := cyclePos / 1.1 env = 0.010 + 0.020*math.Sin(p*math.Pi/2) case cyclePos < 1.5: env = 0.028 case cyclePos < 3.0: p := (cyclePos - 1.5) / 1.5 env = 0.030 + 0.020*math.Cos(p*math.Pi/2) default: env = 0.006 } texture := 0.55*math.Sin(2*math.Pi*170*t) + 0.25*math.Sin(2*math.Pi*310*t+0.7) + 0.20*(rng.Float64()*2-1) motion := 0.92 + 0.08*math.Sin(2*math.Pi*0.21*t+0.4) sample := env * texture * motion if fadeSamples > 0 { if i < fadeSamples { sample *= float64(i) / float64(fadeSamples) } else if remain := totalSamples - i; remain < fadeSamples { sample *= float64(remain) / float64(fadeSamples) } } if sample > 0.95 { sample = 0.95 } if sample < -0.95 { sample = -0.95 } pcm := int16(sample * 32767) binary.LittleEndian.PutUint16(out[i*2:], uint16(pcm)) } return out } func fitPCMToVideoDuration(pcm []byte, sampleRate, frames, fps int) []byte { if len(pcm) == 0 || sampleRate <= 0 || frames <= 0 || fps <= 0 { return pcm } wantSamples := desiredSamplesForVideo(frames, fps, sampleRate) if wantSamples <= 0 { return pcm } wantBytes := wantSamples * 2 if len(pcm) == wantBytes { return pcm } if len(pcm) > wantBytes { return pcm[:wantBytes] } out := make([]byte, wantBytes) copy(out, pcm) return out } func audioChunkToPCM16(chunk *pb.AudioChunk) ([]byte, int) { if chunk == nil || len(chunk.GetData()) == 0 { return nil, 0 } sampleRate := int(chunk.GetSampleRate()) format := strings.ToLower(strings.TrimSpace(chunk.GetFormat())) data := chunk.GetData() switch format { case "float32", "f32", "pcm_f32le": n := len(data) / 4 if n <= 0 { return nil, sampleRate } out := make([]byte, n*2) for i := 0; i < n; i++ { v := math.Float32frombits(binary.LittleEndian.Uint32(data[i*4:])) if math.IsNaN(float64(v)) || math.IsInf(float64(v), 0) { v = 0 } if v > 1 { v = 1 } else if v < -1 { v = -1 } var sample int16 if v >= 0 { sample = int16(v * 32767) } else { sample = int16(v * 32768) } binary.LittleEndian.PutUint16(out[i*2:], uint16(sample)) } return out, sampleRate default: if len(data)%2 != 0 { data = data[:len(data)-1] } if len(data) == 0 { return nil, sampleRate } out := make([]byte, len(data)) copy(out, data) return out, sampleRate } } func (o *Orchestrator) setAvatarFromCharacterImage(ctx context.Context, sessionID, characterID, imageFilename string) error { if o == nil || o.inference == nil { return errors.New("inference service is not configured") } imageData, format, err := o.loadCharacterImage(characterID, imageFilename) if err != nil { return err } o.avatarMu.Lock() defer o.avatarMu.Unlock() if err := o.inference.SetAvatar(ctx, sessionID, imageData, format); err != nil { if resetErr := o.setDefaultAvatarLocked(ctx, sessionID); resetErr != nil { return fmt.Errorf("set avatar from image %q (%d bytes): %w; default avatar reset failed: %v", imageFilename, len(imageData), err, resetErr) } log.Printf("SetAvatar failed for image %q (%d bytes); reset inference avatar to default placeholder", imageFilename, len(imageData)) return fmt.Errorf("set avatar from image %q (%d bytes): %w", imageFilename, len(imageData), err) } return nil } // EnsureIdleVideo generates and caches the idle MP4 for the active image if missing. func (o *Orchestrator) EnsureIdleVideo(ctx context.Context, characterID string) (string, error) { if o == nil || o.charStore == nil { return "", errors.New("character store is not configured") } if o.inference == nil { return "", errors.New("inference service is not configured") } if !o.AvatarEnabled() { return "", errors.New("avatar inference is disabled") } _, imageFilename, err := o.activeCharacterImage(characterID) if err != nil || imageFilename == "" { return "", err } profile := o.idleVideoProfile() targetWidth, targetHeight, err := o.idleVideoOutputSize(ctx) if err != nil { return "", err } sizeDir := o.charStore.IdleVideosForSizeDir(characterID, imageFilename, targetWidth, targetHeight) if sizeDir == "" { return "", fmt.Errorf("idle video dir unavailable for character %s", characterID) } files, err := o.charStore.ListIdleVideos(characterID, imageFilename, targetWidth, targetHeight) if err == nil && len(files) > 0 { return filepath.Join(sizeDir, files[0]), nil } outPath := o.charStore.IdleVideoPath(characterID, imageFilename, profile, targetWidth, targetHeight) if outPath == "" { return "", fmt.Errorf("idle video path unavailable for character %s", characterID) } imageData, format, err := o.loadCharacterImage(characterID, imageFilename) if err != nil { return "", err } const ( idleDuration = 10 * time.Second idleSampleRate = 16000 idleCRF = 23 ) pcm := buildIdleBreathingPCM(idleDuration, idleSampleRate) audioChunk := &pb.AudioChunk{ Data: pcm, SampleRate: idleSampleRate, Channels: 1, Format: "pcm_s16le", IsFinal: true, } // Hold the mutex for the entire generation cycle (SetAvatar + GenerateAvatar // + frame collection) so that a concurrent SetupSession or another // EnsureIdleVideo call cannot change the inference server's avatar state // while we are still collecting frames. o.avatarMu.Lock() defer o.avatarMu.Unlock() jobID := fmt.Sprintf("idle-%s-%d", characterID, time.Now().UnixNano()) if err := o.inference.SetAvatar(ctx, jobID, imageData, format); err != nil { if resetErr := o.setDefaultAvatarLocked(ctx, jobID); resetErr != nil { log.Printf("EnsureIdleVideo: failed to reset default avatar after SetAvatar failure for character %s image=%s: %v", characterID, imageFilename, resetErr) } return "", fmt.Errorf("set avatar for idle video from image %q (%d bytes): %w", imageFilename, len(imageData), err) } videoCh, errCh := o.inference.GenerateAvatar(ctx, []*pb.AudioChunk{audioChunk}) rgbChunks := make([][]byte, 0, 8) width, height, fps, totalFrames := 0, 0, 25, 0 loop: for { select { case chunk, ok := <-videoCh: if !ok { break loop } if chunk == nil || len(chunk.Data) == 0 { continue } if width == 0 { width = int(chunk.Width) height = int(chunk.Height) if int(chunk.Fps) > 0 { fps = int(chunk.Fps) } } totalFrames += int(chunk.NumFrames) rgbCopy := make([]byte, len(chunk.Data)) copy(rgbCopy, chunk.Data) rgbChunks = append(rgbChunks, rgbCopy) case genErr := <-errCh: if genErr != nil { // Drain videoCh so the gRPC stream can close cleanly. for range videoCh { } return "", fmt.Errorf("generate idle avatar video: %w", genErr) } } } // Drain errCh after videoCh closes in case an error arrived concurrently. select { case genErr := <-errCh: if genErr != nil { return "", fmt.Errorf("generate idle avatar video: %w", genErr) } default: } if len(rgbChunks) == 0 || width <= 0 || height <= 0 || totalFrames <= 0 { return "", errors.New("idle avatar generation produced no video frames") } if width != targetWidth || height != targetHeight { sizeDir = o.charStore.IdleVideosForSizeDir(characterID, imageFilename, width, height) if sizeDir == "" { return "", fmt.Errorf("idle video dir unavailable for character %s", characterID) } outPath = o.charStore.IdleVideoPath(characterID, imageFilename, profile, width, height) if outPath == "" { return "", fmt.Errorf("idle video path unavailable for character %s", characterID) } } pcm = fitPCMToVideoDuration(pcm, idleSampleRate, totalFrames, fps) if err := recording.EncodeRGB24ToMP4(outPath, width, height, fps, rgbChunks, pcm, idleSampleRate, idleCRF); err != nil { return "", fmt.Errorf("encode idle avatar mp4: %w", err) } return outPath, nil } // SetupSession creates a media peer (DirectPeer or LiveKit Bot) and prepares for streaming. // When roomMgr is nil (direct mode), a DirectPeer is created instead of a LiveKit Bot. func (o *Orchestrator) SetupSession(ctx context.Context, session *Session, roomMgr *livekit.RoomManager) (mediapeer.MediaPeer, []string, error) { warnings := []string{} // Best-effort: apply the character's active avatar image when realtime // avatar inference is enabled. Pure voice sessions keep cached idle videos // but do not touch the avatar model. if o.AvatarEnabled() && session != nil && session.CharacterID != "" { _, imageFilename, err := o.activeCharacterImage(session.CharacterID) if err != nil { log.Printf("SetupSession: could not resolve active image for character %s: %v", session.CharacterID, err) } else if imageFilename != "" { if err := o.setAvatarFromCharacterImage(ctx, session.ID, session.CharacterID, imageFilename); err != nil { warning := AvatarSetupWarning(err) warnings = append(warnings, warning) log.Printf("SetupSession: %s character=%s image=%s details=%v", warning, session.CharacterID, imageFilename, err) } } } var peer mediapeer.MediaPeer if o.streamingMode == "livekit" { // LiveKit SFU mode roomName := livekit.RoomName(session.ID) if err := roomMgr.CreateRoom(ctx, roomName); err != nil { return nil, warnings, err } bot := livekit.NewBot( roomMgr.URL(), roomMgr.APIKey(), roomMgr.APISecret(), roomName, ) if err := bot.Connect(ctx); err != nil { return nil, warnings, err } peer = bot } else { // Direct P2P WebRTC mode signalingFn := func(sessionID string, msg map[string]any) { o.broadcastJSON(sessionID, msg) } iceServers := make([]webrtc.ICEServer, 0, len(o.pipelineCfg.ICEServers)) for _, s := range o.pipelineCfg.ICEServers { iceServers = append(iceServers, webrtc.ICEServer{ URLs: s.URLs, Username: s.Username, Credential: s.Credential, }) } dp := direct.NewDirectPeer(session.ID, signalingFn, iceServers, o.webrtcAPI, o.estimatorCh) if err := dp.Connect(ctx); err != nil { return nil, warnings, err } peer = dp o.mu.Lock() o.directPeers[session.ID] = dp o.mu.Unlock() } // Use a detached context for the AV pipeline so it outlives the HTTP // request / setup timeout that ctx may be derived from. peer.StartAVPipeline(context.Background()) o.mu.Lock() o.peers[session.ID] = peer o.mu.Unlock() session.SetState(StateConnected) return peer, warnings, nil } func (o *Orchestrator) stopPipelineAndWait(session *Session, sessionID string, interruptVoice bool) { if interruptVoice && session.Mode == ModeOmni && o.inference != nil { if err := o.inference.Interrupt(context.Background(), sessionID); err != nil { log.Printf("Failed to interrupt omni model for session %s: %v", sessionID, err) } } o.cancelPipeline(session) session.WaitPipelineDone(3 * time.Second) } func (o *Orchestrator) HydrateVoiceDialogContext(session *Session) error { if o == nil || o.charStore == nil || session == nil { return nil } if session.Mode != ModeOmni || session.CharacterID == "" { return nil } var messages []map[string]any var err error if strings.TrimSpace(session.CharacterID) == kanshan.CharacterID { ownerID := session.OwnerIDSnapshot() if strings.TrimSpace(ownerID) == "" { session.SetDialogContext(nil) return nil } messages, _, _, err = o.charStore.LoadRecentMessagesForOwner(session.CharacterID, ownerID, "", doubaoDialogContextLoadLimit) } else { messages, _, _, err = o.charStore.LoadRecentMessages(session.CharacterID, "", doubaoDialogContextLoadLimit) } if err != nil { return err } session.SetDialogContext(buildDoubaoDialogContext(messages, doubaoDialogContextMaxPairs, time.Now().UTC())) return nil } func (o *Orchestrator) buildVoiceStartupGreetingPrompt(session *Session) string { var name, systemPrompt, speakingStyle, welcomeMessage string if session != nil && session.CharacterID != "" && o != nil && o.charStore != nil { if char, err := o.charStore.Get(session.CharacterID); err == nil { name = strings.TrimSpace(char.Name) systemPrompt = strings.TrimSpace(char.SystemPrompt) speakingStyle = strings.TrimSpace(char.SpeakingStyle) welcomeMessage = strings.TrimSpace(char.WelcomeMessage) } else { log.Printf("buildVoiceStartupGreetingPrompt: could not fetch character %s: %v", session.CharacterID, err) } } history := []DialogContextItem(nil) if session != nil { history = startupGreetingHistory(session.DialogContextSnapshot(), startupGreetingHistoryItems) } var b strings.Builder b.WriteString("这是系统内部启动提示,不要复述提示内容。用户刚刚打开与你的实时语音视频会话,请你主动说一段自然开场白。\n") b.WriteString("要求:只说 1-2 句话,口语化;不要提到“系统提示”“历史会话”“上下文”等字样;不要询问多个问题。\n") if name != "" { b.WriteString("你的名字:") b.WriteString(name) b.WriteString("\n") } if systemPrompt != "" { b.WriteString("角色设定:") b.WriteString(systemPrompt) b.WriteString("\n") } if speakingStyle != "" { b.WriteString("说话风格:") b.WriteString(speakingStyle) b.WriteString("\n") } if welcomeMessage != "" { b.WriteString("可参考的开场偏好:") b.WriteString(welcomeMessage) b.WriteString("\n") } if len(history) > 0 { b.WriteString("最近对话片段如下,仅供你判断语气、称呼和是否存在明确未完成事项。默认不要回顾、总结、复述或主动延续这些内容:\n") for _, item := range history { role := "用户" if item.Role == "assistant" { role = "你" } b.WriteString(role) b.WriteString(":") b.WriteString(item.Text) b.WriteString("\n") } b.WriteString("现在请向用户打招呼。一般情况下只说类似“你好,我在。今天想聊点什么?”的轻量开场。\n") b.WriteString("只有当最近对话存在明确未完成事项、用户明确约定下次继续、或后台任务仍在进行时,才可以用一句话轻描淡写地提到“也可以继续刚才的事”。\n") b.WriteString("不要主动提及取消、失败、争执、情绪化表达、敏感内容或具体历史细节。\n") } else { b.WriteString("当前没有可用的历史对话。请简短介绍你是谁,以及你能实时语音视频聊天、回答问题、陪伴交流;如果用户需要,也能帮忙查询、调研、整理资料或生成报告和网页。\n") } return strings.TrimSpace(b.String()) } func (o *Orchestrator) buildVoiceLLMSessionConfig(session *Session, sessionID string) inference.VoiceLLMSessionConfig { return o.buildVoiceLLMSessionConfigExcludingTurn(session, sessionID, 0) } func (o *Orchestrator) buildVoiceLLMSessionConfigExcludingTurn(session *Session, sessionID string, excludeTurnSeq uint64) inference.VoiceLLMSessionConfig { voiceConfig := inference.VoiceLLMSessionConfig{SessionID: sessionID} if session.CharacterID != "" && o.charStore != nil { if char, err := o.charStore.Get(session.CharacterID); err == nil { voiceConfig.CharacterID = session.CharacterID voiceConfig.CharacterDir = o.charStore.CharDir(session.CharacterID) voiceConfig.Provider = voiceLLMProviderOrDefault(char.VoiceProvider) if o.personaAgentEnabled(session) { voiceConfig.Provider = "persona" } voiceConfig.SystemPrompt = char.SystemPrompt voiceConfig.Voice = char.VoiceType voiceConfig.BotName = char.Name voiceConfig.SpeakingStyle = char.SpeakingStyle } else { log.Printf("buildVoiceLLMSessionConfig: could not fetch character %s: %v", session.CharacterID, err) } } if o.personaAgentEnabled(session) { voiceConfig.Provider = "persona" } for _, item := range buildVoiceDialogContextFromSession(session, excludeTurnSeq, doubaoDialogContextMaxPairs, time.Now().UTC()) { voiceConfig.DialogContext = append(voiceConfig.DialogContext, inference.VoiceLLMDialogContextItem{ Role: item.Role, Text: item.Text, Timestamp: item.Timestamp, }) } return voiceConfig } func (o *Orchestrator) buildVoiceStartupGreetingSessionConfig(session *Session, sessionID string) inference.VoiceLLMSessionConfig { voiceConfig := o.buildVoiceLLMSessionConfig(session, sessionID) voiceConfig.Provider = o.characterVoiceLLMProviderForSession(session) voiceConfig.WelcomeMessage = "" return voiceConfig } func voiceLLMProviderOrDefault(provider string) string { provider = strings.ToLower(strings.TrimSpace(provider)) switch provider { case "persona": return "persona" case "qwen_omni": return "qwen_omni" default: return "doubao" } } func modeStringForLog(mode PipelineMode) string { if mode == ModeStandard { return "standard" } return "omni" } func (o *Orchestrator) standardComponentDefaults() character.Components { defaults := character.Components{LLM: "qwen", ASR: "qwen", TTS: "qwen"} if o.pipelineCfg.DefaultLLM != "" { defaults.LLM = o.pipelineCfg.DefaultLLM } if o.pipelineCfg.DefaultASR != "" { defaults.ASR = o.pipelineCfg.DefaultASR } if o.pipelineCfg.DefaultTTS != "" { defaults.TTS = o.pipelineCfg.DefaultTTS } return defaults } func (o *Orchestrator) standardCharacterConfig(session *Session) (character.Components, string, string, string) { components := character.NormalizeComponents(character.Components{}, o.standardComponentDefaults()) voice := "" speakingStyle := "" language := "" if session.CharacterID != "" && o.charStore != nil { if char, err := o.charStore.Get(session.CharacterID); err == nil { components = character.NormalizeComponents(char.Components, components) voice = strings.TrimSpace(char.VoiceType) speakingStyle = strings.TrimSpace(char.SpeakingStyle) } else { log.Printf("standardCharacterConfig: could not fetch character %s: %v", session.CharacterID, err) } } if voice == "" && components.TTS == "qwen" { voice = "Momo" } return components, voice, speakingStyle, language } func (o *Orchestrator) standardSystemPrompt(session *Session) string { if session.CharacterID == "" || o.charStore == nil { return composeSystemPrompt(o.globalSystemPrompt(), "") } char, err := o.charStore.Get(session.CharacterID) if err != nil { log.Printf("standardSystemPrompt: could not fetch character %s: %v", session.CharacterID, err) return composeSystemPrompt(o.globalSystemPrompt(), "") } return composeSystemPrompt(o.globalSystemPrompt(), characterSystemPrompt(char, true, true)) } func appendRAGContext(rolePrompt string, ragContext string) string { ragContext = strings.TrimSpace(ragContext) if ragContext == "" { return rolePrompt } rolePrompt = strings.TrimSpace(rolePrompt) if rolePrompt == "" { return ragContext } return rolePrompt + "\n\n" + ragContext } func formatRAGContext(results []inference.RAGSearchResult) string { if len(results) == 0 { return "" } var b strings.Builder b.WriteString("【角色素材检索结果】\n") b.WriteString("以下内容来自该角色导入的知识、文档或人物生平素材。只在与用户问题相关时使用;不要编造素材中没有的事实。\n") for i, item := range results { content := strings.TrimSpace(item.Content) if content == "" { continue } title := strings.TrimSpace(item.Title) if title == "" { title = strings.TrimSpace(item.Filename) } if title == "" { title = "未命名素材" } b.WriteString(fmt.Sprintf("[%d] %s\n%s\n", i+1, title, content)) } return strings.TrimSpace(b.String()) } func (o *Orchestrator) standardSystemPromptWithRAG(session *Session, ragContext string) string { if session.CharacterID == "" || o.charStore == nil { return composeSystemPrompt(o.globalSystemPrompt(), appendRAGContext("", ragContext)) } char, err := o.charStore.Get(session.CharacterID) if err != nil { log.Printf("standardSystemPrompt: could not fetch character %s: %v", session.CharacterID, err) return composeSystemPrompt(o.globalSystemPrompt(), appendRAGContext("", ragContext)) } return composeSystemPrompt(o.globalSystemPrompt(), appendRAGContext(characterSystemPrompt(char, true, true), ragContext)) } func latestUserText(history []ChatMessage) string { for i := len(history) - 1; i >= 0; i-- { if history[i].Role == "user" { return strings.TrimSpace(history[i].Content) } } return "" } func wrapVoiceAudioInput(ctx context.Context, audioCh <-chan []byte) <-chan inference.VoiceLLMInputEvent { inputCh := make(chan inference.VoiceLLMInputEvent, 64) go func() { defer close(inputCh) for { select { case <-ctx.Done(): return case data, ok := <-audioCh: if !ok { return } if len(data) == 0 { continue } select { case inputCh <- inference.VoiceLLMInputEvent{Audio: data}: case <-ctx.Done(): return } } } }() return inputCh } func wrapVoiceMultimodalInput( ctx context.Context, audioCh <-chan []byte, frameCh <-chan VisualFrame, unsubscribe func(), initialFrames []VisualFrame, ) <-chan inference.VoiceLLMInputEvent { inputCh := make(chan inference.VoiceLLMInputEvent, 64) go func() { defer close(inputCh) if unsubscribe != nil { defer unsubscribe() } for _, frame := range initialFrames { select { case inputCh <- inference.VoiceLLMInputEvent{ Image: &inference.ImageFrame{ Data: frame.Data, MimeType: frame.MimeType, Width: frame.Width, Height: frame.Height, Source: frame.Source, TimestampMS: frame.TimestampMS, FrameSeq: frame.FrameSeq, }, }: case <-ctx.Done(): return } } for { select { case <-ctx.Done(): return case data, ok := <-audioCh: if !ok { return } if len(data) == 0 { continue } select { case inputCh <- inference.VoiceLLMInputEvent{Audio: data}: case <-ctx.Done(): return } case frame, ok := <-frameCh: if !ok { frameCh = nil continue } select { case inputCh <- inference.VoiceLLMInputEvent{ Image: &inference.ImageFrame{ Data: frame.Data, MimeType: frame.MimeType, Width: frame.Width, Height: frame.Height, Source: frame.Source, TimestampMS: frame.TimestampMS, FrameSeq: frame.FrameSeq, }, }: case <-ctx.Done(): return } } } }() return inputCh } func singleVoiceTextInput(text string) <-chan inference.VoiceLLMInputEvent { inputCh := make(chan inference.VoiceLLMInputEvent, 1) inputCh <- inference.VoiceLLMInputEvent{Text: text} close(inputCh) return inputCh } func drainUserAudio(audioCh <-chan []byte, maxDrain int) { for i := 0; i < maxDrain; i++ { select { case <-audioCh: default: return } } } func (o *Orchestrator) resumeVoiceAudioStream(sessionID string) error { session, err := o.sessionMgr.Get(sessionID) if err != nil { return err } if session.Mode != ModeOmni { return nil } o.mu.RLock() peer := o.peers[sessionID] o.mu.RUnlock() if peer == nil { return errors.New("media peer not found") } audioCh := peer.SubscribeUserAudio() drainUserAudio(audioCh, 256) return o.HandleAudioStream(context.Background(), sessionID, audioCh) } func (o *Orchestrator) handleStandardTextInput(ctx context.Context, session *Session, sessionID string, text string) error { o.stopPipelineAndWait(session, sessionID, false) pipeCtx, cancel := context.WithCancel(ctx) session.mu.Lock() session.PipelineCancel = cancel session.mu.Unlock() turnSeq := session.MarkTurnStarted() o.advancePlaybackEpoch(sessionID, turnSeq) session.AddMessage(ChatMessage{Role: "user", Content: text, TurnSeq: turnSeq}) pipelineSeq := session.MarkPipelineRunning() go o.runStandardPipeline(pipeCtx, session, sessionID, pipelineSeq, turnSeq) return nil } func (o *Orchestrator) handleVoiceLLMTextInput(ctx context.Context, session *Session, sessionID string, text string) error { turnSeq := session.MarkTurnStarted() o.advancePlaybackEpoch(sessionID, turnSeq) if o.inference != nil { if err := o.inference.Interrupt(context.Background(), sessionID); err != nil { log.Printf("Failed to interrupt omni model for session %s: %v", sessionID, err) } } o.cancelPipeline(session) pipeCtx, cancel := context.WithCancel(ctx) session.mu.Lock() session.PipelineCancel = cancel session.mu.Unlock() session.AddMessage(ChatMessage{Role: "user", Content: text, TurnSeq: turnSeq}) session.SetState(StateProcessing) o.broadcastStatusTurn(sessionID, "processing", turnSeq) pipelineSeq := session.MarkPipelineRunning() inputCh := singleVoiceTextInput(text) go func(seq uint64) { o.runVoiceLLMPipeline(pipeCtx, session, sessionID, inputCh, seq, turnSeq) if pipeCtx.Err() != nil || !session.IsCurrentPipeline(seq) { return } if err := o.resumeVoiceAudioStream(sessionID); err != nil { log.Printf("Failed to resume omni audio stream for session %s: %v", sessionID, err) } }(pipelineSeq) return nil } func (o *Orchestrator) HandleTaskEvent(task *agenttask.Task, event *agenttask.Event) { if o == nil || task == nil || event == nil { return } if event.Status == agenttask.StatusCompleted { msg := strings.TrimSpace(event.Message) if msg == "" { msg = "任务已经完成,我把整理好的资料放在聊天里了。" } else { msg = "任务已经完成。" + msg } _ = o.SpeakAssistantText(context.Background(), task.SessionID, msg, true) return } if event.Status == agenttask.StatusFailed { msg := strings.TrimSpace(event.Message) if msg == "" { msg = "后台任务失败了,我暂时拿不到结果。" } else { msg = "后台任务失败了:" + msg } _ = o.SpeakAssistantText(context.Background(), task.SessionID, msg, true) } } func taskEventBroadcastPayload(task *agenttask.Task, event *agenttask.Event) map[string]any { payload := map[string]any{ "type": "task_event", "task_id": task.ID, "session_id": task.SessionID, "seq": event.Seq, "event_type": event.EventType, "status": event.Status, "message": event.Message, "progress": event.Progress, "created_at": event.CreatedAt, "task": task, } if strings.TrimSpace(string(event.Payload)) != "" { var decoded any if err := json.Unmarshal(event.Payload, &decoded); err == nil && decoded != nil { payload["payload"] = decoded } } return payload } func fallbackPersonaTaskEventPayload(sessionID string, payload map[string]any) map[string]any { if payload == nil { payload = map[string]any{} } payload["type"] = "task_event" if stringValue(payload["session_id"]) == "" { payload["session_id"] = sessionID } return payload } func sanitizePersonaArtifactPayload(payload map[string]any) map[string]any { sanitized := make(map[string]any, len(payload)) for key, value := range payload { if key == "content" { continue } sanitized[key] = value } return sanitized } func (o *Orchestrator) persistPersonaArtifactEvent(ctx context.Context, store *agenttask.Store, taskID string, eventPayload map[string]any) map[string]any { if eventPayload == nil { return nil } sanitized := sanitizePersonaArtifactPayload(eventPayload) content := stringValue(eventPayload["content"]) if content == "" { return sanitized } artifactID := stringValue(eventPayload["artifact_id"]) if artifactID == "" { artifactID = stringValue(eventPayload["id"]) } if artifactID != "" { if artifact, _, err := store.GetArtifact(ctx, taskID, artifactID); err == nil { sanitized["artifact_id"] = artifact.ID sanitized["title"] = artifact.Title sanitized["type"] = artifact.Type sanitized["mime_type"] = artifact.MimeType return sanitized } else if !errors.Is(err, agenttask.ErrNotFound) { log.Printf("persona task artifact lookup failed task=%s artifact=%s: %v", taskID, artifactID, err) } } artifactType := stringValue(eventPayload["type"]) if artifactType == "" { artifactType = "html" } mimeType := stringValue(eventPayload["mime_type"]) if mimeType == "" && strings.Contains(strings.ToLower(artifactType), "html") { mimeType = "text/html; charset=utf-8" } title := stringValue(eventPayload["title"]) if title == "" { title = "任务产物" } metadata, _ := json.Marshal(mapValue(eventPayload["metadata"])) artifact, err := store.CreateArtifact(ctx, taskID, agenttask.CreateArtifactInput{ ID: artifactID, Type: artifactType, Title: title, MimeType: mimeType, Content: content, Metadata: metadata, }) if err != nil { log.Printf("persona task artifact persist failed task=%s artifact=%s: %v", taskID, artifactID, err) return sanitized } sanitized["artifact_id"] = artifact.ID sanitized["title"] = artifact.Title sanitized["type"] = artifact.Type sanitized["mime_type"] = artifact.MimeType return sanitized } func (o *Orchestrator) persistPersonaTaskEvent(ctx context.Context, session *Session, sessionID string, payload map[string]any) map[string]any { fallback := fallbackPersonaTaskEventPayload(sessionID, payload) if o == nil || o.taskService == nil || o.taskService.Store() == nil { return fallback } taskPayload := mapValue(payload["task"]) taskID := stringValue(payload["task_id"]) if taskID == "" { taskID = stringValue(taskPayload["id"]) } if taskID == "" { return fallback } store := o.taskService.Store() ownerID := "" if session != nil && strings.TrimSpace(session.CharacterID) == kanshan.CharacterID { ownerID = session.OwnerIDSnapshot() } task, err := store.GetTask(ctx, taskID) if errors.Is(err, agenttask.ErrNotFound) { taskSessionID := stringValue(payload["session_id"]) if taskSessionID == "" { taskSessionID = stringValue(taskPayload["session_id"]) } if taskSessionID == "" { taskSessionID = sessionID } characterID := stringValue(taskPayload["character_id"]) if characterID == "" && session != nil { characterID = session.CharacterID } title := stringValue(taskPayload["title"]) userRequest := stringValue(taskPayload["user_request"]) if userRequest == "" { userRequest = title } if userRequest == "" { userRequest = stringValue(payload["message"]) } if userRequest == "" { userRequest = "后台任务" } kind := stringValue(taskPayload["kind"]) if kind == "" { kind = "research" } task, err = store.CreateTask(ctx, agenttask.CreateTaskInput{ ID: taskID, SessionID: taskSessionID, CharacterID: characterID, OwnerID: ownerID, Kind: kind, Title: title, UserRequest: userRequest, }) } if err != nil { log.Printf("persona task persist failed task=%s: %v", taskID, err) return fallback } if ownerID != "" && task.OwnerID != "" && task.OwnerID != ownerID { log.Printf("persona task owner mismatch task=%s session=%s", taskID, sessionID) return fallback } eventType := stringValue(payload["event_type"]) eventPayload := mapValue(payload["payload"]) if eventType == "artifact.created" { eventPayload = o.persistPersonaArtifactEvent(ctx, store, task.ID, eventPayload) } var rawPayload json.RawMessage if len(eventPayload) > 0 { if raw, err := json.Marshal(eventPayload); err == nil { rawPayload = raw } } event, updated, err := store.AppendEvent(ctx, task.ID, agenttask.AppendEventInput{ EventType: eventType, Status: taskStatusValue(payload["status"], task.Status), Message: stringValue(payload["message"]), Progress: intValue(payload["progress"], intValue(taskPayload["progress"], task.Progress)), Payload: rawPayload, }) if err != nil { log.Printf("persona task event persist failed task=%s event=%s: %v", task.ID, eventType, err) return fallback } return taskEventBroadcastPayload(updated, event) } // HandleTextInput processes a text message through either the standard // LLM→TTS→Avatar pipeline or the omni text-query path. func (o *Orchestrator) HandleTextInput(ctx context.Context, sessionID string, text string) error { session, err := o.sessionMgr.Get(sessionID) if err != nil { return err } text = strings.TrimSpace(text) if text == "" { return nil } if session.Mode == ModeOmni { return o.handleVoiceLLMTextInput(ctx, session, sessionID, text) } return o.handleStandardTextInput(ctx, session, sessionID, text) } // runStandardPipeline executes: LLM → sentence detection → TTS → Avatar. func (o *Orchestrator) runStandardPipeline(ctx context.Context, session *Session, sessionID string, pipelineSeq uint64, turnSeq uint64) { var fullResponseCh chan string // set below; read in defer to store assistant message recSessionDir := "" recTurnID := "turn" + strconv.FormatUint(turnSeq, 10) var recMu sync.Mutex var recAudioBuf []byte var recAudioSR int var turnRec *recording.TurnRecording var recFinished bool if o.recorder != nil { recSessionDir = o.sessionRecordingDir(session) } appendRecAudio := func(pcm []byte, sampleRate int) { if o.recorder == nil || len(pcm) == 0 || sampleRate <= 0 { return } pcmCopy := append([]byte(nil), pcm...) recMu.Lock() if recAudioSR == 0 { recAudioSR = sampleRate } recAudioBuf = append(recAudioBuf, pcmCopy...) activeRec := turnRec if activeRec != nil && !recFinished { activeRec.WriteAudioChunk(pcmCopy, sampleRate) } recMu.Unlock() } beginRec := func(width, height, fps int) { if o.recorder == nil || width <= 0 || height <= 0 || fps <= 0 { return } recMu.Lock() if turnRec != nil || recFinished { recMu.Unlock() return } activeRec := o.recorder.BeginTurn(recSessionDir, recTurnID, width, height, fps) turnRec = activeRec audioCopy := append([]byte(nil), recAudioBuf...) audioSR := recAudioSR if activeRec != nil && len(audioCopy) > 0 && audioSR > 0 { activeRec.WriteAudioChunk(audioCopy, audioSR) } recMu.Unlock() } writeRecVideo := func(rgb []byte) { if o.recorder == nil || len(rgb) == 0 { return } recMu.Lock() activeRec := turnRec if activeRec != nil && !recFinished { activeRec.WriteVideoChunk(rgb) } recMu.Unlock() } finishRec := func() { if o.recorder == nil { return } recMu.Lock() if recFinished { recMu.Unlock() return } activeRec := turnRec turnRec = nil recFinished = true recMu.Unlock() if activeRec != nil { _ = activeRec.Finish() } } defer func() { // Store assistant message in session history assistantResp := "" if fullResponseCh != nil { if resp, ok := <-fullResponseCh; ok && resp != "" { assistantResp = resp session.AddMessage(ChatMessage{Role: "assistant", Content: resp, TurnSeq: turnSeq}) if _, err := o.persistSessionConversation(session); err != nil { log.Printf("conversation: SaveConversation error session=%s: %v", sessionID, err) } } } if o.recorder != nil { recMu.Lock() audioCopy := append([]byte(nil), recAudioBuf...) audioSR := recAudioSR recMu.Unlock() if len(audioCopy) > 0 && audioSR > 0 { if err := o.recorder.SaveRawAudio(recSessionDir, recTurnID, audioCopy, audioSR); err != nil { log.Printf("recording: SaveRawAudio error session=%s turn=%s: %v", sessionID, recTurnID, err) } } if strings.TrimSpace(assistantResp) != "" { if err := o.recorder.SaveTranscript(recSessionDir, recTurnID, assistantResp); err != nil { log.Printf("recording: SaveTranscript error session=%s turn=%s: %v", sessionID, recTurnID, err) } } } session.MarkPipelineFinished(pipelineSeq) session.SetState(StateListening) o.broadcastStatus(sessionID, "idle") }() session.SetState(StateProcessing) o.broadcastStatus(sessionID, "processing") pipelineStart := time.Now() components, voice, speakingStyle, language := o.standardCharacterConfig(session) // Prepare LLM messages history := session.HistorySnapshot() messages := make([]inference.ChatMessage, 0, len(history)+1) ragContext := "" if query := latestUserText(history); query != "" { results, err := o.searchKnowledge(ctx, session.CharacterID, query) if err != nil { log.Printf("RAG search failed for session %s character %s: %v", sessionID, session.CharacterID, err) } else { ragContext = formatRAGContext(results) } } if systemPrompt := o.standardSystemPromptWithRAG(session, ragContext); systemPrompt != "" { messages = append(messages, inference.ChatMessage{Role: "system", Content: systemPrompt}) } for _, m := range history { messages = append(messages, inference.ChatMessage{Role: m.Role, Content: m.Content}) } visualCfg := o.visualInputConfig() visualFrames := session.LatestVisualFrames(time.Now(), time.Duration(visualCfg.FrameTTLMS)*time.Millisecond) if len(visualFrames) > 0 { images := make([]inference.ImageFrame, 0, len(visualFrames)) for _, frame := range visualFrames { images = append(images, inference.ImageFrame{ Data: frame.Data, MimeType: frame.MimeType, Width: frame.Width, Height: frame.Height, Source: frame.Source, TimestampMS: frame.TimestampMS, FrameSeq: frame.FrameSeq, }) } for i := len(messages) - 1; i >= 0; i-- { if messages[i].Role == "user" { messages[i].Images = images break } } } // 1. Start LLM stream llmCh, llmErrCh := o.inference.GenerateLLMStream(ctx, sessionID, messages, inference.LLMConfig{ Temperature: 0.7, Provider: components.LLM, }) // 2. Collect the full LLM response, then feed it to TTS once. textCh := make(chan string, 1) fullResponseCh = make(chan string, 1) // captures full LLM response for history go func() { defer close(textCh) var tokenBuffer strings.Builder var fullResponse string finalText := func() string { if text := strings.TrimSpace(fullResponse); text != "" { return text } return strings.TrimSpace(tokenBuffer.String()) } finish := func(sendTTS bool) { text := finalText() if sendTTS && text != "" { select { case textCh <- text: case <-ctx.Done(): } } if text != "" { fullResponseCh <- text } close(fullResponseCh) } for { select { case <-ctx.Done(): finish(false) return case chunk, ok := <-llmCh: if !ok { finish(true) return } // Broadcast LLM token to WebSocket o.broadcastJSON(sessionID, map[string]any{ "type": "llm_token", "token": chunk.Token, "accumulated": chunk.AccumulatedText, "is_final": chunk.IsFinal, "turn_seq": turnSeq, }) tokenBuffer.WriteString(chunk.Token) // Track the latest accumulated text. A normally closed errCh // can race with the final chunk, so keep this current even // before the explicit final marker arrives. if chunk.AccumulatedText != "" { fullResponse = chunk.AccumulatedText } case err, ok := <-llmErrCh: if !ok { llmErrCh = nil continue } if err != nil { log.Printf("LLM stream error for session %s: %v", sessionID, err) o.broadcastError(sessionID, "LLM generation failed") } if err != nil { finish(true) return } } } }() // 3. Start TTS stream ttsAudioCh, ttsErrCh := o.inference.SynthesizeSpeechStream(ctx, textCh, inference.TTSConfig{ Provider: components.TTS, Voice: voice, SpeakingStyle: speakingStyle, Language: language, SessionID: sessionID, }) lookupStdPeer := func() mediapeer.MediaPeer { o.mu.RLock() defer o.mu.RUnlock() return o.peers[sessionID] } if !o.AvatarEnabled() { speakingBroadcasted := false for ttsAudioCh != nil || ttsErrCh != nil { select { case <-ctx.Done(): return case chunk, ok := <-ttsAudioCh: if !ok { ttsAudioCh = nil continue } pcm, sampleRate := audioChunkToPCM16(chunk) if len(pcm) == 0 { continue } appendRecAudio(pcm, sampleRate) if !speakingBroadcasted { speakingBroadcasted = true session.SetState(StateSpeaking) o.broadcastStatus(sessionID, "speaking") } if peer := lookupStdPeer(); peer != nil { if err := peer.PublishAudioFrame(pcm, sampleRate); err != nil { log.Printf("std audio-only publish failed session=%s: %v", sessionID, err) } } case err, ok := <-ttsErrCh: if !ok { ttsErrCh = nil continue } if err == nil { continue } log.Printf("TTS stream error for session %s: %v", sessionID, err) o.broadcastError(sessionID, "Speech synthesis failed") return } } return } // 4. Start Avatar stream stdSyncBuf := newVoiceAVSyncBuffer(voiceMaxPCMBufferSamples) avatarAudioCh := make(chan *pb.AudioChunk, 8) go func() { defer close(avatarAudioCh) for { select { case <-ctx.Done(): return case chunk, ok := <-ttsAudioCh: if !ok { return } // Avatar decodes AudioChunk.Format itself; browser audio and // recordings need signed 16-bit PCM. pcm, pcmSampleRate := audioChunkToPCM16(chunk) if len(pcm) > 0 { appendRecAudio(pcm, pcmSampleRate) if dropped := stdSyncBuf.appendPCM(pcm, pcmSampleRate); dropped > 0 { bufferedSamples, _, _, _ := stdSyncBuf.snapshot() log.Printf("std sync buffer overflow for session %s: dropped=%d bytes, buffered_samples=%d", sessionID, dropped, bufferedSamples) } } // Forward original audio to avatar. Browser audio is published // later as part of the same paced AV segment as the video. select { case avatarAudioCh <- chunk: case <-ctx.Done(): return } case err, ok := <-ttsErrCh: if !ok { ttsErrCh = nil continue } if err == nil { continue } log.Printf("TTS stream error for session %s: %v", sessionID, err) o.broadcastError(sessionID, "Speech synthesis failed") return } } }() // Delay speaking status until first video frame arrives (avoids frozen-frame stall on frontend). speakingBroadcasted := false // Serialize with concurrent avatar operations (see runVoiceLLMPipeline comment). o.avatarMu.Lock() videoCh, videoErrCh := o.inference.GenerateAvatarStream(ctx, avatarAudioCh) // 5. Publish paced AV segments. The standard/Qwen chain receives audio // before video, so PCM is buffered and sliced to match each video segment. var ( segVideo []byte segFrames int segWidth int segHeight int segFPS int segCount int segSeq int64 segMediaStartMS int64 firstFrameSent bool ) flushStdSeg := func(isFinalSeg bool) { if segCount == 0 { return } segSeq++ segDurationMS := durationMSForVideo(segFrames, segFPS) if peer := lookupStdPeer(); peer != nil { segPCM, outSamples, wantSamples, bufferedSamplesAfterTake := stdSyncBuf.takeSegmentPCM(segFrames, segFPS, isFinalSeg) _, _, _, sampleRate := stdSyncBuf.snapshot() if sampleRate <= 0 { sampleRate = 16000 } if segSeq == 1 && outSamples < wantSamples { bufferedSamples, _, _, _ := stdSyncBuf.snapshot() log.Printf("std av drift for session %s: out_samples=%d want_samples=%d frames=%d fps=%d buffered_samples=%d", sessionID, outSamples, wantSamples, segFrames, segFPS, bufferedSamples) } if isFinalSeg && bufferedSamplesAfterTake > 0 { log.Printf("std av strict trim tail session=%s: trimmed_samples=%d", sessionID, bufferedSamplesAfterTake) } raw := &mediapeer.RawAVSegment{ TraceLabel: voiceTraceLabel(sessionID, turnSeq, "standard", "", segSeq), Epoch: turnSeq, SegmentSeq: segSeq, MediaStartMS: segMediaStartMS, DurationMS: segDurationMS, RGB: segVideo, PCM: segPCM, SampleRate: sampleRate, Width: segWidth, Height: segHeight, FPS: segFPS, NumFrames: segFrames, } if err := peer.SendAVSegment(raw); err != nil { log.Printf("std av SendAVSegment failed session=%s: %v", sessionID, err) } } segMediaStartMS += segDurationMS segVideo = nil segFrames = 0 segCount = 0 } for { select { case <-ctx.Done(): flushStdSeg(false) finishRec() o.avatarMu.Unlock() return case chunk, ok := <-videoCh: if !ok { flushStdSeg(true) finishRec() if err := <-videoErrCh; err != nil { log.Printf("Avatar stream error for session %s: %v", sessionID, err) o.broadcastError(sessionID, "Avatar generation failed") } if ctx.Err() == nil { if peer := lookupStdPeer(); peer != nil { peer.WaitAVDrain(10 * time.Second) } } o.avatarMu.Unlock() return } nf := int(chunk.GetNumFrames()) if nf <= 0 && int(chunk.GetWidth())*int(chunk.GetHeight())*3 > 0 { nf = len(chunk.GetData()) / (int(chunk.GetWidth()) * int(chunk.GetHeight()) * 3) } fps := int(chunk.GetFps()) if fps <= 0 { fps = 25 } if !firstFrameSent { firstFrameSent = true log.Printf("TTFF std pipeline session=%s first_video_chunk=%.3fs", sessionID, time.Since(pipelineStart).Seconds()) if !speakingBroadcasted { speakingBroadcasted = true session.SetState(StateSpeaking) o.broadcastStatus(sessionID, "speaking") } } beginRec(int(chunk.GetWidth()), int(chunk.GetHeight()), fps) writeRecVideo(chunk.GetData()) segVideo = append(segVideo, chunk.GetData()...) segFrames += nf segWidth = int(chunk.GetWidth()) segHeight = int(chunk.GetHeight()) segFPS = fps segCount++ if segCount >= stdChunksPerSegment || chunk.GetIsFinal() { flushStdSeg(chunk.GetIsFinal()) } } } } // HandleAudioStream processes incoming user audio through the session's pipeline. func (o *Orchestrator) HandleAudioStream(ctx context.Context, sessionID string, audioCh <-chan []byte) error { session, err := o.sessionMgr.Get(sessionID) if err != nil { return err } if session.Mode == ModeStandard { go o.runStandardASRLoop(ctx, session, sessionID, audioCh) return nil } pipeCtx, cancel := context.WithCancel(ctx) session.mu.Lock() session.PipelineCancel = cancel session.mu.Unlock() pipelineSeq := session.MarkPipelineRunning() var inputCh <-chan inference.VoiceLLMInputEvent if visualCfg, ok := o.VisualInputConfigForSession(session); ok { frameCh, unsubscribe := session.SubscribeVisualFrames(2) initialFrames := session.LatestVisualFrames(time.Now(), time.Duration(visualCfg.FrameTTLMS)*time.Millisecond) inputCh = wrapVoiceMultimodalInput(pipeCtx, audioCh, frameCh, unsubscribe, initialFrames) } else { inputCh = wrapVoiceAudioInput(pipeCtx, audioCh) } go o.runVoiceLLMPipeline(pipeCtx, session, sessionID, inputCh, pipelineSeq, 0) return nil } func (o *Orchestrator) runStandardASRLoop(ctx context.Context, session *Session, sessionID string, audioCh <-chan []byte) { components, _, _, language := o.standardCharacterConfig(session) transcriptCh, errCh := o.inference.TranscribeStream(ctx, audioCh, inference.ASRConfig{ Provider: components.ASR, Language: language, SessionID: sessionID, }) session.SetState(StateListening) o.broadcastStatus(sessionID, "idle") for { select { case <-ctx.Done(): return case event, ok := <-transcriptCh: if !ok { select { case err, ok := <-errCh: if ok && err != nil && ctx.Err() == nil { log.Printf("ASR stream error for session %s: %v", sessionID, err) o.broadcastError(sessionID, "Speech recognition failed") } default: } return } text := strings.TrimSpace(event.GetText()) if text == "" { continue } if !event.GetIsFinal() { o.broadcastJSON(sessionID, map[string]any{ "type": "transcript", "text": text, "is_final": false, "speaker": "user", }) continue } o.stopPipelineAndWait(session, sessionID, false) turnSeq := session.MarkTurnStarted() o.advancePlaybackEpoch(sessionID, turnSeq) session.AddMessage(ChatMessage{Role: "user", Content: text, TurnSeq: turnSeq}) o.broadcastJSON(sessionID, map[string]any{ "type": "transcript", "text": text, "is_final": true, "speaker": "user", "turn_seq": turnSeq, }) pipeCtx, cancel := context.WithCancel(context.Background()) session.mu.Lock() session.PipelineCancel = cancel session.mu.Unlock() pipelineSeq := session.MarkPipelineRunning() go o.runStandardPipeline(pipeCtx, session, sessionID, pipelineSeq, turnSeq) case err, ok := <-errCh: if !ok { errCh = nil continue } if err == nil { continue } if ctx.Err() == nil { log.Printf("ASR stream error for session %s: %v", sessionID, err) o.broadcastError(sessionID, "Speech recognition failed") } return } } } // runVoiceLLMPipeline executes an omni turn source -> omni model -> Avatar (video). func (o *Orchestrator) runVoiceLLMPipeline(ctx context.Context, session *Session, sessionID string, inputCh <-chan inference.VoiceLLMInputEvent, pipelineSeq uint64, initialTurnSeq uint64) { o.runVoiceLLMPipelineWithConfig( ctx, session, sessionID, inputCh, pipelineSeq, initialTurnSeq, o.buildVoiceLLMSessionConfigExcludingTurn(session, sessionID, initialTurnSeq), false, ) } func (o *Orchestrator) runVoiceLLMPipelineWithConfig( ctx context.Context, session *Session, sessionID string, inputCh <-chan inference.VoiceLLMInputEvent, pipelineSeq uint64, initialTurnSeq uint64, voiceConfig inference.VoiceLLMSessionConfig, suppressUserTranscript bool, ) { sessionDir := "" if o.recorder != nil { sessionDir = o.sessionRecordingDir(session) } ctx, cancel := context.WithCancel(ctx) defer cancel() outputCh, errCh := o.inference.ConverseStream(ctx, inputCh, voiceConfig) pendingTurnSeq := initialTurnSeq pendingTurnAssistantReady := initialTurnSeq > 0 ignoredTurns := make(map[string]*voicePipelineTurn) var currentTurn *voicePipelineTurn var currentTurnDone <-chan voicePipelineTurnResult var streamErr error var pendingQuestionID string var pendingReplyID string var pendingUserFinalAt time.Time var pendingAssistantMustMatchKey bool lookupPeer := func() mediapeer.MediaPeer { o.mu.RLock() defer o.mu.RUnlock() return o.peers[sessionID] } reservePendingTurn := func() uint64 { if pendingTurnSeq == 0 { pendingTurnSeq = session.MarkTurnStarted() o.advancePlaybackEpoch(sessionID, pendingTurnSeq) } return pendingTurnSeq } broadcastProcessing := func(turnSeq uint64) { if turnSeq == 0 || !session.IsCurrentPipeline(pipelineSeq) { return } session.SetState(StateProcessing) o.broadcastStatusTurn(sessionID, "processing", turnSeq) } abortTurn := func(turn *voicePipelineTurn, ignoreKey bool) { if turn == nil { return } turn.aborted = true if ignoreKey && turn.key != "" { ignoredTurns[turn.key] = turn } if turn.avatarCancel != nil { turn.avatarCancel() } } saveAssistantMessage := func(turn *voicePipelineTurn) { if turn == nil || turn.historySaved || strings.TrimSpace(turn.assistantText) == "" { return } session.AddMessage(ChatMessage{Role: "assistant", Content: turn.assistantText, TurnSeq: turn.seq}) turn.historySaved = true } saveTurnTranscript := func(turn *voicePipelineTurn) { if turn == nil || turn.transcriptSaved || strings.TrimSpace(turn.assistantText) == "" { return } if o.recorder == nil || turn.recTurnID == "" { return } if err := o.recorder.SaveTranscript(turn.sessionDir, turn.recTurnID, turn.assistantText); err != nil { log.Printf("recording: SaveTranscript error session=%s turn=%s: %v", sessionID, turn.recTurnID, err) return } turn.transcriptSaved = true } saveTurnRawAudio := func(turn *voicePipelineTurn) { if turn == nil || turn.rawAudioSaved || len(turn.recAudioBuf) == 0 { return } if o.recorder == nil || turn.recTurnID == "" { return } if err := o.recorder.SaveRawAudio(turn.sessionDir, turn.recTurnID, turn.recAudioBuf, turn.recAudioSR); err != nil { log.Printf("recording: SaveRawAudio error session=%s turn=%s: %v", sessionID, turn.recTurnID, err) return } turn.rawAudioSaved = true } saveTurnConversation := func(turn *voicePipelineTurn) { if turn == nil || turn.conversationSaved || strings.TrimSpace(turn.assistantText) == "" { return } saved, err := o.persistSessionConversation(session) if err != nil { log.Printf("conversation: SaveConversation error session=%s turn=%d: %v", sessionID, turn.seq, err) return } if saved { turn.conversationSaved = true } } saveCompletedTurn := func(turn *voicePipelineTurn) { if turn == nil || turn.aborted { return } saveAssistantMessage(turn) saveTurnRawAudio(turn) saveTurnTranscript(turn) saveTurnConversation(turn) } recordIgnoredTurnOutput := func(turn *voicePipelineTurn, output *pb.VoiceLLMOutput) bool { if turn == nil || output == nil { return false } isFinal := voiceOutputIsFinal(output) if transcript := output.GetTranscript(); transcript != "" { if isFinal { turn.assistantText = transcript } else { turn.assistantText += transcript } } if audio := output.GetAudio(); audio != nil && len(audio.GetData()) > 0 { turn.recAudioBuf = append(turn.recAudioBuf, audio.GetData()...) if int(audio.GetSampleRate()) > 0 { turn.recAudioSR = int(audio.GetSampleRate()) } } if isFinal { shouldBroadcast := !turn.historySaved && strings.TrimSpace(turn.assistantText) != "" saveAssistantMessage(turn) saveTurnRawAudio(turn) saveTurnTranscript(turn) saveTurnConversation(turn) if shouldBroadcast { o.broadcastJSON(sessionID, map[string]any{ "type": "transcript", "text": turn.assistantText, "is_final": true, "speaker": "assistant", "turn_seq": turn.seq, }) } } return isFinal } setIdleIfCurrent := func(turnSeq uint64) { if turnSeq == 0 || !session.IsCurrentPipeline(pipelineSeq) || !session.IsCurrentTurn(turnSeq) { return } session.SetState(StateListening) o.broadcastStatusTurn(sessionID, "idle", turnSeq) } startTurn := func(key string) *voicePipelineTurn { seq := pendingTurnSeq if seq == 0 { seq = session.MarkTurnStarted() o.advancePlaybackEpoch(sessionID, seq) } if key != "" { delete(ignoredTurns, key) } turn := &voicePipelineTurn{ seq: seq, key: key, questionID: pendingQuestionID, replyID: pendingReplyID, recTurnID: fmt.Sprintf("turn%d", seq), recAudioSR: 16000, sessionDir: sessionDir, turnStart: time.Now(), userFinalAt: pendingUserFinalAt, syncBuf: newVoiceAVSyncBuffer(voiceMaxPCMBufferSamples), } pendingTurnSeq = 0 pendingTurnAssistantReady = false pendingQuestionID = "" pendingReplyID = "" pendingUserFinalAt = time.Time{} pendingAssistantMustMatchKey = false logVoiceTrace("go_turn_started", sessionID, turn.seq, turn.replyID, turn.questionID, turn.userFinalAt) broadcastProcessing(seq) return turn } pendingTurnKey := func() string { if pendingReplyID != "" { return "reply:" + pendingReplyID } if pendingQuestionID != "" { return "question:" + pendingQuestionID } return "" } startAvatarWorker := func(turn *voicePipelineTurn) { if turn == nil || turn.aborted || turn.avatarStarted { return } turn.avatarStarted = true turn.doneCh = make(chan voicePipelineTurnResult, 1) turn.avatarAudioCh = make(chan *pb.AudioChunk, 64) turn.avatarCtx, turn.avatarCancel = context.WithCancel(ctx) turn.avatarWorkerAt = time.Now() logVoiceTrace("go_avatar_worker_started", sessionID, turn.seq, turn.replyID, turn.questionID, turn.userFinalAt) currentTurnDone = turn.doneCh go func(turn *voicePipelineTurn) { result := voicePipelineTurnResult{turn: turn} defer func() { turn.doneCh <- result }() o.avatarMu.Lock() defer o.avatarMu.Unlock() if turn.avatarCtx.Err() != nil { return } avatarCtx := inference.WithTraceContext(turn.avatarCtx, inference.TraceContext{ SessionID: sessionID, QuestionID: turn.questionID, ReplyID: turn.replyID, TurnSeq: turn.seq, UserFinalAt: turn.userFinalAt, }) videoCh, avatarErrCh := o.inference.GenerateAvatarStream(avatarCtx, turn.avatarAudioCh) logVoiceTrace("go_avatar_grpc_stream_opened", sessionID, turn.seq, turn.replyID, turn.questionID, turn.userFinalAt) var turnRec *recording.TurnRecording var ( segVideo []byte segFrames int segWidth int segHeight int segFPS int segCount int segSeq int64 segMediaStartMS int64 lastKnownSampleRate int firstFrameSent bool speakingBroadcasted bool ) flushVoiceSeg := func(isFinalSeg bool) { if segCount == 0 { return } segSeq++ segDurationMS := durationMSForVideo(segFrames, segFPS) peer := lookupPeer() if peer != nil { traceLabel := "" if segSeq == 1 { traceLabel = voiceTraceLabel(sessionID, turn.seq, turn.replyID, turn.questionID, segSeq) } segPCM, outSamples, wantSamples, bufferedSamplesAfterTake := turn.syncBuf.takeSegmentPCM(segFrames, segFPS, isFinalSeg) bufferedSamples, _, _, sampleRate := turn.syncBuf.snapshot() if sampleRate > 0 { lastKnownSampleRate = sampleRate } else { sampleRate = lastKnownSampleRate } if sampleRate <= 0 { sampleRate = 16000 } if segSeq == 1 && outSamples < wantSamples { log.Printf("voice av drift for session %s: out_samples=%d want_samples=%d frames=%d fps=%d buffered_samples=%d", sessionID, outSamples, wantSamples, segFrames, segFPS, bufferedSamples) } if segSeq == 1 && isFinalSeg && bufferedSamplesAfterTake > 0 { log.Printf("voice av strict trim tail session=%s: trimmed_samples=%d", sessionID, bufferedSamplesAfterTake) } raw := &mediapeer.RawAVSegment{ TraceLabel: traceLabel, Epoch: turn.seq, SegmentSeq: segSeq, MediaStartMS: segMediaStartMS, DurationMS: segDurationMS, RGB: segVideo, PCM: segPCM, UserFinalAt: turn.userFinalAt, SampleRate: sampleRate, Width: segWidth, Height: segHeight, FPS: segFPS, NumFrames: segFrames, } if err := peer.SendAVSegment(raw); err != nil { log.Printf("voice av SendAVSegment failed session=%s seg=%d: %v", sessionID, segSeq, err) } if turnRec != nil { turnRec.WriteVideoChunk(segVideo) turnRec.WriteAudioChunk(segPCM, sampleRate) } } segMediaStartMS += segDurationMS segVideo = nil segFrames = 0 segCount = 0 } for { select { case <-turn.avatarCtx.Done(): flushVoiceSeg(false) if turnRec != nil { _ = turnRec.Finish() } return case chunk, ok := <-videoCh: if !ok { flushVoiceSeg(false) if turnRec != nil { _ = turnRec.Finish() turnRec = nil } if remain, totalIn, totalOut, _ := turn.syncBuf.snapshot(); remain > 0 { log.Printf("voice sync tail flush for session %s: dropping_unaligned_samples=%d total_in=%d total_out=%d", sessionID, remain, totalIn, totalOut) } if err := <-avatarErrCh; err != nil && turn.avatarCtx.Err() == nil && !errors.Is(err, context.Canceled) { result.err = err } if turn.avatarCtx.Err() == nil { if peer := lookupPeer(); peer != nil { peer.WaitAVDrain(10 * time.Second) } } return } nf := int(chunk.GetNumFrames()) if nf <= 0 && int(chunk.GetWidth())*int(chunk.GetHeight())*3 > 0 { nf = len(chunk.GetData()) / (int(chunk.GetWidth()) * int(chunk.GetHeight()) * 3) } fps := int(chunk.GetFps()) if fps <= 0 { fps = 20 } if !firstFrameSent { firstFrameSent = true turn.firstVideoAt = time.Now() logVoiceTrace( "go_avatar_first_video_received", sessionID, turn.seq, turn.replyID, turn.questionID, turn.userFinalAt, "avatar_ms="+strconv.FormatInt(time.Since(turn.avatarWorkerAt).Milliseconds(), 10), ) log.Printf("TTFF voice pipeline session=%s turn=%d first_video_chunk=%.3fs", sessionID, turn.seq, time.Since(turn.turnStart).Seconds()) if !speakingBroadcasted && session.IsCurrentPipeline(pipelineSeq) && session.IsCurrentTurn(turn.seq) { speakingBroadcasted = true session.SetState(StateSpeaking) o.broadcastStatusTurn(sessionID, "speaking", turn.seq) } } if turnRec == nil && o.recorder != nil && turn.recTurnID != "" && nf > 0 { turnRec = o.recorder.BeginTurn(turn.sessionDir, turn.recTurnID, int(chunk.GetWidth()), int(chunk.GetHeight()), fps) } segVideo = append(segVideo, chunk.GetData()...) segFrames += nf segWidth = int(chunk.GetWidth()) segHeight = int(chunk.GetHeight()) segFPS = fps segCount++ flushVoiceSeg(chunk.GetIsFinal()) if chunk.GetIsFinal() && turnRec != nil { _ = turnRec.Finish() turnRec = nil } } } }(turn) } closeTurnInput := func(turn *voicePipelineTurn) { if turn == nil || !turn.avatarStarted || turn.avatarInputClosed { return } sampleRate := turn.recAudioSR if sampleRate <= 0 { if _, _, _, bufferedSR := turn.syncBuf.snapshot(); bufferedSR > 0 { sampleRate = bufferedSR } } silence := buildTrailingSilence(sampleRate) if dropped := turn.syncBuf.appendPCM(silence.GetData(), int(silence.GetSampleRate())); dropped > 0 { bufferedSamples, _, _, _ := turn.syncBuf.snapshot() log.Printf("voice sync buffer overflow for session %s: dropped=%d bytes, buffered_samples=%d", sessionID, dropped, bufferedSamples) } select { case turn.avatarAudioCh <- silence: case <-turn.avatarCtx.Done(): } close(turn.avatarAudioCh) turn.avatarInputClosed = true turn.avatarInputClosedAt = time.Now() if turn.firstVideoAt.IsZero() { logVoiceTrace("go_avatar_input_closed", sessionID, turn.seq, turn.replyID, turn.questionID, turn.userFinalAt) } } currentStatusTurnSeq := func() uint64 { if currentTurn != nil { return currentTurn.seq } if pendingTurnSeq != 0 { return pendingTurnSeq } return session.CurrentTurnSeq() } defer func() { if currentTurn != nil { abortTurn(currentTurn, false) } session.MarkPipelineFinished(pipelineSeq) if session.IsCurrentPipeline(pipelineSeq) { session.SetState(StateListening) o.broadcastStatusTurn(sessionID, "idle", currentStatusTurnSeq()) } }() if initialTurnSeq == 0 && session.IsCurrentPipeline(pipelineSeq) { session.SetState(StateListening) } for outputCh != nil || currentTurn != nil || errCh != nil { select { case <-ctx.Done(): return case result := <-currentTurnDone: if currentTurn == nil || result.turn != currentTurn { continue } turn := currentTurn currentTurn = nil currentTurnDone = nil if result.err != nil && !turn.aborted { log.Printf("Avatar stream error for session %s (omni): %v", sessionID, result.err) if session.IsCurrentPipeline(pipelineSeq) { o.broadcastError(sessionID, "Avatar generation failed") } } if turn.aborted { continue } saveCompletedTurn(turn) setIdleIfCurrent(turn.seq) case output, ok := <-outputCh: if !ok { outputCh = nil if currentTurn != nil && !o.AvatarEnabled() && !currentTurn.avatarStarted { turn := currentTurn currentTurn = nil currentTurnDone = nil saveCompletedTurn(turn) setIdleIfCurrent(turn.seq) } continue } if rawTaskEvent := strings.TrimSpace(output.GetTaskEventJson()); rawTaskEvent != "" { var payload map[string]any if err := json.Unmarshal([]byte(rawTaskEvent), &payload); err != nil { log.Printf("invalid persona task event json session=%s: %v", sessionID, err) } else { o.broadcastJSON(sessionID, o.persistPersonaTaskEvent(ctx, session, sessionID, payload)) } if !voiceOutputHasAssistantContent(output) && !voiceOutputIsFinal(output) && strings.TrimSpace(output.GetUserTranscript()) == "" && !output.GetBargeIn() { continue } } outputQuestionID := output.GetQuestionId() outputReplyID := output.GetReplyId() if output.GetBargeIn() { if currentTurn != nil || pendingTurnSeq == 0 { seq := reservePendingTurn() if outputQuestionID != "" { pendingQuestionID = outputQuestionID } if outputReplyID != "" { pendingReplyID = outputReplyID } pendingTurnAssistantReady = false logVoiceTrace("go_barge_in_received", sessionID, seq, pendingReplyID, pendingQuestionID, time.Time{}) if currentTurn != nil { abortTurn(currentTurn, true) currentTurn = nil currentTurnDone = nil pendingAssistantMustMatchKey = true broadcastProcessing(seq) } } continue } if currentTurn != nil { if outputQuestionID != "" { currentTurn.questionID = outputQuestionID } if outputReplyID != "" { currentTurn.replyID = outputReplyID } } else { if outputQuestionID != "" { pendingQuestionID = outputQuestionID } if outputReplyID != "" { pendingReplyID = outputReplyID } } if userText := strings.TrimSpace(output.GetUserTranscript()); userText != "" { if suppressUserTranscript { if pendingUserFinalAt.IsZero() { pendingUserFinalAt = time.Now() } pendingTurnAssistantReady = true if outputQuestionID != "" { pendingQuestionID = outputQuestionID } if outputReplyID != "" { pendingReplyID = outputReplyID } if !voiceOutputHasAssistantContent(output) && !voiceOutputIsFinal(output) { continue } } else { if currentTurn != nil { abortTurn(currentTurn, true) currentTurn = nil currentTurnDone = nil pendingAssistantMustMatchKey = true } if outputQuestionID != "" { pendingQuestionID = outputQuestionID } if outputReplyID != "" { pendingReplyID = outputReplyID } seq := reservePendingTurn() pendingTurnAssistantReady = true pendingUserFinalAt = time.Now() logVoiceTrace( "go_user_transcript_received", sessionID, seq, pendingReplyID, pendingQuestionID, pendingUserFinalAt, ) session.AddMessage(ChatMessage{Role: "user", Content: userText, TurnSeq: seq}) o.broadcastJSON(sessionID, map[string]any{ "type": "transcript", "text": userText, "is_final": true, "speaker": "user", "turn_seq": seq, }) broadcastProcessing(seq) } } turnKey := voiceOutputTurnKey(output) if turnKey != "" { if ignoredTurn, ignored := ignoredTurns[turnKey]; ignored { if recordIgnoredTurnOutput(ignoredTurn, output) { delete(ignoredTurns, turnKey) } continue } } if pendingTurnSeq != 0 && !pendingTurnAssistantReady && currentTurn == nil && voiceOutputHasAssistantContent(output) { continue } if pendingTurnSeq != 0 && pendingTurnAssistantReady && pendingAssistantMustMatchKey && currentTurn == nil && voiceOutputHasAssistantContent(output) { if expectedKey := pendingTurnKey(); expectedKey != "" && turnKey != expectedKey { continue } } if !voiceOutputHasAssistantContent(output) && !voiceOutputIsFinal(output) { continue } if currentTurn == nil && !voiceOutputHasAssistantContent(output) { continue } if currentTurn != nil { if currentTurn.key == "" && turnKey != "" { currentTurn.key = turnKey } else if turnKey != "" && currentTurn.key != "" && turnKey != currentTurn.key { abortTurn(currentTurn, true) currentTurn = nil currentTurnDone = nil } } if currentTurn == nil { if pendingTurnSeq != 0 && !pendingTurnAssistantReady { continue } currentTurn = startTurn(turnKey) } if currentTurn.key == "" && turnKey != "" { currentTurn.key = turnKey } isFinal := voiceOutputIsFinal(output) if transcript := output.GetTranscript(); transcript != "" { o.broadcastJSON(sessionID, map[string]any{ "type": "transcript", "text": transcript, "is_final": isFinal, "speaker": "assistant", "turn_seq": currentTurn.seq, }) if isFinal { currentTurn.assistantText = transcript } else { currentTurn.assistantText += transcript } } audio := output.GetAudio() if audio != nil && len(audio.GetData()) > 0 { pcm, pcmSampleRate := audioChunkToPCM16(audio) if currentTurn.firstAudioAt.IsZero() { currentTurn.firstAudioAt = time.Now() logVoiceTrace( "go_first_voice_audio_received", sessionID, currentTurn.seq, currentTurn.replyID, currentTurn.questionID, currentTurn.userFinalAt, ) } if len(pcm) > 0 { currentTurn.recAudioBuf = append(currentTurn.recAudioBuf, pcm...) if pcmSampleRate > 0 { currentTurn.recAudioSR = pcmSampleRate } } if !o.AvatarEnabled() { if len(pcm) > 0 { if !currentTurn.audioOnlyStarted && session.IsCurrentPipeline(pipelineSeq) && session.IsCurrentTurn(currentTurn.seq) { currentTurn.audioOnlyStarted = true session.SetState(StateSpeaking) o.broadcastStatusTurn(sessionID, "speaking", currentTurn.seq) } if peer := lookupPeer(); peer != nil { if err := peer.PublishAudioFrame(pcm, pcmSampleRate); err != nil { log.Printf("voice audio-only publish failed session=%s turn=%d: %v", sessionID, currentTurn.seq, err) } } } } else { if !currentTurn.avatarStarted { startAvatarWorker(currentTurn) } if len(pcm) > 0 { if dropped := currentTurn.syncBuf.appendPCM(pcm, pcmSampleRate); dropped > 0 { bufferedSamples, _, _, _ := currentTurn.syncBuf.snapshot() log.Printf("voice sync buffer overflow for session %s: dropped=%d bytes, buffered_samples=%d", sessionID, dropped, bufferedSamples) } } audioClone := proto.Clone(audio).(*pb.AudioChunk) if currentTurn.firstAvatarAudioAt.IsZero() { currentTurn.firstAvatarAudioAt = time.Now() logVoiceTrace( "go_avatar_first_audio_enqueued", sessionID, currentTurn.seq, currentTurn.replyID, currentTurn.questionID, currentTurn.userFinalAt, ) } select { case currentTurn.avatarAudioCh <- audioClone: case <-currentTurn.avatarCtx.Done(): } } } if !isFinal { continue } if currentTurn.audioFinalAt.IsZero() { currentTurn.audioFinalAt = time.Now() if currentTurn.firstVideoAt.IsZero() { logVoiceTrace("go_voice_audio_final_received", sessionID, currentTurn.seq, currentTurn.replyID, currentTurn.questionID, currentTurn.userFinalAt) } } saveAssistantMessage(currentTurn) saveTurnRawAudio(currentTurn) saveTurnTranscript(currentTurn) saveTurnConversation(currentTurn) if currentTurn.avatarStarted { closeTurnInput(currentTurn) continue } turn := currentTurn currentTurn = nil currentTurnDone = nil saveCompletedTurn(turn) setIdleIfCurrent(turn.seq) case err, ok := <-errCh: if !ok { errCh = nil continue } if err != nil && !errors.Is(err, context.Canceled) { streamErr = err } errCh = nil } } if streamErr != nil { log.Printf("Omni stream error for session %s: %v", sessionID, streamErr) if session.IsCurrentPipeline(pipelineSeq) { o.broadcastError(sessionID, "Voice conversation failed") } } } // HandleClientMediaReady starts the one-time proactive greeting for omni sessions // after the browser confirms that realtime media is connected. func (o *Orchestrator) HandleClientMediaReady(ctx context.Context, sessionID string) error { if o == nil || o.sessionMgr == nil { return errors.New("orchestrator is not configured") } if o.inference == nil { return errors.New("inference service is not configured") } session, err := o.sessionMgr.Get(sessionID) if err != nil { return err } if session.Mode != ModeOmni { log.Printf("startup greeting ignored for non-omni session %s mode=%s", sessionID, modeStringForLog(session.Mode)) return nil } if !session.TryStartVoiceStartupGreeting() { log.Printf("startup greeting already started for session %s", sessionID) return nil } if ctx == nil { ctx = context.Background() } prompt := o.buildVoiceStartupGreetingPrompt(session) voiceConfig := o.buildVoiceStartupGreetingSessionConfig(session, sessionID) log.Printf("startup greeting starting for session %s provider=%s history_items=%d", sessionID, voiceConfig.Provider, len(session.DialogContextSnapshot())) o.stopPipelineAndWait(session, sessionID, true) pipeCtx, cancel := context.WithCancel(ctx) session.mu.Lock() session.PipelineCancel = cancel session.mu.Unlock() turnSeq := session.MarkTurnStarted() o.advancePlaybackEpoch(sessionID, turnSeq) session.SetState(StateProcessing) o.broadcastStatusTurn(sessionID, "processing", turnSeq) pipelineSeq := session.MarkPipelineRunning() inputCh := singleVoiceTextInput(prompt) go func(seq uint64) { o.runVoiceLLMPipelineWithConfig( pipeCtx, session, sessionID, inputCh, seq, turnSeq, voiceConfig, true, ) if pipeCtx.Err() != nil || !session.IsCurrentPipeline(seq) { return } if err := o.resumeVoiceAudioStream(sessionID); err != nil { log.Printf("Failed to resume omni audio stream after startup greeting for session %s: %v", sessionID, err) } }(pipelineSeq) return nil } // Interrupt cancels the current pipeline for a session. func (o *Orchestrator) Interrupt(sessionID string) error { session, err := o.sessionMgr.Get(sessionID) if err != nil { return err } turnSeq := session.MarkTurnStarted() o.advancePlaybackEpoch(sessionID, turnSeq) o.cancelPipeline(session) // Also interrupt the omni model on the inference side if session.Mode == ModeOmni { _ = o.inference.Interrupt(context.Background(), sessionID) } session.SetState(StateListening) o.broadcastStatusTurn(sessionID, "idle", turnSeq) return nil } // TeardownSession cleans up all resources for a session. func (o *Orchestrator) TeardownSession(sessionID string) error { session, err := o.sessionMgr.Get(sessionID) if err != nil { return err } o.cancelPipeline(session) // Wait for pipeline goroutine to finish storing messages (up to 3s) session.WaitPipelineDone(3 * time.Second) // Disconnect media peer o.mu.Lock() peer, ok := o.peers[sessionID] if ok { delete(o.peers, sessionID) } delete(o.directPeers, sessionID) o.mu.Unlock() if peer != nil { peer.StopAVPipeline() _ = peer.Disconnect() } // Close WebSocket connections o.wsHub.CloseSession(sessionID) session.SetState(StateClosed) return nil } // TeardownAll cleans up all sessions. Called during server shutdown. func (o *Orchestrator) TeardownAll() { o.mu.Lock() peers := make(map[string]mediapeer.MediaPeer, len(o.peers)) for k, v := range o.peers { peers[k] = v } o.peers = make(map[string]mediapeer.MediaPeer) o.directPeers = make(map[string]*direct.DirectPeer) o.mu.Unlock() for _, peer := range peers { peer.StopAVPipeline() _ = peer.Disconnect() } } // cancelPipeline cancels the active pipeline for a session if one exists. func (o *Orchestrator) cancelPipeline(session *Session) { session.mu.Lock() cancel := session.PipelineCancel session.PipelineCancel = nil session.mu.Unlock() if cancel != nil { cancel() } } func (o *Orchestrator) advancePlaybackEpoch(sessionID string, turnSeq uint64) { if turnSeq == 0 { return } o.mu.RLock() peer := o.peers[sessionID] o.mu.RUnlock() if peer != nil { peer.AdvancePlaybackEpoch(turnSeq) } } // broadcastStatus sends an avatar_status message to all WebSocket clients. func (o *Orchestrator) broadcastStatus(sessionID, status string) { o.broadcastJSON(sessionID, map[string]string{ "type": "avatar_status", "status": status, }) } func (o *Orchestrator) broadcastStatusTurn(sessionID, status string, turnSeq uint64) { payload := map[string]any{ "type": "avatar_status", "status": status, } if turnSeq > 0 { payload["turn_seq"] = turnSeq } o.broadcastJSON(sessionID, payload) } // broadcastError sends an error message to all WebSocket clients. func (o *Orchestrator) broadcastError(sessionID, message string) { o.broadcastJSON(sessionID, map[string]string{ "type": "error", "message": message, }) } func (o *Orchestrator) SpeakAssistantText(ctx context.Context, sessionID, text string, persist bool) error { if o == nil { return errors.New("orchestrator is nil") } session, err := o.sessionMgr.Get(sessionID) if err != nil { return err } text = strings.TrimSpace(text) if text == "" { return nil } o.stopPipelineAndWait(session, sessionID, session.Mode == ModeOmni) turnSeq := session.MarkTurnStarted() o.advancePlaybackEpoch(sessionID, turnSeq) o.broadcastJSON(sessionID, map[string]any{ "type": "assistant_message", "text": text, "turn_seq": turnSeq, }) if persist { session.AddMessage(ChatMessage{Role: "assistant", Content: text, TurnSeq: turnSeq}) if _, err := o.persistSessionConversation(session); err != nil { log.Printf("conversation: SaveConversation assistant speech error session=%s: %v", sessionID, err) } } if o.inference == nil { return nil } pipeCtx, cancel := context.WithCancel(ctx) session.mu.Lock() session.PipelineCancel = cancel session.mu.Unlock() pipelineSeq := session.MarkPipelineRunning() resumeOmni := session.Mode == ModeOmni go func() { o.runAssistantSpeechPipeline(pipeCtx, session, sessionID, text, pipelineSeq, turnSeq) if resumeOmni && pipeCtx.Err() == nil && session.IsCurrentPipeline(pipelineSeq) { if err := o.resumeVoiceAudioStream(sessionID); err != nil { log.Printf("Failed to resume omni audio stream after assistant speech for session %s: %v", sessionID, err) } } }() return nil } func (o *Orchestrator) runAssistantSpeechPipeline(ctx context.Context, session *Session, sessionID, text string, pipelineSeq uint64, turnSeq uint64) { defer func() { session.MarkPipelineFinished(pipelineSeq) if session.IsCurrentPipeline(pipelineSeq) { session.SetState(StateListening) o.broadcastStatusTurn(sessionID, "idle", turnSeq) } }() session.SetState(StateProcessing) o.broadcastStatusTurn(sessionID, "processing", turnSeq) components, voice, speakingStyle, language := o.standardCharacterConfig(session) textCh := make(chan string, 1) textCh <- text close(textCh) ttsAudioCh, ttsErrCh := o.inference.SynthesizeSpeechStream(ctx, textCh, inference.TTSConfig{ Provider: components.TTS, Voice: voice, SpeakingStyle: speakingStyle, Language: language, SessionID: sessionID, }) lookupPeer := func() mediapeer.MediaPeer { o.mu.RLock() defer o.mu.RUnlock() return o.peers[sessionID] } if !o.AvatarEnabled() { speakingBroadcasted := false for ttsAudioCh != nil || ttsErrCh != nil { select { case <-ctx.Done(): return case chunk, ok := <-ttsAudioCh: if !ok { ttsAudioCh = nil continue } pcm, sampleRate := audioChunkToPCM16(chunk) if len(pcm) == 0 { continue } if !speakingBroadcasted { speakingBroadcasted = true session.SetState(StateSpeaking) o.broadcastStatusTurn(sessionID, "speaking", turnSeq) } if peer := lookupPeer(); peer != nil { if err := peer.PublishAudioFrame(pcm, sampleRate); err != nil { log.Printf("assistant speech audio-only publish failed session=%s: %v", sessionID, err) } } case err, ok := <-ttsErrCh: if !ok { ttsErrCh = nil continue } if err != nil { log.Printf("assistant speech TTS error for session %s: %v", sessionID, err) o.broadcastError(sessionID, "Speech synthesis failed") return } } } return } syncBuf := newVoiceAVSyncBuffer(voiceMaxPCMBufferSamples) avatarAudioCh := make(chan *pb.AudioChunk, 8) go func() { defer close(avatarAudioCh) for { select { case <-ctx.Done(): return case chunk, ok := <-ttsAudioCh: if !ok { return } pcm, sampleRate := audioChunkToPCM16(chunk) if len(pcm) > 0 { _ = syncBuf.appendPCM(pcm, sampleRate) } select { case avatarAudioCh <- chunk: case <-ctx.Done(): return } case err, ok := <-ttsErrCh: if !ok { ttsErrCh = nil continue } if err != nil { log.Printf("assistant speech TTS error for session %s: %v", sessionID, err) o.broadcastError(sessionID, "Speech synthesis failed") return } } } }() o.avatarMu.Lock() defer o.avatarMu.Unlock() videoCh, videoErrCh := o.inference.GenerateAvatarStream(ctx, avatarAudioCh) var ( segVideo []byte segFrames int segWidth int segHeight int segFPS int segCount int segSeq int64 segMediaStartMS int64 firstFrameSent bool ) flushSeg := func(isFinalSeg bool) { if segCount == 0 { return } segSeq++ segDurationMS := durationMSForVideo(segFrames, segFPS) if peer := lookupPeer(); peer != nil { segPCM, _, _, _ := syncBuf.takeSegmentPCM(segFrames, segFPS, isFinalSeg) _, _, _, sampleRate := syncBuf.snapshot() if sampleRate <= 0 { sampleRate = 16000 } raw := &mediapeer.RawAVSegment{ TraceLabel: voiceTraceLabel(sessionID, turnSeq, "task", "", segSeq), Epoch: turnSeq, SegmentSeq: segSeq, MediaStartMS: segMediaStartMS, DurationMS: segDurationMS, RGB: segVideo, PCM: segPCM, SampleRate: sampleRate, Width: segWidth, Height: segHeight, FPS: segFPS, NumFrames: segFrames, } if err := peer.SendAVSegment(raw); err != nil { log.Printf("assistant speech SendAVSegment failed session=%s: %v", sessionID, err) } } segMediaStartMS += segDurationMS segVideo = nil segFrames = 0 segCount = 0 } for { select { case <-ctx.Done(): flushSeg(false) return case chunk, ok := <-videoCh: if !ok { flushSeg(true) if err := <-videoErrCh; err != nil { log.Printf("assistant speech avatar error for session %s: %v", sessionID, err) o.broadcastError(sessionID, "Avatar generation failed") } if ctx.Err() == nil { if peer := lookupPeer(); peer != nil { peer.WaitAVDrain(10 * time.Second) } } return } nf := int(chunk.GetNumFrames()) if nf <= 0 && int(chunk.GetWidth())*int(chunk.GetHeight())*3 > 0 { nf = len(chunk.GetData()) / (int(chunk.GetWidth()) * int(chunk.GetHeight()) * 3) } fps := int(chunk.GetFps()) if fps <= 0 { fps = 25 } if !firstFrameSent { firstFrameSent = true session.SetState(StateSpeaking) o.broadcastStatusTurn(sessionID, "speaking", turnSeq) } segVideo = append(segVideo, chunk.GetData()...) segFrames += nf segWidth = int(chunk.GetWidth()) segHeight = int(chunk.GetHeight()) segFPS = fps segCount++ if segCount >= stdChunksPerSegment || chunk.GetIsFinal() { flushSeg(chunk.GetIsFinal()) } } } } // PersistSessionConversation writes the current session history to session.json. func (o *Orchestrator) PersistSessionConversation(session *Session) (bool, error) { return o.persistSessionConversation(session) } func (o *Orchestrator) persistSessionConversation(session *Session) (bool, error) { if o == nil || o.charStore == nil || session == nil { return false, nil } sessionID, characterID, startedAt, endedAt, history := session.ConversationSnapshot() if characterID == "" || len(history) == 0 { return false, nil } messages := make([]map[string]any, len(history)) for i, m := range history { ts := m.Timestamp if ts.IsZero() { ts = startedAt } messages[i] = map[string]any{ "role": m.Role, "content": m.Content, "timestamp": ts.UTC().Format(time.RFC3339Nano), } if m.TurnSeq > 0 { messages[i]["turn_seq"] = m.TurnSeq } } if strings.TrimSpace(characterID) == kanshan.CharacterID { ownerID := session.OwnerIDSnapshot() if strings.TrimSpace(ownerID) == "" { return false, nil } if err := o.charStore.SaveConversationForOwner(characterID, ownerID, sessionID, startedAt, endedAt, messages); err != nil { return false, err } } else { if err := o.charStore.SaveConversation(characterID, sessionID, startedAt, endedAt, messages); err != nil { return false, err } } return true, nil } // sessionRecordingDir returns the directory for recording output. // If the session has a character, records go into the character's sessions/ dir. // Otherwise falls back to a timestamp-based dir (used by the recorder's OutputDir). func (o *Orchestrator) sessionRecordingDir(session *Session) string { if session.CharacterID != "" && o.charStore != nil { var dir string if strings.TrimSpace(session.CharacterID) == kanshan.CharacterID { ownerID := session.OwnerIDSnapshot() if strings.TrimSpace(ownerID) != "" { dir = o.charStore.SessionRecordingDirForOwner(session.CharacterID, ownerID, session.ID, session.CreatedAt) } } else { dir = o.charStore.SessionRecordingDir(session.CharacterID, session.ID, session.CreatedAt) } if dir != "" { session.mu.Lock() session.RecordingDir = dir session.mu.Unlock() return dir } } // Fallback: legacy timestamp-based dir return time.Now().Format("20060102-150405") } // broadcastJSON marshals and broadcasts a JSON message. func (o *Orchestrator) broadcastJSON(sessionID string, v any) { if o.wsHub == nil { return } data, err := json.Marshal(v) if err != nil { log.Printf("Failed to marshal broadcast: %v", err) return } o.wsHub.Broadcast(sessionID, data) }