refactor(daemon): watch systemenvs as service addresses (#1920)

This commit is contained in:
dkeven 2025-10-14 19:10:05 +08:00 committed by GitHub
parent 4360e2591e
commit eb860449aa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 202 additions and 31 deletions

View file

@ -82,7 +82,6 @@ func (g *GenerateTerminusdServiceEnv) Execute(runtime connector.Runtime) error {
"FrpAuthMethod": g.KubeConf.Arg.Frp.AuthMethod,
"TokenMaxAge": g.KubeConf.Arg.TokenMaxAge,
"MarketProvider": g.KubeConf.Arg.MarketProvider,
"TerminusGlobalEnvs": common.TerminusGlobalEnvs,
},
PrintContent: true,
}

View file

@ -23,7 +23,4 @@ FRP_AUTH_METHOD={{ .FrpAuthMethod }}
FRP_AUTH_TOKEN=
TOKEN_MAX_AGE={{ .TokenMaxAge }}
MARKET_PROVIDER={{ .MarketProvider }}
{{- range $key, $val := .TerminusGlobalEnvs }}
{{ $key }}={{ $val }}
{{- end }}
`)))

View file

@ -16,6 +16,7 @@ import (
"github.com/beclab/Olares/daemon/internel/watcher"
"github.com/beclab/Olares/daemon/internel/watcher/cert"
"github.com/beclab/Olares/daemon/internel/watcher/system"
"github.com/beclab/Olares/daemon/internel/watcher/systemenv"
"github.com/beclab/Olares/daemon/internel/watcher/upgrade"
"github.com/beclab/Olares/daemon/internel/watcher/usb"
"github.com/beclab/Olares/daemon/pkg/cluster/state"
@ -102,6 +103,7 @@ func main() {
usb.NewUmountWatcher(),
upgrade.NewUpgradeWatcher(),
cert.NewCertWatcher(),
systemenv.NewSystemEnvWatcher(),
}, func() {
if s != nil {
if err := s.Restart(); err != nil {

View file

@ -20,9 +20,9 @@ import (
func init() {
s := server.API
cmd := s.App.Group("command")
cmd.Post("/install", handlers.RequireSignature(
cmd.Post("/install",
handlers.WaitServerRunning(
handlers.RunCommand(handlers.PostTerminusInit, install.New))))
handlers.RunCommand(handlers.PostTerminusInit, install.New)))
cmd.Post("/uninstall", handlers.RequireSignature(
handlers.RequireOwner(

View file

@ -0,0 +1,174 @@
package systemenv
import (
"context"
"sync"
"time"
"github.com/beclab/Olares/daemon/internel/watcher"
"github.com/beclab/Olares/daemon/pkg/cluster/state"
"github.com/beclab/Olares/daemon/pkg/commands"
"github.com/beclab/Olares/daemon/pkg/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
)
var _ watcher.Watcher = &systemEnvWatcher{}
type systemEnvWatcher struct {
sync.Mutex
watcher.Watcher
running bool
cancel context.CancelFunc
}
func NewSystemEnvWatcher() watcher.Watcher {
return &systemEnvWatcher{}
}
func (w *systemEnvWatcher) Watch(ctx context.Context) {
w.Lock()
defer w.Unlock()
if state.CurrentState.TerminusState != state.TerminusRunning &&
state.CurrentState.TerminusState != state.Upgrading &&
state.CurrentState.TerminusState != state.Uninitialized &&
state.CurrentState.TerminusState != state.Initializing &&
state.CurrentState.TerminusState != state.InitializeFailed &&
state.CurrentState.TerminusState != state.SystemError {
if w.cancel != nil {
w.cancel()
w.cancel = nil
}
w.running = false
klog.V(4).Info("systemenv watcher stopped: cluster not running")
return
}
if w.running {
return
}
dc, err := utils.GetDynamicClient()
if err != nil {
klog.V(4).Infof("systemenv watcher: dynamic client not ready: %v", err)
return
}
execCtx, cancel := context.WithCancel(ctx)
w.cancel = cancel
w.running = true
go func() {
defer func() {
w.running = false
w.cancel = nil
klog.V(4).Info("systemenv watcher exited")
}()
startSystemEnvWatch(execCtx, dc, func(eventType watch.EventType, obj map[string]any) {
klog.V(5).Infof("systemenv event: %s", eventType)
if eventType != watch.Added && eventType != watch.Modified {
return
}
envName, _ := obj["envName"].(string)
if envName != "OLARES_SYSTEM_CDN_SERVICE" && envName != "OLARES_SYSTEM_REMOTE_SERVICE" {
return
}
val, _ := obj["value"].(string)
if val == "" {
if def, ok := obj["default"].(string); ok && def != "" {
val = def
}
}
if val == "" {
return
}
switch envName {
case "OLARES_SYSTEM_CDN_SERVICE":
if commands.OLARES_CDN_SERVICE != val {
old := commands.OLARES_CDN_SERVICE
commands.OLARES_CDN_SERVICE = val
klog.Infof("updated OLARES_CDN_SERVICE: %s -> %s", old, val)
}
case "OLARES_SYSTEM_REMOTE_SERVICE":
if commands.OLARES_REMOTE_SERVICE != val {
old := commands.OLARES_REMOTE_SERVICE
commands.OLARES_REMOTE_SERVICE = val
klog.Infof("updated OLARES_REMOTE_SERVICE: %s -> %s", old, val)
}
}
})
}()
}
var systemEnvGVR = schema.GroupVersionResource{
Group: "sys.bytetrade.io",
Version: "v1alpha1",
Resource: "systemenvs",
}
func startSystemEnvWatch(ctx context.Context, dc dynamic.Interface, handle func(watch.EventType, map[string]any)) {
for {
// 1) List existing resources to establish initial state
list, err := dc.Resource(systemEnvGVR).List(ctx, metav1.ListOptions{})
if err != nil {
select {
case <-time.After(3 * time.Second):
klog.V(3).Infof("systemenv list failed, retrying: %v", err)
continue
case <-ctx.Done():
return
}
}
for i := range list.Items {
handle(watch.Added, list.Items[i].UnstructuredContent())
}
rv := list.GetResourceVersion()
w, err := dc.Resource(systemEnvGVR).Watch(ctx, metav1.ListOptions{ResourceVersion: rv})
if err != nil {
select {
case <-time.After(3 * time.Second):
klog.V(3).Infof("systemenv watch create failed, retrying: %v", err)
continue
case <-ctx.Done():
return
}
}
ch := w.ResultChan()
for {
select {
case e, ok := <-ch:
if !ok {
w.Stop()
goto REWATCH
}
if u, ok := e.Object.(interface{ UnstructuredContent() map[string]any }); ok {
handle(e.Type, u.UnstructuredContent())
}
case <-ctx.Done():
w.Stop()
return
}
}
REWATCH:
select {
case <-time.After(1 * time.Second):
klog.V(4).Info("systemenv watch channel closed, restarting with fresh list")
continue
case <-ctx.Done():
return
}
}
}

View file

@ -12,10 +12,11 @@ import (
)
var (
INSTALLED_VERSION = ""
KUBE_TYPE = "k3s"
COMMAND_BASE_DIR = "" // deprecated shell command base dir
CDN_URL = "https://dc3p1870nn3cj.cloudfront.net"
INSTALLED_VERSION = ""
KUBE_TYPE = "k3s"
COMMAND_BASE_DIR = "" // deprecated shell command base dir
OLARES_CDN_SERVICE = "https://cdn.olares.com"
OLARES_REMOTE_SERVICE = "https://api.olares.com"
OS_ROOT_DIR = "/olares"
INSTALLING_PID_FILE = "installing.pid"
@ -43,7 +44,6 @@ func Init() {
baseDir := mustEnv("BASE_DIR")
INSTALLED_VERSION = mustEnv("INSTALLED_VERSION")
KUBE_TYPE = os.Getenv("KUBE_TYPE")
CDN_URL = os.Getenv("DOWNLOAD_CDN_URL")
TERMINUS_BASE_DIR = baseDir
INSTALLING_PID_FILE = filepath.Join(baseDir, INSTALLING_PID_FILE)

View file

@ -4,11 +4,12 @@ import (
"context"
"errors"
"fmt"
"github.com/beclab/Olares/daemon/pkg/cluster/state"
"os"
"path/filepath"
"runtime"
"github.com/beclab/Olares/daemon/pkg/cluster/state"
"github.com/beclab/Olares/daemon/pkg/commands"
)
@ -44,7 +45,7 @@ func (i *downloadCLI) Execute(ctx context.Context, p any) (res any, err error) {
downloadURL := target.CliURL
if downloadURL == "" {
downloadURL = fmt.Sprintf("%s/olares-cli-v%s_linux_%s.tar.gz", commands.CDN_URL, target.Version.Original(), arch)
downloadURL = fmt.Sprintf("%s/olares-cli-v%s_linux_%s.tar.gz", commands.OLARES_CDN_SERVICE, target.Version.Original(), arch)
}
tarFile := filepath.Join(destDir, fmt.Sprintf("olares-cli-v%s.tar.gz", target.Version.Original()))

View file

@ -4,13 +4,14 @@ import (
"context"
"errors"
"fmt"
"github.com/beclab/Olares/daemon/pkg/cluster/state"
"io"
"os"
"path/filepath"
"strings"
"time"
"github.com/beclab/Olares/daemon/pkg/cluster/state"
"github.com/beclab/Olares/daemon/pkg/cli"
"github.com/beclab/Olares/daemon/pkg/commands"
"github.com/nxadm/tail"
@ -63,8 +64,8 @@ func (i *downloadComponent) Execute(ctx context.Context, p any) (res any, err er
"--version", target.Version.Original(),
"--base-dir", commands.TERMINUS_BASE_DIR,
}
if commands.CDN_URL != "" {
params = append(params, "--download-cdn-url", commands.CDN_URL)
if commands.OLARES_CDN_SERVICE != "" {
params = append(params, "--download-cdn-url", commands.OLARES_CDN_SERVICE)
}
if err = cmd.RunAsync_(ctx, cli.TERMINUS_CLI, params...); err != nil {
return nil, err

View file

@ -4,15 +4,16 @@ import (
"context"
"errors"
"fmt"
"github.com/beclab/Olares/daemon/pkg/cli"
"github.com/beclab/Olares/daemon/pkg/cluster/state"
"github.com/beclab/Olares/daemon/pkg/commands"
"io"
"os"
"path/filepath"
"strings"
"time"
"github.com/beclab/Olares/daemon/pkg/cli"
"github.com/beclab/Olares/daemon/pkg/cluster/state"
"github.com/beclab/Olares/daemon/pkg/commands"
"github.com/nxadm/tail"
"k8s.io/klog/v2"
)
@ -66,8 +67,8 @@ func (i *downloadWizard) Execute(ctx context.Context, p any) (res any, err error
"--version", version,
"--base-dir", commands.TERMINUS_BASE_DIR,
}
if commands.CDN_URL != "" {
params = append(params, "--download-cdn-url", commands.CDN_URL)
if commands.OLARES_CDN_SERVICE != "" {
params = append(params, "--download-cdn-url", commands.OLARES_CDN_SERVICE)
}
if target.WizardURL != "" {
params = append(params, "--url-override", target.WizardURL)

View file

@ -4,23 +4,19 @@ import (
"encoding/json"
"fmt"
"net/url"
"os"
"github.com/beclab/Olares/cli/pkg/web5/jws"
"github.com/beclab/Olares/daemon/pkg/commands"
"k8s.io/klog/v2"
)
func ValidateJWS(token string) (bool, string, error) {
didGateDomain := os.Getenv("DID_GATE_URL")
if didGateDomain != "" {
newUrl := fmt.Sprintf("%s1.0/name/", didGateDomain)
_, err := url.Parse(newUrl)
if err != nil {
klog.Warningf("failed to parse DID gate URL in environment variable: %v", err)
} else {
jws.DIDGateURL = newUrl
}
didServiceURL, err := url.JoinPath(commands.OLARES_REMOTE_SERVICE, "/did/1.0/name/")
if err != nil {
klog.Errorf("failed to parse DID gate service URL: %v, Olares remote service: %s", err, commands.OLARES_REMOTE_SERVICE)
return false, "", err
}
jws.DIDGateURL = didServiceURL
// Validate the JWS token with a 20-minute expiration time
checkJWS, err := jws.CheckJWS(token, 20*60*1000)