hashmap复制

有时新的 Redis 集群并不需要全部数据，只需要把源集群中的一部分数据（这里是 hash 类型的 key）复制到另一个 Redis 集群，且这些数据更新并不频繁。

执行命令

下面的命令把源集群（-s）中匹配 `token:*`（-m）的 key 复制到目标集群（-d）：每次 SCAN 之前等待 500 毫秒（-i），并为复制到目标集群的 key 设置 100 小时的过期时间（-t）。
nohup ./redis-cluster-migrate \
-s 172.18.206.144:7001,172.18.206.144:7002,172.18.83.165:7003,172.18.83.165:7004,172.18.25.248:7005,172.18.25.248:7006 \
-d 172.16.0.11:7001,172.16.0.11:7002,172.16.0.22:7003,172.16.0.22:7004,172.16.0.33:7005,172.16.0.33:7006 \
-m "token:*" \
-i 500 \
-t 100 &

完整代码

以下程序依赖 github.com/redis/go-redis/v9：
package main

import (
	"context"
	"flag"
	"log"
	"strings"
	"sync"
	"time"

	"github.com/redis/go-redis/v9"
)

// migrateOptions bundles the per-run settings shared by every scanning and
// copying goroutine.
type migrateOptions struct {
	match string        // SCAN MATCH pattern
	count int64         // SCAN COUNT hint
	pause time.Duration // delay before each SCAN call
	ttl   time.Duration // TTL set on copied keys; <= 0 means "do not call EXPIRE"
}

// main copies hash keys matching a pattern from a source Redis cluster to a
// destination Redis cluster.
//
// Flags:
//
//	-s  comma-separated source cluster addresses (required)
//	-d  comma-separated destination cluster addresses (required)
//	-m  SCAN MATCH pattern
//	-i  delay before each SCAN call, in milliseconds
//	-c  SCAN COUNT hint
//	-t  TTL, in hours, set on every copied key in the destination; 0 disables it
//
// Every source master is scanned via ForEachMaster. Each non-empty page of
// keys is handed to its own goroutine, which reads each hash with HGETALL
// and writes it to the destination with HMSET. main waits for all copy
// goroutines before exiting and exits non-zero if any scan failed.
func main() {
	src := flag.String("s", "", "src redis addrs")
	dest := flag.String("d", "", "dest redis addrs")
	match := flag.String("m", "", "match expr")
	interval := flag.Int64("i", 100, "scan interval, ms")
	count := flag.Int64("c", 1000, "scan count,redis cmd qps < count * 1000 / interval * 2")
	ttl := flag.Int64("t", 0, "dest ttl, hour")
	flag.Parse()

	// strings.Split("", ",") yields [""], which would silently produce a
	// client pointed at an empty address, so reject missing flags up front.
	if *src == "" || *dest == "" {
		flag.Usage()
		log.Fatal("both -s (src redis addrs) and -d (dest redis addrs) are required")
	}

	srcAddrs := strings.Split(*src, ",")
	log.Println("src addrs:")
	for _, addr := range srcAddrs {
		log.Println(addr)
	}
	destAddrs := strings.Split(*dest, ",")
	log.Println("dest addrs:")
	for _, addr := range destAddrs {
		log.Println(addr)
	}

	redisSrc := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:            srcAddrs,
		PoolSize:         100,
		PoolTimeout:      30 * time.Second,
		DisableIndentity: true,
	})
	redisDest := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:            destAddrs,
		PoolSize:         100,
		PoolTimeout:      30 * time.Second,
		DisableIndentity: true, // < 7 err: Unknown subcommand or wrong number of arguments for 'setinfo'
	})

	opts := migrateOptions{
		match: *match,
		count: *count,
		pause: time.Duration(*interval) * time.Millisecond,
		ttl:   time.Duration(*ttl) * time.Hour,
	}

	ctx := context.Background()
	var wg sync.WaitGroup
	scanErr := redisSrc.ForEachMaster(ctx, func(ctx context.Context, client *redis.Client) error {
		return scanAndCopy(ctx, client, redisDest, &wg, opts)
	})
	// Copy goroutines may still be running after the scans finish (or fail);
	// always let them drain before deciding the exit status.
	wg.Wait()
	if scanErr != nil {
		log.Fatalf("migration incomplete: %v", scanErr)
	}
	log.Println("all routine done")
}

// scanAndCopy walks the keyspace of one source master with SCAN and starts a
// copy goroutine (tracked by wg) for every non-empty page of keys. It returns
// nil once the cursor comes back as 0, or the SCAN error that stopped it.
func scanAndCopy(ctx context.Context, src *redis.Client, dest *redis.ClusterClient, wg *sync.WaitGroup, opts migrateOptions) error {
	var cursor uint64
	for {
		time.Sleep(opts.pause)
		keys, next, err := src.Scan(ctx, cursor, opts.match, opts.count).Result()
		if err != nil {
			log.Println("scan err: ", err.Error())
			return err
		}
		log.Printf("scan: cusor %d @ %s, keys: %d\n", cursor, src.Options().Addr, len(keys))
		cursor = next

		if len(keys) > 0 {
			// log.Printf("scan keys: %v \n", keys)
			wg.Add(1)
			go func() {
				defer wg.Done()
				copyHashes(ctx, src, dest, keys, opts.ttl)
			}()
		}

		// The end-of-scan check must run even when the page was empty:
		// a final page with no matching keys still returns cursor 0, and
		// looping on it would restart the whole scan from the beginning.
		if cursor == 0 {
			log.Println("cursor end")
			return nil
		}
	}
}

// copyHashes copies each hash in keys from src to dest and, when ttl is
// positive, sets that TTL on the destination key. A failure on one key is
// logged and the remaining keys are still processed.
func copyHashes(ctx context.Context, src *redis.Client, dest *redis.ClusterClient, keys []string, ttl time.Duration) {
	for _, k := range keys {
		value, err := src.HGetAll(ctx, k).Result()
		if err != nil {
			// Typically a non-hash key matching the pattern (WRONGTYPE);
			// skip it rather than dropping the rest of the batch.
			log.Printf("get key: %s err: %s", k, err.Error())
			continue
		}
		if len(value) == 0 {
			// The key expired or was deleted between SCAN and HGETALL;
			// HMSET with no fields would be rejected, so there is nothing to copy.
			continue
		}
		if err := dest.HMSet(ctx, k, value).Err(); err != nil {
			log.Println("set err: ", err.Error(), k, value)
			continue
		}
		if ttl > 0 {
			if err := dest.Expire(ctx, k, ttl).Err(); err != nil {
				log.Printf("expire key: %s err: %s", k, err.Error())
			}
		}
		// log.Printf("set key: %s, value: %s \n", k, value)
	}
}