ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

怎么实现一个分布式kv系统-2-静态分区

2022-03-02 09:32:48  阅读:177  来源: 互联网

标签:http err 分区 db shard value kv key 分布式


摘要

本节要实现的有3点

  • 解析toml文件
  • 计算key的hash值
  • 将请求路由到对应的shard

编程实现

1. 定义&解析toml规则文件

定义sharding.toml文件

[[shards]]
name = "Moscow"
id = 0

[[shards]]
name = "Minsk"
id = 1

[[shards]]
name = "Kiev"
id = 2

导入解析toml的包

添加configFile参数解析

在main中读取配置文件

import (
	"github.com/BurntSushi/toml"
)

var (
	configFile = flag.String("config-file", "sharding.toml", "The configuration file")
)

func main() {
	flag.Parse()

	var c config.Config
	if _, err := toml.DecodeFile(*configFile, &c); err != nil {
		log.Fatalf("failed to decode config file(%q):%v", *configFile, err)
	}
}

2. 添加config模块

创建config/config.go

定义Config & Shard结构

package config

type Shard struct {
	Name string
	Id   string
}

type Config struct {
	Shards []Shard
}

3. 指定shard

先不自动shard,先手动指定shard

添加shard参数解析

检查shard是否存在

确认shard的id

// main.go
var (
  shardName   = flag.String("shardName", "Moscow", "The name of the shard")
)

ok, shard := c.ExistsShard(*shardName)
if !ok {
  log.Fatalf("shard %v not exists", shardName)
}

// config.go
func (c *Config) ExistsShard(name string) (bool, Shard) {
  var rc Shard
  for _, shard := range c.Shards {
    if shard.Name == name {
      return true, shard
    }
  }
  return false, rc
}

4. 写入数据到指定shard

写入规则:hash(key) % shardCounter 就是需要写入的分片

更新Server结构,添加shardCounter&shardIndex字段

// web.go
type Server struct {
	db           *db.Database
	shardCounter int
	shardIdex    int
}

func NewServer(db *db.Database, shardCounter, shardIdex int) *Server {
	return &Server{
		db:           db,
		shardCounter: shardCounter,
		shardIndex:    shardIndex,
	}
}

导入计算hash的包: hash/fnv包,计算shard

更新GetHandler、SetHandler,调用getShard来获取应该写入数据的分片。

// web.go
func (s *Server) getShard(key string) uint64 {
	h := fnv.New64()
	h.Write([]byte(key))
	return h.Sum64() % uint64(s.shardCounter)
}

func (s *Server) GetHandler(w http.ResponseWriter, r *http.Request) {
	r.ParseForm()
	key := r.Form.Get("key")
	value, err := s.db.GetKey(key)
	fmt.Fprintf(w, "%q:%q; shard: %d; %v Get Called\n", key, value, s.getShard(key), err)
}

func (s *Server) SetHandler(w http.ResponseWriter, r *http.Request) {
	r.ParseForm()
	key := r.Form.Get("key")
	value := r.Form.Get("value")
	err := s.db.SetKey(key, []byte(value))
	fmt.Fprintf(w, "%q:%q; shard: %d; %v; Set called\n", key, value, s.getShard(key), err)
}

4.获取所有节点的地址

以上虽然可以计算出分片,可以还没有办法路由给其他的分片。

首先,需要知道其他分片的地址。

其次,将信息传入到Server对象中。

接着,定义redirect函数,请求转发。

最后,改造GetHandle、GetHandle转发请求。

// config.go
func (c *Config) GetAddress() map[int]string {
	addrs := make(map[int]string)
	for _, addr := range c.Shards {
		addrs[addr.Id] = addr.Address
	}
	return addrs
}

// web.go
type Server struct {
	db           *db.Database
	shardCounter int
	shardIndex   int
	addrs        map[int]string
}

// main.go
addrs := c.GetAddress()
svr := web.NewServer(db, len(c.Shards), shard.Id, addrs)

// web.go
func (s *Server) redirect(w http.ResponseWriter, r *http.Request, shard int) error {
	resp, err := http.Get("http://" + s.addrs[shard] + r.RequestURI)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Fprintf(w, "Error redirect request:%v", err)
		return err
	}
	defer resp.Body.Close()
	io.Copy(w, resp.Body)
	return nil
}



func (s *Server) GetHandler(w http.ResponseWriter, r *http.Request) {
	r.ParseForm()
	key := r.Form.Get("key")
	shard := s.getShard(key)
	if shard != uint32(s.shardIndex) {
		err := s.redirect(w, r, int(shard))
		if err != nil {
			return
		} 
	} else {
		value, err := s.db.GetKey(key)
		fmt.Fprintf(w, "%q:%q; target shard:%d; current shard:%d; %v Get Called\n", key, value, shard, s.shardIndex, err)
	}
}

func (s *Server) SetHandler(w http.ResponseWriter, r *http.Request) {
	r.ParseForm()
	key := r.Form.Get("key")
	value := r.Form.Get("value")
	shard := s.getShard(key)
	if shard != uint32(s.shardIndex) {
		err := s.redirect(w, r, int(shard))
		if err != nil {
			return
		}
	} else {
		err := s.db.SetKey(key, []byte(value))
		fmt.Fprintf(w, "%q:%q; target shard:%d; current shard:%d; %v Set Called\n", key, value, shard, s.shardIndex, err)
	}
}

5.测试

get(){
    for key in a b c;do 
        echo -e "\n========= get $key========="
        for port in 8000 8001 8002;do 
            curl "http://127.0.0.1:$port/get?key=$key"
        done
    done
}

set(){
    for key in a b c;do 
        echo -e "\n========= set $key========="
        for port in 8000 8001 8002;do 
            curl "http://127.0.0.1:$port/set?key=$key&value=$key"
        done
    done
}

set
get

参考资料

本节完整代码:https://github.com/YuriyNasretdinov/distribkv/tree/part2

youtube视频:https://www.youtube.com/watch?v=5VK5tAyZDxQ&list=PLWwSgbaBp9XrMkjEhmTIC37WX2JfwZp7I&index=3

B站视频:https://www.bilibili.com/video/BV1nR4y177YM?p=2

标签:http,err,分区,db,shard,value,kv,key,分布式
来源: https://blog.csdn.net/luzhangting/article/details/123222101

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有