ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

go任务调度7(etcd的watch的用法)

2019-04-20 18:55:52  阅读:274  来源: 互联网

标签:clientv3 job7 am watch etcd go 任务调度 Revision


监听etcd中的kv变化,常用来做集群中的配置下发、状态同步,非常有价值。

package main

import (
    "go.etcd.io/etcd/clientv3"
    "time"
    "fmt"
    "context"
    "go.etcd.io/etcd/mvcc/mvccpb"
)

func main() {
    var (
        config clientv3.Config
        client *clientv3.Client
        err error
        kv clientv3.KV
        watcher clientv3.Watcher
        getResp *clientv3.GetResponse
        watchStartRevision int64
        watchRespChan <-chan clientv3.WatchResponse
        watchResp clientv3.WatchResponse
        event *clientv3.Event
    )

    // 客户端配置
    config = clientv3.Config{
        Endpoints: []string{"0.0.0.0:2379"},
        DialTimeout: 5 * time.Second,
    }

    // 建立连接
    if client, err = clientv3.New(config); err != nil {
        fmt.Println(err)
        return
    }

    // KV
    kv = clientv3.NewKV(client)

    // 模拟etcd中KV的变化
    go func() {
        for {
            kv.Put(context.TODO(), "/cron/jobs/job7", "i am job7")

            kv.Delete(context.TODO(), "/cron/jobs/job7")

            time.Sleep(1 * time.Second)
        }
    }()

    // 先GET到当前的值,并监听后续变化
    if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job7"); err != nil {
        fmt.Println(err)
        return
    }

    // 现在key是存在的
    if len(getResp.Kvs) != 0 {
        fmt.Println("当前值:", string(getResp.Kvs[0].Value))
    }

    // 当前etcd集群事务ID, 单调递增的(监听/cron/jobs/job7后续的变化,也就是通过监听版本变化)
    watchStartRevision = getResp.Header.Revision + 1

    // 创建一个watcher(监听器)
    watcher = clientv3.NewWatcher(client)

    // 启动监听
    fmt.Println("从该版本向后监听:", watchStartRevision)

    ctx, cancelFunc := context.WithCancel(context.TODO())
    //5秒钟后取消
    time.AfterFunc(5 * time.Second, func() {
        cancelFunc()
    })
    //这里ctx感知到cancel则会关闭watcher
    watchRespChan = watcher.Watch(ctx, "/cron/jobs/job7", clientv3.WithRev(watchStartRevision))

    // 处理kv变化事件
    for watchResp = range watchRespChan {
        for _, event = range watchResp.Events {
            switch event.Type {
            case mvccpb.PUT:
                fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
            case mvccpb.DELETE:
                fmt.Println("删除了", "Revision:", event.Kv.ModRevision)
            }
        }
    }
}

输出:

[root@bogon etcd]# go run demo8.go
当前值: i am job7
从该版本向后监听: 33
删除了 Revision: 33
修改为: i am job7 Revision: 34 34
删除了 Revision: 35
修改为: i am job7 Revision: 36 36
删除了 Revision: 37
修改为: i am job7 Revision: 38 38
删除了 Revision: 39
修改为: i am job7 Revision: 40 40
删除了 Revision: 41
[root@bogon etcd]#

标签:clientv3,job7,am,watch,etcd,go,任务调度,Revision
来源: https://blog.51cto.com/5660061/2381962

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有