package main import ( "flag" "fmt" "log" "net" "net/http" "os" "os/signal" "path/filepath" "syscall" "github.com/cyberverse/server/internal/agenttask" "github.com/cyberverse/server/internal/api" "github.com/cyberverse/server/internal/character" "github.com/cyberverse/server/internal/config" "github.com/cyberverse/server/internal/direct" "github.com/cyberverse/server/internal/inference" "github.com/cyberverse/server/internal/livekit" "github.com/cyberverse/server/internal/orchestrator" "github.com/cyberverse/server/internal/recording" "github.com/cyberverse/server/internal/ws" ) func main() { configPath := flag.String("config", "../../cyberverse_config.yaml", "path to config file") flag.Parse() // Load .env before config so ${VAR} placeholders in YAML expand correctly. envPath := filepath.Join(filepath.Dir(*configPath), ".env") if err := config.LoadDotenv(envPath); err != nil { log.Printf("Warning: failed to load .env: %v", err) } if _, err := os.Stat(*configPath); os.IsNotExist(err) { log.Fatalf("Config file %s not found. Copy infra/cyberverse_config.example.yaml to cyberverse_config.yaml first.", *configPath) } cfg, err := config.Load(*configPath) if err != nil { log.Fatalf("Failed to load config: %v", err) } // Create session manager sessionMgr := orchestrator.NewSessionManager(cfg.Session.MaxConcurrent) // Create WebSocket hub wsHub := ws.NewHub() // Create inference gRPC client inferenceClient, err := inference.NewClient(cfg.Inference.Addr) if err != nil { log.Printf("Warning: failed to connect to inference server at %s: %v", cfg.Inference.Addr, err) log.Printf("Server will start but inference features will be unavailable") } // Create LiveKit room manager roomMgr := livekit.NewRoomManager(cfg.LiveKit.URL, cfg.LiveKit.APIKey, cfg.LiveKit.APISecret) // Create character store (directory-based, one dir per character) dataDir := filepath.Join(filepath.Dir(*configPath), "data") os.MkdirAll(dataDir, 0755) charStore, err := character.NewStore(filepath.Join(dataDir, "characters")) if err != nil { log.Fatalf("Failed to init character store: %v", err) } taskDBPath := filepath.Join(dataDir, "tasks", "tasks.db") artifactDir := filepath.Join(dataDir, "tasks", "artifacts") taskStore, err := agenttask.OpenStore(taskDBPath, artifactDir) if err != nil { log.Fatalf("Failed to init agent task store: %v", err) } taskSvc := agenttask.NewService(taskStore, wsHub) log.Printf("Agent task projection store initialized: db=%s", taskDBPath) // Create orchestrator (needs charStore for recording paths) recorder := recording.NewVideoRecorder(cfg.Recording) orch := orchestrator.New(inferenceClient, wsHub, sessionMgr, recorder, charStore, cfg.Pipeline) if taskSvc != nil { orch.SetTaskService(taskSvc) taskSvc.SetEventHandler(orch.HandleTaskEvent) } // Embedded TURN-over-TCP server for NAT traversal (AutoDL, SSH tunnel, etc.) var turnServer *direct.TURNServer if cfg.Pipeline.TURNEnabled && cfg.Pipeline.TURNPort > 0 { publicIP := cfg.Pipeline.ICEPublicIP // Resolve hostname to IP if needed if publicIP != "" && net.ParseIP(publicIP) == nil { addrs, err := net.LookupHost(publicIP) if err != nil || len(addrs) == 0 { log.Fatalf("Cannot resolve ice_public_ip %q: %v", publicIP, err) } publicIP = addrs[0] log.Printf("Resolved ice_public_ip %q -> %s", cfg.Pipeline.ICEPublicIP, publicIP) } if publicIP == "" { publicIP = "127.0.0.1" } ts, err := direct.NewTURNServer( cfg.Pipeline.TURNPort, publicIP, cfg.Pipeline.TURNRealm, cfg.Pipeline.TURNUsername, cfg.Pipeline.TURNPassword, ) if err != nil { log.Fatalf("TURN server setup failed: %v", err) } turnServer = ts orch.SetTURNServer(ts) log.Printf("TURN server enabled on TCP port %d (relay IP: %s)", cfg.Pipeline.TURNPort, publicIP) } // WebRTC API with interceptors (NACK, TWCC, GCC pacer) for direct streaming mode if cfg.Pipeline.StreamingMode == "direct" { api, estimatorCh, err := direct.NewWebRTCAPI(direct.WebRTCAPIConfig{ InitialBitrate: 2_500_000, MinBitrate: 800_000, MaxBitrate: 4_000_000, }) if err != nil { log.Fatalf("WebRTC API setup failed: %v", err) } orch.SetWebRTCAPI(api, estimatorCh) log.Println("WebRTC API initialized with interceptors (NACK, TWCC, GCC)") } // Register session end callback to persist conversation history sessionMgr.OnSessionEnd = func(s *orchestrator.Session) { sessionID, characterID, _, _, history := s.ConversationSnapshot() log.Printf("OnSessionEnd: session=%s character=%s historyLen=%d", sessionID, characterID, len(history)) saved, err := orch.PersistSessionConversation(s) if err != nil { log.Printf("Failed to save conversation for session %s: %v", sessionID, err) return } if !saved { log.Printf("OnSessionEnd: skipping save — characterID=%q historyLen=%d", characterID, len(history)) return } log.Printf("Conversation saved for session %s (character %s)", sessionID, characterID) } // Create router with all dependencies router := api.NewRouter(sessionMgr, orch, wsHub, roomMgr, cfg, charStore, envPath, *configPath, taskSvc) addr := fmt.Sprintf("%s:%d", cfg.Server.Host, cfg.Server.HTTPPort) srv := &http.Server{ Addr: addr, Handler: router.Handler(), } // Graceful shutdown go func() { sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) <-sigCh log.Println("Shutting down server...") // Teardown all orchestrator sessions orch.TeardownAll() // Close inference client if inferenceClient != nil { inferenceClient.Close() } // Close TURN server if turnServer != nil { turnServer.Close() } // Stop session manager cleanup sessionMgr.Stop() if taskStore != nil { taskStore.Close() } srv.Close() }() log.Printf("CyberVerse Server starting on %s", addr) log.Printf("Inference server: %s", cfg.Inference.Addr) log.Printf("LiveKit URL: %s", cfg.LiveKit.URL) log.Printf("Streaming mode: %s", cfg.Pipeline.StreamingMode) if err := srv.ListenAndServe(); err != http.ErrServerClosed { log.Fatalf("Server error: %v", err) } } func resolveConfigPath(configDir, value, fallback string) string { if value == "" { return fallback } if filepath.IsAbs(value) { return value } return filepath.Join(configDir, value) }