diff --git a/go.mod b/go.mod index 9d90eed51..cd7ae39d5 100644 --- a/go.mod +++ b/go.mod @@ -21,12 +21,8 @@ toolchain go1.24.11 require ( dubbo.apache.org/dubbo-go/v3 v3.3.0 - github.com/Masterminds/semver/v3 v3.2.1 - github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 - github.com/dubbogo/gost v1.14.0 + github.com/dubbogo/go-zookeeper v1.0.4 github.com/duke-git/lancet/v2 v2.3.6 - github.com/emicklei/go-restful/v3 v3.12.2 - github.com/envoyproxy/go-control-plane v0.13.4 github.com/envoyproxy/go-control-plane/envoy v1.32.4 github.com/fullstorydev/grpcurl v1.9.1 github.com/gin-contrib/sessions v1.0.4 @@ -35,27 +31,18 @@ require ( github.com/go-co-op/gocron v1.9.0 github.com/go-logr/logr v1.4.2 github.com/go-logr/zapr v1.3.0 - github.com/goburrow/cache v0.1.4 github.com/golang/protobuf v1.5.4 - github.com/google/go-cmp v0.7.0 github.com/google/uuid v1.6.0 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 - github.com/hoisie/mustache v0.0.0-20160804235033-6375acf62c69 github.com/jhump/protoreflect v1.16.0 - github.com/kelseyhightower/envconfig v1.4.0 - github.com/mitchellh/mapstructure v1.5.0 github.com/nacos-group/nacos-sdk-go/v2 v2.3.5 github.com/onsi/ginkgo/v2 v2.22.1 github.com/onsi/gomega v1.36.2 github.com/pkg/errors v0.9.1 - github.com/prometheus/client_golang v1.20.5 - github.com/slok/go-http-metrics v0.11.0 github.com/spf13/cobra v1.8.1 github.com/stretchr/testify v1.10.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 - golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 - golang.org/x/sys v0.38.0 golang.org/x/text v0.31.0 google.golang.org/grpc v1.73.0 google.golang.org/protobuf v1.36.6 @@ -68,7 +55,6 @@ require ( k8s.io/apimachinery v0.34.2 k8s.io/client-go v0.34.2 k8s.io/klog/v2 v2.130.1 - moul.io/zapgorm2 v1.3.0 sigs.k8s.io/controller-runtime v0.19.4 sigs.k8s.io/yaml v1.6.0 ) @@ -110,8 +96,8 @@ require ( ) require ( - cel.dev/expr v0.23.0 // indirect filippo.io/edwards25519 v1.1.0 // indirect + github.com/BurntSushi/toml v1.3.2 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bufbuild/protocompile v0.10.0 // indirect github.com/bytedance/sonic v1.13.2 // indirect @@ -120,8 +106,8 @@ require ( github.com/cloudwego/base64x v0.1.5 // indirect github.com/cncf/xds/go v0.0.0-20250326154945-ae57f3c0d45f // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/dubbogo/go-zookeeper v1.0.4 // indirect - github.com/envoyproxy/go-control-plane/ratelimit v0.1.0 // indirect + github.com/dubbogo/gost v1.14.0 // indirect + github.com/emicklei/go-restful/v3 v3.12.2 // indirect github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect github.com/fxamacker/cbor/v2 v2.9.0 // indirect github.com/gabriel-vasile/mimetype v1.4.8 // indirect @@ -138,6 +124,7 @@ require ( github.com/goccy/go-json v0.10.5 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/gnostic-models v0.7.0 // indirect + github.com/google/go-cmp v0.7.0 // indirect github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect github.com/gorilla/context v1.1.2 // indirect github.com/gorilla/securecookie v1.1.2 // indirect @@ -162,6 +149,7 @@ require ( github.com/pelletier/go-toml/v2 v2.2.4 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/prometheus/client_golang v1.20.5 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.57.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect @@ -175,17 +163,17 @@ require ( go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/arch v0.16.0 // indirect golang.org/x/crypto v0.45.0 // indirect + golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 // indirect golang.org/x/net v0.47.0 // indirect golang.org/x/oauth2 v0.28.0 // indirect golang.org/x/sync v0.18.0 // indirect + golang.org/x/sys v0.38.0 // indirect golang.org/x/term v0.37.0 // indirect golang.org/x/time v0.9.0 // indirect golang.org/x/tools v0.38.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20250324211829-b45e905df463 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b // indirect k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 // indirect diff --git a/go.sum b/go.sum index 265bc3a8a..d29595994 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,3 @@ -cel.dev/expr v0.23.0 h1:wUb94w6OYQS4uXraxo9U+wUAs9jT47Xvl4iPgAwM2ss= -cel.dev/expr v0.23.0/go.mod h1:hLPLo1W4QUmuYdA72RBX06QTs6MXw941piREPl3Yfiw= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -42,8 +40,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8= github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0= -github.com/Masterminds/semver/v3 v3.2.1/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/Workiva/go-datastructures v1.0.52 h1:PLSK6pwn8mYdaoaCZEMsXBpBotr4HHn9abU0yMQt0NI= github.com/Workiva/go-datastructures v1.0.52/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA= @@ -118,8 +114,6 @@ github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kd github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= -github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3dyBCFEj5IhUbnKptjxatkF07cF2ak3yi77so= -github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -196,12 +190,8 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= -github.com/envoyproxy/go-control-plane v0.13.4 h1:zEqyPVyku6IvWCFwux4x9RxkLOMUL+1vC9xUFv5l2/M= -github.com/envoyproxy/go-control-plane v0.13.4/go.mod h1:kDfuBlDVsSj2MjrLEtRWtHlsWIFcGyB2RMO44Dc5GZA= github.com/envoyproxy/go-control-plane/envoy v1.32.4 h1:jb83lalDRZSpPWW2Z7Mck/8kXZ5CQAFYVjQcdVIr83A= github.com/envoyproxy/go-control-plane/envoy v1.32.4/go.mod h1:Gzjc5k8JcJswLjAx1Zm+wSYE20UrLtt7JZMWiWQXQEw= -github.com/envoyproxy/go-control-plane/ratelimit v0.1.0 h1:/G9QYbddjL25KvtKTv3an9lx6VBE2cnb8wp1vEGNYGI= -github.com/envoyproxy/go-control-plane/ratelimit v0.1.0/go.mod h1:Wk+tMFAFbCXaJPzVVHnPgRKdUdwW/KdbRt94AzgRee4= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v1.2.1 h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8= github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU= @@ -263,8 +253,6 @@ github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqw github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= -github.com/goburrow/cache v0.1.4 h1:As4KzO3hgmzPlnaMniZU9+VmoNYseUhuELbxy9mRBfw= -github.com/goburrow/cache v0.1.4/go.mod h1:cDFesZDnIlrHoNlMYqqMpCRawuXulgx+y7mXU8HZ+/c= github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= @@ -380,8 +368,6 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= -github.com/hoisie/mustache v0.0.0-20160804235033-6375acf62c69 h1:umaj0TCQ9lWUUKy2DxAhEzPbwd0jnxiw1EI2z3FiILM= -github.com/hoisie/mustache v0.0.0-20160804235033-6375acf62c69/go.mod h1:zdLK9ilQRSMjSeLKoZ4BqUfBT7jswTGF8zRlKEsiRXA= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= @@ -400,7 +386,6 @@ github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg= github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= -github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= @@ -427,8 +412,6 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k= github.com/k0kubun/pp v3.0.1+incompatible/go.mod h1:GWse8YhT0p8pT4ir3ZgBbfZild3tgzSScAn6HmfYukg= -github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= -github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -473,8 +456,6 @@ github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS4 github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= -github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= -github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -568,8 +549,6 @@ github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= -github.com/slok/go-http-metrics v0.11.0 h1:ABJUpekCZSkQT1wQrFvS4kGbhea/w6ndFJaWJeh3zL0= -github.com/slok/go-http-metrics v0.11.0/go.mod h1:ZGKeYG1ET6TEJpQx18BqAJAvxw9jBAZXCHU7bWQqqAc= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/assertions v1.1.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= @@ -669,7 +648,6 @@ go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= -go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= @@ -1023,8 +1001,6 @@ google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7Fc google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= -google.golang.org/genproto/googleapis/api v0.0.0-20250324211829-b45e905df463 h1:hE3bRWtU6uceqlh4fhrSnUyjKHMKB9KrTLLG+bc0ddM= -google.golang.org/genproto/googleapis/api v0.0.0-20250324211829-b45e905df463/go.mod h1:U90ffi8eUL9MwPcrJylN5+Mk2v3vuPDptd5yyNUiRR8= google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 h1:e0AIkUUhxyBKh6ssZNrAMeqhA7RKUj42346d1y02i2g= google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= @@ -1102,7 +1078,6 @@ gorm.io/driver/postgres v1.6.0 h1:2dxzU8xJ+ivvqTRph34QX+WrRaJlmfyPqXmoGVjMBa4= gorm.io/driver/postgres v1.6.0/go.mod h1:vUw0mrGgrTK+uPHEhAdV4sfFELrByKVGnaVRkXDhtWo= gorm.io/driver/sqlite v1.5.7 h1:8NvsrhP0ifM7LX9G4zPB97NwovUakUxc+2V2uuf3Z1I= gorm.io/driver/sqlite v1.5.7/go.mod h1:U+J8craQU6Fzkcvu8oLeAQmi50TkwPEhHDEjQZXDah4= -gorm.io/gorm v1.23.6/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk= gorm.io/gorm v1.30.0 h1:qbT5aPv1UH8gI99OsRlvDToLxW5zR7FzS9acZDOZcgs= gorm.io/gorm v1.30.0/go.mod h1:8Z33v652h4//uMA76KjeDH8mJXPm1QNCYrMeatR0DOE= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= @@ -1124,8 +1099,6 @@ k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b h1:MloQ9/bdJyIu9lb1PzujOP k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b/go.mod h1:UZ2yyWbFTpuhSbFhv24aGNOdoRdJZgsIObGBUaYVsts= k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 h1:hwvWFiBzdWw1FhfY1FooPn3kzWuJ8tmbZBHi4zVsl1Y= k8s.io/utils v0.0.0-20250604170112-4c0f3b243397/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -moul.io/zapgorm2 v1.3.0 h1:+CzUTMIcnafd0d/BvBce8T4uPn6DQnpIrz64cyixlkk= -moul.io/zapgorm2 v1.3.0/go.mod h1:nPVy6U9goFKHR4s+zfSo1xVFaoU7Qgd5DoCdOfzoCqs= nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= diff --git a/pkg/core/discovery/component.go b/pkg/core/discovery/component.go index 6cd27203f..89841a722 100644 --- a/pkg/core/discovery/component.go +++ b/pkg/core/discovery/component.go @@ -18,6 +18,7 @@ package discovery import ( + "context" "fmt" "math" "reflect" @@ -27,9 +28,11 @@ import ( "github.com/apache/dubbo-admin/pkg/common/bizerror" "github.com/apache/dubbo-admin/pkg/config/discovery" "github.com/apache/dubbo-admin/pkg/config/engine" + storecfg "github.com/apache/dubbo-admin/pkg/config/store" "github.com/apache/dubbo-admin/pkg/core/controller" "github.com/apache/dubbo-admin/pkg/core/discovery/subscriber" "github.com/apache/dubbo-admin/pkg/core/events" + "github.com/apache/dubbo-admin/pkg/core/leader" "github.com/apache/dubbo-admin/pkg/core/logger" meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1" coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" @@ -51,10 +54,12 @@ var _ Component = &discoveryComponent{} type Informers []controller.Informer type discoveryComponent struct { - configs []*discovery.Config - informers map[string]Informers - subscribers []events.Subscriber - subscriptionMgr events.SubscriptionManager + configs []*discovery.Config + informers map[string]Informers + subscribers []events.Subscriber + subscriptionMgr events.SubscriptionManager + leaderElection *leader.LeaderElection + needsLeaderElection bool } func (d *discoveryComponent) RequiredDependencies() []runtime.ComponentType { @@ -111,10 +116,74 @@ func (d *discoveryComponent) Init(ctx runtime.BuilderContext) error { if err != nil { return err } + + // Memory store runs single-replica; leader election is not needed. + if ctx.Config().Store.Type == storecfg.Memory { + return nil + } + + dbSrc, ok := storeComponent.(leader.DBSource) + if !ok { + return nil + } + db, hasDB := dbSrc.GetDB() + if !hasDB { + return nil + } + holderID, err := leader.GenerateHolderID() + if err != nil { + logger.Warnf("discovery: failed to generate holder ID, skipping leader election: %v", err) + return nil + } + le := leader.NewLeaderElection(db, runtime.ResourceDiscovery, holderID) + if err := le.EnsureTable(); err != nil { + logger.Warnf("discovery: failed to ensure leader lease table: %v", err) + return nil + } + d.leaderElection = le + d.needsLeaderElection = true + logger.Infof("discovery: leader election initialized (holder: %s)", holderID) return nil } func (d *discoveryComponent) Start(_ runtime.Runtime, ch <-chan struct{}) error { + if !d.needsLeaderElection { + return d.startBusinessLogic(ch) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + <-ch + cancel() + }() + + var leaderStopCh chan struct{} + + d.leaderElection.RunLeaderElection(ctx, ch, + func() { // onStartLeading: create a fresh stopCh for this leadership term + leaderStopCh = make(chan struct{}) + logger.Infof("discovery: became leader, starting business logic") + if err := d.startBusinessLogic(leaderStopCh); err != nil { + logger.Errorf("discovery: failed to start business logic: %v", err) + } + }, + func() { // onStopLeading: stop informers from the current term + logger.Warnf("discovery: lost leadership, stopping business logic") + if leaderStopCh != nil { + close(leaderStopCh) + leaderStopCh = nil + } + }, + ) + + return nil +} + +// startBusinessLogic starts subscribers and informers using the provided stopCh. +// When stopCh is closed all informer goroutines will exit. +func (d *discoveryComponent) startBusinessLogic(stopCh <-chan struct{}) error { // 1. subscribe resource changed events for _, sub := range d.subscribers { err := d.subscriptionMgr.Subscribe(sub) @@ -123,11 +192,12 @@ func (d *discoveryComponent) Start(_ runtime.Runtime, ch <-chan struct{}) error fmt.Sprintf("subscriber %s can not subscribe resource changed events", sub.Name())) } } + // 2. start informers for name, informers := range d.informers { for _, informer := range informers { - go informer.Run(ch) + go informer.Run(stopCh) } - logger.Infof("resource discvoery %s has started succesfully", name) + logger.Infof("resource discovery %s has started successfully", name) } return nil } diff --git a/pkg/core/engine/component.go b/pkg/core/engine/component.go index 992eec8ac..94d10c6c8 100644 --- a/pkg/core/engine/component.go +++ b/pkg/core/engine/component.go @@ -18,6 +18,7 @@ package engine import ( + "context" "fmt" "math" "reflect" @@ -26,9 +27,11 @@ import ( "github.com/apache/dubbo-admin/pkg/common/bizerror" enginecfg "github.com/apache/dubbo-admin/pkg/config/engine" + storecfg "github.com/apache/dubbo-admin/pkg/config/store" "github.com/apache/dubbo-admin/pkg/core/controller" "github.com/apache/dubbo-admin/pkg/core/engine/subscriber" "github.com/apache/dubbo-admin/pkg/core/events" + "github.com/apache/dubbo-admin/pkg/core/leader" "github.com/apache/dubbo-admin/pkg/core/logger" meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1" "github.com/apache/dubbo-admin/pkg/core/runtime" @@ -52,6 +55,8 @@ type engineComponent struct { informers []controller.Informer subscriptionManager events.SubscriptionManager subscribers []events.Subscriber + leaderElection *leader.LeaderElection + needsLeaderElection bool } func newEngineComponent() Component { @@ -103,7 +108,35 @@ func (e *engineComponent) Init(ctx runtime.BuilderContext) error { if err = e.initSubscribers(eventBus); err != nil { return fmt.Errorf("init subscribers failed, %w", err) } - logger.Infof("resource engine %s has been inited successfully", e.name) + + defer logger.Infof("resource engine %s has been inited successfully", e.name) + + // Memory store runs single-replica; leader election is not needed. + if ctx.Config().Store.Type == storecfg.Memory { + return nil + } + + dbSrc, ok := storeComponent.(leader.DBSource) + if !ok { + return nil + } + db, hasDB := dbSrc.GetDB() + if !hasDB { + return nil + } + holderID, err := leader.GenerateHolderID() + if err != nil { + logger.Warnf("engine: failed to generate holder ID, skipping leader election: %v", err) + return nil + } + le := leader.NewLeaderElection(db, runtime.ResourceEngine, holderID) + if err := le.EnsureTable(); err != nil { + logger.Warnf("engine: failed to ensure leader lease table: %v", err) + return nil + } + e.leaderElection = le + e.needsLeaderElection = true + logger.Infof("engine: leader election initialized (holder: %s)", holderID) return nil } @@ -146,6 +179,43 @@ func (e *engineComponent) initSubscribers(eventbus events.EventBus) error { } func (e *engineComponent) Start(_ runtime.Runtime, ch <-chan struct{}) error { + if !e.needsLeaderElection { + return e.startBusinessLogic(ch) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + <-ch + cancel() + }() + + var leaderStopCh chan struct{} + + e.leaderElection.RunLeaderElection(ctx, ch, + func() { // onStartLeading: create a fresh stopCh for this leadership term + leaderStopCh = make(chan struct{}) + logger.Infof("engine: became leader, starting business logic") + if err := e.startBusinessLogic(leaderStopCh); err != nil { + logger.Errorf("engine: failed to start business logic: %v", err) + } + }, + func() { // onStopLeading: stop informers from the current term + logger.Warnf("engine: lost leadership, stopping business logic") + if leaderStopCh != nil { + close(leaderStopCh) + leaderStopCh = nil + } + }, + ) + + return nil +} + +// startBusinessLogic starts subscribers and informers using the provided stopCh. +// When stopCh is closed all informer goroutines will exit. +func (e *engineComponent) startBusinessLogic(stopCh <-chan struct{}) error { // 1. subscribe resource changed events for _, sub := range e.subscribers { if err := e.subscriptionManager.Subscribe(sub); err != nil { @@ -154,7 +224,7 @@ func (e *engineComponent) Start(_ runtime.Runtime, ch <-chan struct{}) error { } // 2. start informers for _, informer := range e.informers { - go informer.Run(ch) + go informer.Run(stopCh) } logger.Infof("resource engine %s has started successfully", e.name) return nil diff --git a/pkg/core/leader/db_source.go b/pkg/core/leader/db_source.go new file mode 100644 index 000000000..66d37146b --- /dev/null +++ b/pkg/core/leader/db_source.go @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package leader + +import "gorm.io/gorm" + +// DBSource is an interface for components that provide access to a database connection +// Used by leader election to access the shared database for leader lease management +type DBSource interface { + // GetDB returns the shared database connection and a boolean indicating if a DB is available + // Returns (db, true) if the component is backed by a database, (nil, false) otherwise + GetDB() (*gorm.DB, bool) +} diff --git a/pkg/core/leader/leader.go b/pkg/core/leader/leader.go new file mode 100644 index 000000000..2a06bab12 --- /dev/null +++ b/pkg/core/leader/leader.go @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package leader + +import ( + "context" + "fmt" + "os" + "sync/atomic" + "time" + + "github.com/google/uuid" + "gorm.io/gorm" + + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +const ( + // DefaultLeaseDuration is the default duration for a leader lease + DefaultLeaseDuration = 30 * time.Second + // DefaultRenewInterval is the default interval for renewing the lease + DefaultRenewInterval = 10 * time.Second + // DefaultAcquireRetryInterval is the default retry interval for acquiring leadership + DefaultAcquireRetryInterval = 5 * time.Second +) + +// LeaderElection manages leader election for distributed components +// It uses database-based optimistic locking to ensure only one replica holds the lease at any time +type LeaderElection struct { + db *gorm.DB + component string + holderID string + leaseDuration time.Duration + renewInterval time.Duration + acquireRetry time.Duration + isLeader atomic.Bool + currentVersion int64 + stopCh chan struct{} +} + +// Option is a functional option for configuring LeaderElection +type Option func(*LeaderElection) + +// WithLeaseDuration sets the lease duration +func WithLeaseDuration(d time.Duration) Option { + return func(le *LeaderElection) { + le.leaseDuration = d + } +} + +// WithRenewInterval sets the renewal interval +func WithRenewInterval(d time.Duration) Option { + return func(le *LeaderElection) { + le.renewInterval = d + } +} + +// WithAcquireRetryInterval sets the acquisition retry interval +func WithAcquireRetryInterval(d time.Duration) Option { + return func(le *LeaderElection) { + le.acquireRetry = d + } +} + +// NewLeaderElection creates a new LeaderElection instance +func NewLeaderElection(db *gorm.DB, component, holderID string, opts ...Option) *LeaderElection { + le := &LeaderElection{ + db: db, + component: component, + holderID: holderID, + leaseDuration: DefaultLeaseDuration, + renewInterval: DefaultRenewInterval, + acquireRetry: DefaultAcquireRetryInterval, + stopCh: make(chan struct{}), + } + + for _, opt := range opts { + opt(le) + } + + return le +} + +// EnsureTable creates the leader_leases table if it doesn't exist +// This is idempotent and can be called multiple times +func (le *LeaderElection) EnsureTable() error { + return le.db.AutoMigrate(&LeaderLease{}) +} + +// TryAcquire attempts to acquire the leader lease from an expired holder. +// It only competes for leases that have already expired and does NOT renew an +// existing self-held lease — use Renew for that. +// Returns true if the current holder successfully acquired the lease. +func (le *LeaderElection) TryAcquire(ctx context.Context) bool { + now := time.Now() + expiresAt := now.Add(le.leaseDuration) + + // Only take over an expired lease; never pre-empt an active holder. + result := le.db.WithContext(ctx).Model(&LeaderLease{}). + Where("component = ? AND expires_at < ?", le.component, now). + Updates(map[string]interface{}{ + "holder_id": le.holderID, + "acquired_at": now, + "expires_at": expiresAt, + "version": gorm.Expr("version + 1"), + }) + + if result.Error != nil { + logger.Warnf("leader election: failed to update lease for component %s: %v", le.component, result.Error) + le.isLeader.Store(false) + return false + } + + // If the update succeeded (found a row to update) + if result.RowsAffected > 0 { + // Fetch the updated version + var lease LeaderLease + err := le.db.WithContext(ctx). + Where("component = ?", le.component). + First(&lease).Error + if err == nil { + le.currentVersion = lease.Version + } + le.isLeader.Store(true) + return true + } + + // No row was updated, try to insert a new record (lease doesn't exist) + result = le.db.WithContext(ctx).Create(&LeaderLease{ + Component: le.component, + HolderID: le.holderID, + AcquiredAt: now, + ExpiresAt: expiresAt, + Version: 1, + }) + + if result.Error != nil { + // If insertion fails, it means another replica just created it + // This is expected in concurrent scenarios + logger.Debugf("leader election: failed to insert lease for component %s (probably created by another replica): %v", le.component, result.Error) + le.isLeader.Store(false) + return false + } + + le.currentVersion = 1 + le.isLeader.Store(true) + return true +} + +// Renew attempts to renew the current leader lease +// Returns true if the renewal was successful +func (le *LeaderElection) Renew(ctx context.Context) bool { + if !le.isLeader.Load() { + return false + } + + now := time.Now() + expiresAt := now.Add(le.leaseDuration) + + result := le.db.WithContext(ctx).Model(&LeaderLease{}). + Where("component = ? AND holder_id = ? AND version = ?", le.component, le.holderID, le.currentVersion). + Updates(map[string]interface{}{ + "acquired_at": now, + "expires_at": expiresAt, + "version": gorm.Expr("version + 1"), + }) + + if result.Error != nil { + logger.Warnf("leader election: failed to renew lease for component %s: %v", le.component, result.Error) + le.isLeader.Store(false) + return false + } + + if result.RowsAffected > 0 { + le.currentVersion++ + return true + } + + // Lease was lost (likely held by another replica now) + logger.Warnf("leader election: lost leader lease for component %s (renewal failed, version mismatch)", le.component) + le.isLeader.Store(false) + return false +} + +// Release releases the leader lease for this holder +// This should be called when the holder voluntarily gives up leadership +func (le *LeaderElection) Release(ctx context.Context) { + le.isLeader.Store(false) + + expiresAt := time.Now().Add(-1 * time.Second) // Immediately expire the lease + + result := le.db.WithContext(ctx).Model(&LeaderLease{}). + Where("component = ? AND holder_id = ?", le.component, le.holderID). + Update("expires_at", expiresAt) + + if result.Error != nil { + logger.Warnf("leader election: failed to release lease for component %s: %v", le.component, result.Error) + } +} + +// IsLeader returns true if this holder currently holds the leader lease +func (le *LeaderElection) IsLeader() bool { + return le.isLeader.Load() +} + +// RunLeaderElection runs the leader election loop +// It blocks and runs onStartLeading/onStopLeading callbacks as leadership changes +// This is designed to be run in a separate goroutine +func (le *LeaderElection) RunLeaderElection(ctx context.Context, stopCh <-chan struct{}, + onStartLeading func(), onStopLeading func()) { + + ticker := time.NewTicker(le.acquireRetry) + defer ticker.Stop() + + renewTicker := time.NewTicker(le.renewInterval) + renewTicker.Stop() // Don't start renewal ticker yet + + isLeader := false + + for { + select { + case <-ctx.Done(): + if isLeader { + le.Release(context.Background()) + onStopLeading() + } + return + case <-stopCh: + if isLeader { + le.Release(context.Background()) + onStopLeading() + } + return + case <-ticker.C: + // Try to acquire leadership if not already leader + if !isLeader { + if le.TryAcquire(ctx) { + logger.Infof("leader election: component %s acquired leadership (holder: %s)", le.component, le.holderID) + isLeader = true + renewTicker.Reset(le.renewInterval) + onStartLeading() + } + } + case <-renewTicker.C: + // Renew leadership if currently leader + if isLeader { + if !le.Renew(ctx) { + logger.Warnf("leader election: component %s lost leadership (holder: %s)", le.component, le.holderID) + isLeader = false + renewTicker.Stop() + ticker.Reset(le.acquireRetry) + onStopLeading() + } + } + } + } +} + +// GenerateHolderID generates a unique holder ID combining hostname and UUID +func GenerateHolderID() (string, error) { + hostname, err := os.Hostname() + if err != nil { + hostname = "unknown" + } + return fmt.Sprintf("%s-%s", hostname, uuid.New().String()), nil +} diff --git a/pkg/core/leader/leader_test.go b/pkg/core/leader/leader_test.go new file mode 100644 index 000000000..525a1bb23 --- /dev/null +++ b/pkg/core/leader/leader_test.go @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package leader + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +// setupTestDB creates an in-memory SQLite database for testing +func setupTestDB(t *testing.T) *gorm.DB { + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + require.NoError(t, err) + return db +} + +func TestLeaderElection_EnsureTable(t *testing.T) { + db := setupTestDB(t) + le := NewLeaderElection(db, "test-component", "holder-1") + + err := le.EnsureTable() + assert.NoError(t, err) + + // Verify the table exists by creating another instance and checking again + err = le.EnsureTable() + assert.NoError(t, err) +} + +func TestLeaderElection_TryAcquire(t *testing.T) { + db := setupTestDB(t) + le := NewLeaderElection(db, "test-component", "holder-1") + err := le.EnsureTable() + require.NoError(t, err) + + ctx := context.Background() + + // First attempt should succeed (no lease exists) + acquired := le.TryAcquire(ctx) + assert.True(t, acquired) + assert.True(t, le.IsLeader()) + + // Verify the lease was created + var lease LeaderLease + result := db.Where("component = ?", "test-component").First(&lease) + assert.NoError(t, result.Error) + assert.Equal(t, "holder-1", lease.HolderID) +} + +func TestLeaderElection_TryAcquire_AlreadyHeld(t *testing.T) { + db := setupTestDB(t) + le1 := NewLeaderElection(db, "test-component", "holder-1") + le2 := NewLeaderElection(db, "test-component", "holder-2") + + err := le1.EnsureTable() + require.NoError(t, err) + + ctx := context.Background() + + // First holder acquires + acquired := le1.TryAcquire(ctx) + assert.True(t, acquired) + + // Second holder tries to acquire (should fail because lease is not expired) + acquired = le2.TryAcquire(ctx) + assert.False(t, acquired) + assert.False(t, le2.IsLeader()) +} + +func TestLeaderElection_Renew(t *testing.T) { + db := setupTestDB(t) + le := NewLeaderElection(db, "test-component", "holder-1", + WithLeaseDuration(1*time.Second), + WithRenewInterval(500*time.Millisecond)) + + err := le.EnsureTable() + require.NoError(t, err) + + ctx := context.Background() + + // Acquire the lease first + acquired := le.TryAcquire(ctx) + assert.True(t, acquired) + + oldVersion := le.currentVersion + + // Renew should succeed + renewed := le.Renew(ctx) + assert.True(t, renewed) + assert.Greater(t, le.currentVersion, oldVersion) + + // Verify the lease was updated + var lease LeaderLease + result := db.Where("component = ?", "test-component").First(&lease) + assert.NoError(t, result.Error) + assert.Greater(t, lease.Version, int64(1)) +} + +func TestLeaderElection_Release(t *testing.T) { + db := setupTestDB(t) + le := NewLeaderElection(db, "test-component", "holder-1") + err := le.EnsureTable() + require.NoError(t, err) + + ctx := context.Background() + + // Acquire the lease + acquired := le.TryAcquire(ctx) + assert.True(t, acquired) + + // Release it + le.Release(ctx) + assert.False(t, le.IsLeader()) + + // Verify the lease has expired + var lease LeaderLease + result := db.Where("component = ?", "test-component").First(&lease) + assert.NoError(t, result.Error) + assert.True(t, lease.ExpiresAt.Before(time.Now())) +} + +func TestLeaderElection_Failover(t *testing.T) { + db := setupTestDB(t) + le1 := NewLeaderElection(db, "test-component", "holder-1", + WithLeaseDuration(100*time.Millisecond)) + le2 := NewLeaderElection(db, "test-component", "holder-2", + WithLeaseDuration(100*time.Millisecond)) + + err := le1.EnsureTable() + require.NoError(t, err) + + ctx := context.Background() + + // First holder acquires + acquired := le1.TryAcquire(ctx) + assert.True(t, acquired) + + // Second holder cannot acquire yet + acquired = le2.TryAcquire(ctx) + assert.False(t, acquired) + + // Wait for lease to expire + time.Sleep(150 * time.Millisecond) + + // Now second holder should be able to acquire + acquired = le2.TryAcquire(ctx) + assert.True(t, acquired) + assert.True(t, le2.IsLeader()) +} + +func TestGenerateHolderID(t *testing.T) { + id1, err := GenerateHolderID() + assert.NoError(t, err) + assert.NotEmpty(t, id1) + + id2, err := GenerateHolderID() + assert.NoError(t, err) + assert.NotEmpty(t, id2) + + // IDs should be different (due to UUID) + assert.NotEqual(t, id1, id2) +} + +func TestLeaderElection_IsLeader(t *testing.T) { + db := setupTestDB(t) + le := NewLeaderElection(db, "test-component", "holder-1") + err := le.EnsureTable() + require.NoError(t, err) + + ctx := context.Background() + + // Initially not leader + assert.False(t, le.IsLeader()) + + // After acquiring, should be leader + le.TryAcquire(ctx) + assert.True(t, le.IsLeader()) + + // After releasing, should not be leader + le.Release(ctx) + assert.False(t, le.IsLeader()) +} diff --git a/pkg/core/leader/model.go b/pkg/core/leader/model.go new file mode 100644 index 000000000..a6180955a --- /dev/null +++ b/pkg/core/leader/model.go @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package leader + +import "time" + +// LeaderLease is the GORM model for the leader_leases table +// It uses optimistic locking via the Version field to ensure atomic leader elections +type LeaderLease struct { + ID uint `gorm:"primaryKey;autoIncrement"` + Component string `gorm:"uniqueIndex;size:64;not null"` + HolderID string `gorm:"size:255;not null"` + AcquiredAt time.Time `gorm:"not null"` + ExpiresAt time.Time `gorm:"not null"` + Version int64 `gorm:"not null;default:0"` +} + +// TableName returns the table name for LeaderLease +func (LeaderLease) TableName() string { + return "leader_leases" +} diff --git a/pkg/core/store/component.go b/pkg/core/store/component.go index 6eadc7f29..2cae0097b 100644 --- a/pkg/core/store/component.go +++ b/pkg/core/store/component.go @@ -20,7 +20,11 @@ package store import ( "fmt" "math" + "reflect" + "gorm.io/gorm" + + "github.com/apache/dubbo-admin/pkg/core/leader" coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" "github.com/apache/dubbo-admin/pkg/core/runtime" ) @@ -34,6 +38,12 @@ type Router interface { ResourceKindRoute(k coremodel.ResourceKind) (ResourceStore, error) } +// poolProvider is an internal interface for stores that provide DB access +// This avoids circular imports by not referencing dbcommon directly +type poolProvider interface { + Pool() interface{} // Returns *ConnectionPool, but we don't type it to avoid import +} + // The Component interface is composed of both functional interfaces and lifecycle interfaces type Component interface { runtime.Component @@ -48,6 +58,9 @@ type storeComponent struct { stores map[coremodel.ResourceKind]ManagedResourceStore } +// Compile-time check that storeComponent implements leader.DBSource interface +var _ leader.DBSource = &storeComponent{} + func (sc *storeComponent) RequiredDependencies() []runtime.ComponentType { return []runtime.ComponentType{ runtime.EventBus, // Store may need EventBus for event emission @@ -109,3 +122,32 @@ func (sc *storeComponent) ResourceKindRoute(k coremodel.ResourceKind) (ResourceS return nil, fmt.Errorf("%s is not supported by store yet", k) } + +// GetDB returns the shared DB connection if the underlying store is DB-backed +// Implements the leader.DBSource interface +func (sc *storeComponent) GetDB() (*gorm.DB, bool) { + // Try to get DB from any store that has a Pool() method (all GormStores share the same ConnectionPool) + for _, store := range sc.stores { + pp, ok := store.(poolProvider) + if !ok { + continue + } + pool := pp.Pool() + if pool == nil { + continue + } + // Use reflection to call GetDB() on the pool to avoid importing dbcommon + poolVal := reflect.ValueOf(pool) + getDBMethod := poolVal.MethodByName("GetDB") + if !getDBMethod.IsValid() { + continue + } + result := getDBMethod.Call(nil) + if len(result) > 0 { + if db, ok := result[0].Interface().(*gorm.DB); ok { + return db, true + } + } + } + return nil, false +} diff --git a/pkg/store/dbcommon/gorm_store.go b/pkg/store/dbcommon/gorm_store.go index e823189b5..474d5eb6d 100644 --- a/pkg/store/dbcommon/gorm_store.go +++ b/pkg/store/dbcommon/gorm_store.go @@ -582,3 +582,9 @@ func (gs *GormStore) rebuildIndices() error { logger.Infof("Rebuilt indices for %s: loaded %d resources", gs.kind.ToString(), len(models)) return nil } + +// Pool returns the connection pool for this store +// Used by other components (e.g., leader election) that need direct DB access +func (gs *GormStore) Pool() *ConnectionPool { + return gs.pool +}