diff --git a/cli/pkg/daemon/task.go b/cli/pkg/daemon/task.go index a805880e2..7480e9e64 100644 --- a/cli/pkg/daemon/task.go +++ b/cli/pkg/daemon/task.go @@ -82,7 +82,6 @@ func (g *GenerateTerminusdServiceEnv) Execute(runtime connector.Runtime) error { "FrpAuthMethod": g.KubeConf.Arg.Frp.AuthMethod, "TokenMaxAge": g.KubeConf.Arg.TokenMaxAge, "MarketProvider": g.KubeConf.Arg.MarketProvider, - "TerminusGlobalEnvs": common.TerminusGlobalEnvs, }, PrintContent: true, } diff --git a/cli/pkg/daemon/templates/terminusd_env.go b/cli/pkg/daemon/templates/terminusd_env.go index 0d0d1b122..f69e7748b 100644 --- a/cli/pkg/daemon/templates/terminusd_env.go +++ b/cli/pkg/daemon/templates/terminusd_env.go @@ -23,7 +23,4 @@ FRP_AUTH_METHOD={{ .FrpAuthMethod }} FRP_AUTH_TOKEN= TOKEN_MAX_AGE={{ .TokenMaxAge }} MARKET_PROVIDER={{ .MarketProvider }} -{{- range $key, $val := .TerminusGlobalEnvs }} -{{ $key }}={{ $val }} -{{- end }} `))) diff --git a/daemon/cmd/terminusd/main.go b/daemon/cmd/terminusd/main.go index 230a0feea..7febe8630 100644 --- a/daemon/cmd/terminusd/main.go +++ b/daemon/cmd/terminusd/main.go @@ -16,6 +16,7 @@ import ( "github.com/beclab/Olares/daemon/internel/watcher" "github.com/beclab/Olares/daemon/internel/watcher/cert" "github.com/beclab/Olares/daemon/internel/watcher/system" + "github.com/beclab/Olares/daemon/internel/watcher/systemenv" "github.com/beclab/Olares/daemon/internel/watcher/upgrade" "github.com/beclab/Olares/daemon/internel/watcher/usb" "github.com/beclab/Olares/daemon/pkg/cluster/state" @@ -102,6 +103,7 @@ func main() { usb.NewUmountWatcher(), upgrade.NewUpgradeWatcher(), cert.NewCertWatcher(), + systemenv.NewSystemEnvWatcher(), }, func() { if s != nil { if err := s.Restart(); err != nil { diff --git a/daemon/internel/apiserver/handlers/command_group.go b/daemon/internel/apiserver/handlers/command_group.go index 7228e853a..5e942d82e 100644 --- a/daemon/internel/apiserver/handlers/command_group.go +++ b/daemon/internel/apiserver/handlers/command_group.go @@ -20,9 +20,9 @@ import ( func init() { s := server.API cmd := s.App.Group("command") - cmd.Post("/install", handlers.RequireSignature( + cmd.Post("/install", handlers.WaitServerRunning( - handlers.RunCommand(handlers.PostTerminusInit, install.New)))) + handlers.RunCommand(handlers.PostTerminusInit, install.New))) cmd.Post("/uninstall", handlers.RequireSignature( handlers.RequireOwner( diff --git a/daemon/internel/watcher/systemenv/systemenv_watcher.go b/daemon/internel/watcher/systemenv/systemenv_watcher.go new file mode 100644 index 000000000..5a2bf8013 --- /dev/null +++ b/daemon/internel/watcher/systemenv/systemenv_watcher.go @@ -0,0 +1,174 @@ +package systemenv + +import ( + "context" + "sync" + "time" + + "github.com/beclab/Olares/daemon/internel/watcher" + "github.com/beclab/Olares/daemon/pkg/cluster/state" + "github.com/beclab/Olares/daemon/pkg/commands" + "github.com/beclab/Olares/daemon/pkg/utils" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" + "k8s.io/klog/v2" +) + +var _ watcher.Watcher = &systemEnvWatcher{} + +type systemEnvWatcher struct { + sync.Mutex + watcher.Watcher + running bool + cancel context.CancelFunc +} + +func NewSystemEnvWatcher() watcher.Watcher { + return &systemEnvWatcher{} +} + +func (w *systemEnvWatcher) Watch(ctx context.Context) { + w.Lock() + defer w.Unlock() + + if state.CurrentState.TerminusState != state.TerminusRunning && + state.CurrentState.TerminusState != state.Upgrading && + state.CurrentState.TerminusState != state.Uninitialized && + state.CurrentState.TerminusState != state.Initializing && + state.CurrentState.TerminusState != state.InitializeFailed && + state.CurrentState.TerminusState != state.SystemError { + if w.cancel != nil { + w.cancel() + w.cancel = nil + } + w.running = false + klog.V(4).Info("systemenv watcher stopped: cluster not running") + return + } + + if w.running { + return + } + + dc, err := utils.GetDynamicClient() + if err != nil { + klog.V(4).Infof("systemenv watcher: dynamic client not ready: %v", err) + return + } + + execCtx, cancel := context.WithCancel(ctx) + w.cancel = cancel + w.running = true + + go func() { + defer func() { + w.running = false + w.cancel = nil + klog.V(4).Info("systemenv watcher exited") + }() + + startSystemEnvWatch(execCtx, dc, func(eventType watch.EventType, obj map[string]any) { + klog.V(5).Infof("systemenv event: %s", eventType) + + if eventType != watch.Added && eventType != watch.Modified { + return + } + + envName, _ := obj["envName"].(string) + if envName != "OLARES_SYSTEM_CDN_SERVICE" && envName != "OLARES_SYSTEM_REMOTE_SERVICE" { + return + } + + val, _ := obj["value"].(string) + if val == "" { + if def, ok := obj["default"].(string); ok && def != "" { + val = def + } + } + if val == "" { + return + } + + switch envName { + case "OLARES_SYSTEM_CDN_SERVICE": + if commands.OLARES_CDN_SERVICE != val { + old := commands.OLARES_CDN_SERVICE + commands.OLARES_CDN_SERVICE = val + klog.Infof("updated OLARES_CDN_SERVICE: %s -> %s", old, val) + } + case "OLARES_SYSTEM_REMOTE_SERVICE": + if commands.OLARES_REMOTE_SERVICE != val { + old := commands.OLARES_REMOTE_SERVICE + commands.OLARES_REMOTE_SERVICE = val + klog.Infof("updated OLARES_REMOTE_SERVICE: %s -> %s", old, val) + } + } + }) + }() +} + +var systemEnvGVR = schema.GroupVersionResource{ + Group: "sys.bytetrade.io", + Version: "v1alpha1", + Resource: "systemenvs", +} + +func startSystemEnvWatch(ctx context.Context, dc dynamic.Interface, handle func(watch.EventType, map[string]any)) { + for { + // 1) List existing resources to establish initial state + list, err := dc.Resource(systemEnvGVR).List(ctx, metav1.ListOptions{}) + if err != nil { + select { + case <-time.After(3 * time.Second): + klog.V(3).Infof("systemenv list failed, retrying: %v", err) + continue + case <-ctx.Done(): + return + } + } + + for i := range list.Items { + handle(watch.Added, list.Items[i].UnstructuredContent()) + } + + rv := list.GetResourceVersion() + w, err := dc.Resource(systemEnvGVR).Watch(ctx, metav1.ListOptions{ResourceVersion: rv}) + if err != nil { + select { + case <-time.After(3 * time.Second): + klog.V(3).Infof("systemenv watch create failed, retrying: %v", err) + continue + case <-ctx.Done(): + return + } + } + + ch := w.ResultChan() + for { + select { + case e, ok := <-ch: + if !ok { + w.Stop() + goto REWATCH + } + if u, ok := e.Object.(interface{ UnstructuredContent() map[string]any }); ok { + handle(e.Type, u.UnstructuredContent()) + } + case <-ctx.Done(): + w.Stop() + return + } + } + + REWATCH: + select { + case <-time.After(1 * time.Second): + klog.V(4).Info("systemenv watch channel closed, restarting with fresh list") + continue + case <-ctx.Done(): + return + } + } +} diff --git a/daemon/pkg/commands/constants.go b/daemon/pkg/commands/constants.go index a2257bad1..fd247ae6a 100644 --- a/daemon/pkg/commands/constants.go +++ b/daemon/pkg/commands/constants.go @@ -12,10 +12,11 @@ import ( ) var ( - INSTALLED_VERSION = "" - KUBE_TYPE = "k3s" - COMMAND_BASE_DIR = "" // deprecated shell command base dir - CDN_URL = "https://dc3p1870nn3cj.cloudfront.net" + INSTALLED_VERSION = "" + KUBE_TYPE = "k3s" + COMMAND_BASE_DIR = "" // deprecated shell command base dir + OLARES_CDN_SERVICE = "https://cdn.olares.com" + OLARES_REMOTE_SERVICE = "https://api.olares.com" OS_ROOT_DIR = "/olares" INSTALLING_PID_FILE = "installing.pid" @@ -43,7 +44,6 @@ func Init() { baseDir := mustEnv("BASE_DIR") INSTALLED_VERSION = mustEnv("INSTALLED_VERSION") KUBE_TYPE = os.Getenv("KUBE_TYPE") - CDN_URL = os.Getenv("DOWNLOAD_CDN_URL") TERMINUS_BASE_DIR = baseDir INSTALLING_PID_FILE = filepath.Join(baseDir, INSTALLING_PID_FILE) diff --git a/daemon/pkg/commands/upgrade/download_cli.go b/daemon/pkg/commands/upgrade/download_cli.go index abcee953b..111e7d20f 100644 --- a/daemon/pkg/commands/upgrade/download_cli.go +++ b/daemon/pkg/commands/upgrade/download_cli.go @@ -4,11 +4,12 @@ import ( "context" "errors" "fmt" - "github.com/beclab/Olares/daemon/pkg/cluster/state" "os" "path/filepath" "runtime" + "github.com/beclab/Olares/daemon/pkg/cluster/state" + "github.com/beclab/Olares/daemon/pkg/commands" ) @@ -44,7 +45,7 @@ func (i *downloadCLI) Execute(ctx context.Context, p any) (res any, err error) { downloadURL := target.CliURL if downloadURL == "" { - downloadURL = fmt.Sprintf("%s/olares-cli-v%s_linux_%s.tar.gz", commands.CDN_URL, target.Version.Original(), arch) + downloadURL = fmt.Sprintf("%s/olares-cli-v%s_linux_%s.tar.gz", commands.OLARES_CDN_SERVICE, target.Version.Original(), arch) } tarFile := filepath.Join(destDir, fmt.Sprintf("olares-cli-v%s.tar.gz", target.Version.Original())) diff --git a/daemon/pkg/commands/upgrade/download_component.go b/daemon/pkg/commands/upgrade/download_component.go index 04358a7f1..c8a2f4d3a 100644 --- a/daemon/pkg/commands/upgrade/download_component.go +++ b/daemon/pkg/commands/upgrade/download_component.go @@ -4,13 +4,14 @@ import ( "context" "errors" "fmt" - "github.com/beclab/Olares/daemon/pkg/cluster/state" "io" "os" "path/filepath" "strings" "time" + "github.com/beclab/Olares/daemon/pkg/cluster/state" + "github.com/beclab/Olares/daemon/pkg/cli" "github.com/beclab/Olares/daemon/pkg/commands" "github.com/nxadm/tail" @@ -63,8 +64,8 @@ func (i *downloadComponent) Execute(ctx context.Context, p any) (res any, err er "--version", target.Version.Original(), "--base-dir", commands.TERMINUS_BASE_DIR, } - if commands.CDN_URL != "" { - params = append(params, "--download-cdn-url", commands.CDN_URL) + if commands.OLARES_CDN_SERVICE != "" { + params = append(params, "--download-cdn-url", commands.OLARES_CDN_SERVICE) } if err = cmd.RunAsync_(ctx, cli.TERMINUS_CLI, params...); err != nil { return nil, err diff --git a/daemon/pkg/commands/upgrade/download_wizard.go b/daemon/pkg/commands/upgrade/download_wizard.go index b1f7e9db2..9a3496945 100644 --- a/daemon/pkg/commands/upgrade/download_wizard.go +++ b/daemon/pkg/commands/upgrade/download_wizard.go @@ -4,15 +4,16 @@ import ( "context" "errors" "fmt" - "github.com/beclab/Olares/daemon/pkg/cli" - "github.com/beclab/Olares/daemon/pkg/cluster/state" - "github.com/beclab/Olares/daemon/pkg/commands" "io" "os" "path/filepath" "strings" "time" + "github.com/beclab/Olares/daemon/pkg/cli" + "github.com/beclab/Olares/daemon/pkg/cluster/state" + "github.com/beclab/Olares/daemon/pkg/commands" + "github.com/nxadm/tail" "k8s.io/klog/v2" ) @@ -66,8 +67,8 @@ func (i *downloadWizard) Execute(ctx context.Context, p any) (res any, err error "--version", version, "--base-dir", commands.TERMINUS_BASE_DIR, } - if commands.CDN_URL != "" { - params = append(params, "--download-cdn-url", commands.CDN_URL) + if commands.OLARES_CDN_SERVICE != "" { + params = append(params, "--download-cdn-url", commands.OLARES_CDN_SERVICE) } if target.WizardURL != "" { params = append(params, "--url-override", target.WizardURL) diff --git a/daemon/pkg/utils/jws.go b/daemon/pkg/utils/jws.go index 340416a58..f76383346 100644 --- a/daemon/pkg/utils/jws.go +++ b/daemon/pkg/utils/jws.go @@ -4,23 +4,19 @@ import ( "encoding/json" "fmt" "net/url" - "os" "github.com/beclab/Olares/cli/pkg/web5/jws" + "github.com/beclab/Olares/daemon/pkg/commands" "k8s.io/klog/v2" ) func ValidateJWS(token string) (bool, string, error) { - didGateDomain := os.Getenv("DID_GATE_URL") - if didGateDomain != "" { - newUrl := fmt.Sprintf("%s1.0/name/", didGateDomain) - _, err := url.Parse(newUrl) - if err != nil { - klog.Warningf("failed to parse DID gate URL in environment variable: %v", err) - } else { - jws.DIDGateURL = newUrl - } + didServiceURL, err := url.JoinPath(commands.OLARES_REMOTE_SERVICE, "/did/1.0/name/") + if err != nil { + klog.Errorf("failed to parse DID gate service URL: %v, Olares remote service: %s", err, commands.OLARES_REMOTE_SERVICE) + return false, "", err } + jws.DIDGateURL = didServiceURL // Validate the JWS token with a 20-minute expiration time checkJWS, err := jws.CheckJWS(token, 20*60*1000)