etcd | Golang 客户端用户手册

2021年3月17日 0 条评论 1.5k 次阅读 0 人点赞

etcd 安装

一、安装

地址:https://github.com/etcd-io/etcd/releases

下载对应版本解压上传到服务器

二、启动

启动命令如下:

./etcd --listen-client-urls http://0.0.0.0:2379 --advertise-client-urls http://0.0.0.0:2380 --listen-peer-urls http://0.0.0.0:2381
参数详解:
--listen-client-urls:对外提供服务的地址:比如 http://ip:2379,http://127.0.0.1:2379,客户端会连接到这里和 etcd 交互
--advertise-client-urls:对外公告的该节点客户端监听地址,这个值会告诉集群中其他节点
--listen-peer-urls:和同伴通信的地址,比如 http://ip:2381,如果有多个,使用逗号分隔。需要所有节点都能够访问,所以不要使用 localhost!

启动后可以通过发送 http 请求查看是否启动成功 例如:浏览器直接访问 http://ip:2379/version

三、 GO 安装包

命令 go get github.com/coreos/etcd/clientv3

发现报错

undefined: resolver.BuildOption
undefined: resolver.ResolveNowOption
undefined: balancer.PickOptions
undefined: balancer.PickOptions

查了下

大概是说原因是google.golang.org/grpc 1.26后的版本是不支持clientv3的。

也就是说要把这个改成1.26版本的就可以了。

具体操作方法是在go.mod里加上:

replace google.golang.org/grpc => google.golang.org/grpc v1.26.0

四、使用

(一) 、连接

cli, err := clientv3.New(clientv3.Config{
   Endpoints:   []string{"localhost:2378"},
   DialTimeout: 5 * time.Second,
})

要访问etcd第一件事就是创建client,它需要传入一个Config配置,这里传了2个选项:

  • Endpoints:etcd 的多个节点服务地址,因为我是单点测试,所以只传1个。
  • DialTimeout:创建client的首次连接超时,这里传了5秒,如果5秒都没有连接成功就会返回err;值得注意的是,一旦client创建成功,我们就不用再关心后续底层连接的状态了,client内部会重连。

当然,如果上述err != nil,那么一般情况下我们可以选择重试几次,或者退出程序(重启)。

这里重点需要了解一下client到底长什么样:

type Client struct {
   Cluster
   KV
   Lease
   Watcher
   Auth
   Maintenance

   // Username is a user name for authentication.
   Username string
   // Password is a password for authentication.
   Password string
   // contains filtered or unexported fields
}

Cluster、KV、Lease…,你会发现它们其实就代表了整个客户端的几大核心功能板块,分别用于:

  • Cluster:向集群里增加etcd服务端节点之类,属于管理员操作。
  • KV:我们主要使用的功能,即操作K-V。
  • Lease:租约相关操作,比如申请一个TTL=10秒的租约。
  • Watcher:观察订阅,从而监听最新的数据变化。
  • Auth:管理etcd的用户和权限,属于管理员操作。
  • Maintenance:维护etcd,比如主动迁移etcd的leader节点,属于管理员操作。

(二)、获取 KV 对象

实际上 client.KV 是一个interface,提供了关于k-v操作的所有方法:

type KV interface {
// Put puts a key-value pair into etcd.
// Note that key,value can be plain bytes array and string is
// an immutable representation of that bytes array.
// To get a string of bytes, do string([]byte{0x10, 0x20}).
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)

// Get retrieves keys.
// By default, Get will return the value for "key", if any.
// When passed WithRange(end), Get will return the keys in the range [key, end).
// When passed WithFromKey(), Get returns keys greater than or equal to key.
// When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
// if the required revision is compacted, the request will fail with ErrCompacted .
// When passed WithLimit(limit), the number of returned keys is bounded by limit.
// When passed WithSort(), the keys will be sorted.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)

// Delete deletes a key, or optionally using WithRange(end), [key, end).
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)

// Compact compacts etcd KV history before the given rev.
Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)

// Do applies a single Op on KV without a transaction.
// Do is useful when creating arbitrary operations to be issued at a
// later time; the user can range over the operations, calling Do to
// execute them. Get/Put/Delete, on the other hand, are best suited
// for when the operation should be issued at the time of declaration.
Do(ctx context.Context, op Op) (OpResponse, error)

// Txn creates a transaction.
Txn(ctx context.Context) Txn
}

但是我们并不是直接获取client.KV来使用,而是通过一个方法来获得一个经过装饰的KV实现(内置错误重试机制的高级KV):

kv := clientv3.NewKV(cli)

接下来,我们将通过kv对象操作etcd中的数据。

(三)、PUT 写入

putResp, err := kv.Put(context.TODO(),"/test/a", "something")

第一个参数context经常用 golang 的同学比较熟悉,很多API利用context实现取消操作,比如希望超过一定时间就让API立即返回,但是通常我们不需要用到。

后面2个参数分别是key和value,还记得etcd是k-v存储引擎么?对于etcd来说,key=/test/a只是一个字符串而已,但是对我们而言却可以模拟出目录层级关系,先继续往下看。

其函数原型如下:

// Put puts a key-value pair into etcd.
// Note that key,value can be plain bytes array and string is
// an immutable representation of that bytes array.
// To get a string of bytes, do string([]byte{0x10, 0x20}).
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)

除了我们传递的参数,还支持一个可变参数,主要是传递一些控制项来影响Put的行为,例如可以携带一个lease ID来支持key过期,这个后面再说。

上述Put操作返回的是PutResponse,不同的KV操作对应不同的response结构,这里顺便一提。

type (
CompactResponse pb.CompactionResponse
PutResponse     pb.PutResponse
GetResponse     pb.RangeResponse
DeleteResponse  pb.DeleteRangeResponse
TxnResponse     pb.TxnResponse
)

你可以通过 IDE 跳转到 PutResponse ,详细看看有哪些可用的信息:

type PutResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
// if prev_kv is set in the request, the previous key-value pair will be returned.
PrevKv *mvccpb.KeyValue `protobuf:"bytes,2,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
}

Header 里保存的主要是本次更新的 revision 信息,而 PrevKv 可以返回Put覆盖之前的value是什么(目前是nil,后面会说原因),打印给大家看看:

记得,我们需要判断err来确定操作是否成功。

我们再 Put 其他 2个 key,用于后续演示:

// 再写一个孩子
kv.Put(context.TODO(),"/test/b", "another")

// 再写一个同前缀的干扰项
kv.Put(context.TODO(), "/testxxx", "干扰")

现在理论上来说,/test目录下有2个孩子:a与b,而/testxxx并不是/test目录的孩子。

(四)、读取

我们可以先来读取一下/test/a:

getResp, err := kv.Get(context.TODO(), "/test/a")

其函数原型如下:

// Get retrieves keys.
// By default, Get will return the value for "key", if any.
// When passed WithRange(end), Get will return the keys in the range [key, end).
// When passed WithFromKey(), Get returns keys greater than or equal to key.
// When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
// if the required revision is compacted, the request will fail with ErrCompacted .
// When passed WithLimit(limit), the number of returned keys is bounded by limit.
// When passed WithSort(), the keys will be sorted.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)

和Put类似,函数注释里提示我们可以传递一些控制参数来影响Get的行为,比如:WithFromKey表示读取从参数key开始递增的所有key,而不是读取单个key。

在上面的例子中,我没有传递opOption,所以就是获取key=/test/a的最新版本数据。

这里err并不能反馈出key是否存在(只能反馈出本次操作因为各种原因异常了),我们需要通过GetResponse(实际上是pb.RangeResponse)判断key是否存在:结构如下所示

type RangeResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
// kvs is the list of key-value pairs matched by the range request.
// kvs is empty when count is requested.
Kvs []*mvccpb.KeyValue `protobuf:"bytes,2,rep,name=kvs" json:"kvs,omitempty"`
// more indicates if there are more keys to return in the requested range.
More bool `protobuf:"varint,3,opt,name=more,proto3" json:"more,omitempty"`
// count is set to the number of keys within the range when requested.
Count int64 `protobuf:"varint,4,opt,name=count,proto3" json:"count,omitempty"`
}

Kvs 字段,保存了本次 Get 查询到的所有 k-v 对,因为上述例子只Get了一个单key,所以只需要判断一下 len(Kvs) 是否==1即可知道是否存在。

而 mvccpb.KeyValue 在上一篇博客中有所提及,它就是 etcd 在 bbolt 中保存的K-v对象:

type KeyValue struct {
// key is the key in bytes. An empty key is not allowed.
Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
// create_revision is the revision of last creation on this key.
CreateRevision int64 `protobuf:"varint,2,opt,name=create_revision,json=createRevision,proto3" json:"create_revision,omitempty"`
// mod_revision is the revision of last modification on this key.
ModRevision int64 `protobuf:"varint,3,opt,name=mod_revision,json=modRevision,proto3" json:"mod_revision,omitempty"`
// version is the version of the key. A deletion resets
// the version to zero and any modification of the key
// increases its version.
Version int64 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"`
// value is the value held by the key, in bytes.
Value []byte `protobuf:"bytes,5,opt,name=value,proto3" json:"value,omitempty"`
// lease is the ID of the lease that attached to key.
// When the attached lease expires, the key will be deleted.
// If lease is 0, then no lease is attached to the key.
Lease int64 `protobuf:"varint,6,opt,name=lease,proto3" json:"lease,omitempty"`
}

至于 RangeResponse.More 和 Count,当我们使用 withLimit() 选项进行Get时会发挥作用,相当于翻页查询。

接下来,我们通过一个特别的Get选项,获取/test目录下的所有孩子:

rangeResp, err := kv.Get(context.TODO(), "/test/", clientv3.WithPrefix())

WithPrefix() 是指查找以/test/为前缀的所有key,因此可以模拟出查找子目录的效果。

我们知道 etcd 是一个有序的k-v存储,因此/test/为前缀的key总是顺序排列在一起。

withPrefix 实际上会转化为范围查询,它根据前缀/test/生成了一个key range,[“/test/”, “/test0”),为什么呢?因为比/大的字符是’0’,所以以/test0作为范围的末尾,就可以扫描到所有的/test/打头的key了。

在之前,我Put了一个 /testxxx 干扰项,因为不符合/test/前缀(注意末尾的/),所以就不会被这次Get获取到。但是,如果我查询的前缀是/test,那么/testxxx也会被扫描到,这就是 etcd k-v 模型导致的,编程时一定要特别注意。

打印 rangeResp.Kvs 可以看到2个孩子:

[key:"/test/a" create_revision:6 mod_revision:6 version:1 value:"i am /test/a"  key:"/test/b" create_revision:7 mod_revision:7 version:1 value:"i am /test/b" ]

五、获取 Lease 对象 (租约)

和获取KV对象一样,通过下面代码获取它:

lease := clientv3.NewLease(cli)
type Lease interface {
// Grant creates a new lease.
Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)

// Revoke revokes the given lease.
Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)

// TimeToLive retrieves the lease information of the given lease ID.
TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)

// Leases retrieves all leases.
Leases(ctx context.Context) (*LeaseLeasesResponse, error)

// KeepAlive keeps the given lease alive forever.
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)

// KeepAliveOnce renews the lease once. In most of the cases, KeepAlive
// should be used instead of KeepAliveOnce.
KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)

// Close releases all resources Lease keeps for efficient communication
// with the etcd server.
Close() error
}

Lease提供了几个功能:

  • Grant:分配一个租约。
  • Revoke:释放一个租约。
  • TimeToLive:获取剩余TTL时间。
  • Leases:列举所有etcd中的租约。
  • KeepAlive:自动定时的续约某个租约。
  • KeepAliveOnce:为某个租约续约一次。
  • Close:貌似是关闭当前客户端建立的所有租约

(一)、 Grant 与 TTL

要想实现key自动过期,首先得创建一个租约,它有10秒的TTL:

grantResp, err := lease.Grant(context.TODO(), 10)

grantResp 中主要使用到了ID,也就是租约ID:

// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
type LeaseGrantResponse struct {
*pb.ResponseHeader
ID    LeaseID
TTL   int64
Error string
}

接下来,我们用这个租约来Put一个会自动过期的Key:

kv.Put(context.TODO(), "/test/expireme", "gone...", clientv3.WithLease(grantResp.ID))

这里特别需要注意,有一种情况是在Put之前Lease已经过期了,那么这个Put操作会返回error,此时你需要重新分配Lease。请注意,这里的重新分配是重新创建一个租约,而不是在过期租约上进行 Keepalive

当我们实现服务注册时,需要主动给Lease进行续约,这需要调用 KeepAlive/KeepAliveOnce,你可以在一个循环中定时的调用:

keepResp, err := lease.KeepAliveOnce(context.TODO(), grantResp.ID)
// sleep一会..

keepResp 结构如下:

// LeaseKeepAliveResponse wraps the protobuf message LeaseKeepAliveResponse.
type LeaseKeepAliveResponse struct {
*pb.ResponseHeader
ID  LeaseID
TTL int64
}

没什么特别需要用到的字段。

KeepAlive和Put一样,如果在执行之前Lease就已经过期了,那么需要重新分配Lease。Etcd并没有提供API来实现原子的Put with Lease。

六、OP

Op字面意思就是”操作”,Get和Put都属于Op,只是为了简化用户开发而开放的特殊API。

实际上,KV有一个Do方法接受一个Op:

// Do applies a single Op on KV without a transaction.
// Do is useful when creating arbitrary operations to be issued at a
// later time; the user can range over the operations, calling Do to
// execute them. Get/Put/Delete, on the other hand, are best suited
// for when the operation should be issued at the time of declaration.
Do(ctx context.Context, op Op) (OpResponse, error)

其参数 Op 是一个抽象的操作,可以是 Put/Get/Delete…;而 OpResponse 是一个抽象的结果,可以是 PutResponse/GetResponse…

可以通过一些函数来分配Op:

  • func OpDelete(key string, opts …OpOption) Op
  • func OpGet(key string, opts …OpOption) Op
  • func OpPut(key, val string, opts …OpOption) Op
  • func OpTxn(cmps []Cmp, thenOps []Op, elseOps []Op) Op

其实和直接调用KV.Put,KV.GET没什么区别。

下面是一个例子:

op1 := clientv3.OpPut("/hi", "hello", clientv3.WithPrevKV())
opResp, err := kv.Do(context.TODO(), op1)

这里设置一个key=/hi,value=hello,希望结果中返回覆盖之前的value。

把这个op交给Do方法执行,返回的 opResp 结构如下:

type OpResponse struct {
put *PutResponse
get *GetResponse
del *DeleteResponse
txn *TxnResponse
}

你的操作是什么类型,你就用哪个指针来访问对应的结果,仅此而已。

七、Txn 事务

etcd中事务是原子执行的,只支持if … then … else …这种表达,能实现一些有意思的场景。

首先,我们需要开启一个事务,这是通过KV对象的方法实现的:

txn := kv.Txn(context.TODO())

我写了如下的测试代码,Then和Else还比较好理解,If是比较陌生的。

txnResp, err := txn.If(clientv3.Compare(clientv3.Value("/hi"), "=", "hello")).
Then(clientv3.OpGet("/hi")).
Else(clientv3.OpGet("/test/", clientv3.WithPrefix())).
Commit()

我们先看下 Txn 支持的方法:

type Txn interface {
// If takes a list of comparison. If all comparisons passed in succeed,
// the operations passed into Then() will be executed. Or the operations
// passed into Else() will be executed.
If(cs ...Cmp) Txn

// Then takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() succeed.
Then(ops ...Op) Txn

// Else takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() fail.
Else(ops ...Op) Txn

// Commit tries to commit the transaction.
Commit() (*TxnResponse, error)
}

Txn必须是这样使用的:If(满足条件) Then(执行若干Op) Else(执行若干Op)。

If中支持传入多个Cmp比较条件,如果所有条件满足,则执行Then中的Op(上一节介绍过Op),否则执行Else中的Op。

在我的例子中只传入了1个比较条件:

clientv3.Compare(clientv3.Value("/hi"), "=", "hello")

Value(“/hi”)是指key=/hi对应的value,它是条件表达式的”主语”,类型是Cmp:

func Value(key string) Cmp {
return Cmp{Key: []byte(key), Target: pb.Compare_VALUE}
}

这个Value(“/hi”)返回的Cmp表达了:”/hi这个key对应的value”。

接下来,利用Compare函数来继续为”主语”增加描述,形成了一个完整条件语句,即”/hi这个key对应的value”必须等于”hello”。

Compare函数实际上是对Value返回的Cmp对象进一步修饰,增加了”=”与”hello”两个描述信息:

func Compare(cmp Cmp, result string, v interface{}) Cmp {
var r pb.Compare_CompareResult

switch result {
case "=":
r = pb.Compare_EQUAL
case "!=":
r = pb.Compare_NOT_EQUAL
case ">":
r = pb.Compare_GREATER
case "<":
r = pb.Compare_LESS
default:
panic("Unknown result op")
}

cmp.Result = r
switch cmp.Target {
case pb.Compare_VALUE:
val, ok := v.(string)
if !ok {
panic("bad compare value")
}
cmp.TargetUnion = &pb.Compare_Value{Value: []byte(val)}
case pb.Compare_VERSION:
cmp.TargetUnion = &pb.Compare_Version{Version: mustInt64(v)}
case pb.Compare_CREATE:
cmp.TargetUnion = &pb.Compare_CreateRevision{CreateRevision: mustInt64(v)}
case pb.Compare_MOD:
cmp.TargetUnion = &pb.Compare_ModRevision{ModRevision: mustInt64(v)}
case pb.Compare_LEASE:
cmp.TargetUnion = &pb.Compare_Lease{Lease: mustInt64orLeaseID(v)}
default:
panic("Unknown compare type")
}
return cmp
}

Cmp可以用于描述”key=xxx的yyy属性,必须=、!=、<、>,kkk值”,比如:

  • key=xxx的value,必须!=,hello。
  • key=xxx的create版本号,必须=,11233。
  • key=xxx的lease id,必须=,12319231231238。

经过Compare函数修饰的Cmp对象,内部包含了完整的条件信息,传递给If函数即可。

类似于Value的函数用于指定yyy属性,有这么几个方法:

  • func CreateRevision(key string) Cmp:key=xxx的创建版本必须满足…
  • func LeaseValue(key string) Cmp:key=xxx的Lease ID必须满足…
  • func ModRevision(key string) Cmp:key=xxx的最后修改版本必须满足…
  • func Value(key string) Cmp:key=xxx的创建值必须满足…
  • func Version(key string) Cmp:key=xxx的累计更新次数必须满足…

最后Commit提交整个Txn事务,我们需要判断txnResp获知If条件是否成立:

if txnResp.Succeeded { // If = true
fmt.Println("~~~", txnResp.Responses[0].GetResponseRange().Kvs)
} else { // If =false
fmt.Println("!!!", txnResp.Responses[0].GetResponseRange().Kvs)
}

Succeed=true表示If条件成立,接下来我们需要获取Then或者Else中的OpResponse列表(因为可以传多个Op),可以看一下txnResp的结构:

type TxnResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
// succeeded is set to true if the compare evaluated to true or false otherwise.
Succeeded bool `protobuf:"varint,2,opt,name=succeeded,proto3" json:"succeeded,omitempty"`
// responses is a list of responses corresponding to the results from applying
// success if succeeded is true or failure if succeeded is false.
Responses []*ResponseOp `protobuf:"bytes,3,rep,name=responses" json:"responses,omitempty"`
}

每个Op有一个ResponseOp对象,而ResponseOp的结构我们在前面贴过,可以翻回去看看。

八、Watch

cli,err:=clientv3.New(clientv3.Config{
Endpoints:           []string{"127.0.0.1:2379"},
DialTimeout: 5*time.Second,
})

if err!=nil {
panic(err)
}
watcher:=clientv3.NewWatcher(cli)

defer watcher.Close()
dir:="/discovery/"

ctx,cancle:=context.WithCancel(context.Background())

go func() {
time.Sleep(time.Minute*30)
cancle()
}()

go func() {
// 监听所有以 /discovery/ 开头的 key
c:=watcher.Watch(ctx,dir,clientv3.WithPrefix())
// 管道中取事件,30 分钟后管道自动关闭
for watchResp:=range c{
for _,event:=range watchResp.Events{
switch event.Type {
case mvccpb.PUT:
fmt.Println("PUT 写入事件 key:"+string(event.Kv.Key)+"value:"+string(event.Kv.Value))
case mvccpb.DELETE:
fmt.Println("DELETE删除事件"+string(event.Kv.Key))
}
}
}
fmt.Println("管道关闭了")
}()
time.Sleep(time.Hour)
}

兰陵美酒郁金香

大道至简 Simplicity is the ultimate form of sophistication.

文章评论(0)

你必须 登录 才能发表评论