Skip to content

Commit

Permalink
feat(v0.4): v0.4
Browse files Browse the repository at this point in the history
  • Loading branch information
Ccheers committed Dec 21, 2024
1 parent 86a8a64 commit 2b7df7f
Show file tree
Hide file tree
Showing 15 changed files with 1,435 additions and 21 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@
# Go workspace file
go.work

.idea
.idea
.env
23 changes: 23 additions & 0 deletions adapter/kratos/discovery/dummy/dummy_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package dummy

import (
"context"
"github.com/go-kratos/kratos/v2/registry"
)

var _ registry.Registrar = (*Registrar)(nil)

type Registrar struct {
}

func NewRegistrar() *Registrar {
return &Registrar{}
}

func (x *Registrar) Register(ctx context.Context, service *registry.ServiceInstance) error {
return nil
}

func (x *Registrar) Deregister(ctx context.Context, service *registry.ServiceInstance) error {
return nil
}
231 changes: 231 additions & 0 deletions adapter/kratos/discovery/etcd/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package etcd

import (
"context"
"fmt"
"math/rand"
"time"

"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/registry"
clientv3 "go.etcd.io/etcd/client/v3"
)

var (
_ registry.Registrar = (*Registry)(nil)
_ registry.Discovery = (*Registry)(nil)
)

// Option is etcd registry option.
type Option func(o *options)

type options struct {
ctx context.Context
// 兼容go-zero 丢弃 namespace 概念
// namespace string
ttl time.Duration
maxRetry int
}

// Context with registry context.
func Context(ctx context.Context) Option {
return func(o *options) { o.ctx = ctx }
}

//Namespace with registry namespace.
//func Namespace(ns string) Option {
// return func(o *options) { o.namespace = ns }
//}

// RegisterTTL with register ttl.
func RegisterTTL(ttl time.Duration) Option {
return func(o *options) { o.ttl = ttl }
}

func MaxRetry(num int) Option {
return func(o *options) { o.maxRetry = num }
}

// Registry is etcd registry.
type Registry struct {
log *log.Helper

opts *options
client *clientv3.Client
kv clientv3.KV
lease clientv3.Lease
/*
ctxMap is used to store the context cancel function of each service instance.
When the service instance is deregistered, the corresponding context cancel function is called to stop the heartbeat.
*/
ctxMap map[*registry.ServiceInstance]context.CancelFunc
}

// New creates etcd registry
func New(client *clientv3.Client, logger log.Logger, opts ...Option) (r *Registry) {
op := &options{
ctx: context.Background(),
// namespace: "/microservices",
ttl: time.Second * 15,
maxRetry: 5,
}
for _, o := range opts {
o(op)
}
return &Registry{
log: log.NewHelper(logger),
opts: op,
client: client,
kv: clientv3.NewKV(client),
lease: nil,
ctxMap: make(map[*registry.ServiceInstance]context.CancelFunc),
}
}

// Register the registration.
func (r *Registry) Register(ctx context.Context, service *registry.ServiceInstance) error {
key := fmt.Sprintf("%s/%s", service.Name, service.ID)
value, err := marshal(service)
if err != nil {
return err
}
if r.lease != nil {
r.lease.Close()
}

r.lease = clientv3.NewLease(r.client)
leaseID, err := r.registerWithKV(ctx, key, value)
if err != nil {
return err
}

hctx, cancel := context.WithCancel(r.opts.ctx)
r.ctxMap[service] = cancel
go r.heartBeat(hctx, leaseID, key, value)
return nil
}

// Deregister the registration.
func (r *Registry) Deregister(ctx context.Context, service *registry.ServiceInstance) error {
defer func() {
if r.lease != nil {
r.lease.Close()
}
}()
// cancel heartbeat
if cancel, ok := r.ctxMap[service]; ok {
cancel()
delete(r.ctxMap, service)
}
key := fmt.Sprintf("%s/%s", service.Name, service.ID)
_, err := r.client.Delete(ctx, key)
return err
}

// GetService return the service instances in memory according to the service name.
func (r *Registry) GetService(ctx context.Context, name string) ([]*registry.ServiceInstance, error) {
key := fmt.Sprintf("%s", name)
resp, err := r.kv.Get(ctx, key, clientv3.WithPrefix())
if err != nil {
return nil, err
}
items := make([]*registry.ServiceInstance, 0, len(resp.Kvs))
for _, kv := range resp.Kvs {
value, err := unmarshal(kv.Value)
if err != nil {
r.log.Errorf("unmarshal value failed: %v, key=%s", err, kv.Key)
continue
}

items = append(items, value)
}
return items, nil
}

// Watch creates a watcher according to the service name.
func (r *Registry) Watch(ctx context.Context, name string) (registry.Watcher, error) {
key := fmt.Sprintf("%s", name)
return newWatcher(ctx, r.log, key, name, r.client)
}

// registerWithKV create a new lease, return current leaseID
func (r *Registry) registerWithKV(ctx context.Context, key string, value string) (clientv3.LeaseID, error) {
grant, err := r.lease.Grant(ctx, int64(r.opts.ttl.Seconds()))
if err != nil {
return 0, err
}
_, err = r.client.Put(ctx, key, value, clientv3.WithLease(grant.ID))
if err != nil {
return 0, err
}
return grant.ID, nil
}

func (r *Registry) heartBeat(ctx context.Context, leaseID clientv3.LeaseID, key string, value string) {
curLeaseID := leaseID
kac, err := r.client.KeepAlive(ctx, leaseID)
if err != nil {
curLeaseID = 0
}
random := rand.New(rand.NewSource(time.Now().Unix()))

for {
if curLeaseID == 0 {
// try to registerWithKV
var retreat []int
for retryCnt := 0; retryCnt < r.opts.maxRetry; retryCnt++ {
if ctx.Err() != nil {
return
}
// prevent infinite blocking
idChan := make(chan clientv3.LeaseID, 1)
errChan := make(chan error, 1)
cancelCtx, cancel := context.WithCancel(ctx)
go func() {
defer cancel()
id, registerErr := r.registerWithKV(cancelCtx, key, value)
if registerErr != nil {
errChan <- registerErr
} else {
idChan <- id
}
}()

select {
case <-time.After(3 * time.Second):
cancel()
continue
case <-errChan:
continue
case curLeaseID = <-idChan:
}

kac, err = r.client.KeepAlive(ctx, curLeaseID)
if err == nil {
break
}
retreat = append(retreat, 1<<retryCnt)
time.Sleep(time.Duration(retreat[random.Intn(len(retreat))]) * time.Second)
}
if _, ok := <-kac; !ok {
// retry failed
return
}
}

select {
case _, ok := <-kac:
if !ok {
if ctx.Err() != nil {
// channel closed due to context cancel
return
}
// need to retry registration
curLeaseID = 0
continue
}
case <-r.opts.ctx.Done():
return
}
}
}
Loading

0 comments on commit 2b7df7f

Please sign in to comment.