1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
| package main
import (
"context"
"flag"
"log"
"strings"
"sync"
"time"
"github.com/redis/go-redis/v9"
)
// main copies every hash key matching the -m pattern from the source Redis
// cluster to the destination cluster.
//
// Flags:
//
//	-s  comma-separated source cluster addresses (required)
//	-d  comma-separated destination cluster addresses (required)
//	-m  SCAN match expression
//	-i  pause before each SCAN call, in milliseconds
//	-c  SCAN count hint
//	-t  TTL applied to every copied key, in hours (0 leaves keys without a TTL)
//
// Each master of the source cluster is scanned by ForEachMaster; every
// non-empty page of keys is copied by its own goroutine. The process exits
// with a non-zero status if any scan was aborted or any key failed to copy.
func main() {
	src := flag.String("s", "", "src redis addrs")
	dest := flag.String("d", "", "dest redis addrs")
	match := flag.String("m", "", "match expr")
	interval := flag.Int64("i", 100, "scan interval, ms")
	count := flag.Int64("c", 1000, "scan count,redis cmd qps < count * 1000 / interval * 2")
	ttl := flag.Int64("t", 0, "dest ttl, hour")
	flag.Parse()

	// strings.Split("", ",") yields [""], which would silently build a
	// client with an empty address, so reject missing addresses up front.
	if *src == "" || *dest == "" {
		log.Fatal("both -s (src redis addrs) and -d (dest redis addrs) are required")
	}
	// Redis treats a non-positive EXPIRE timeout as a delete, so a negative
	// TTL would remove every key right after it was copied.
	if *ttl < 0 {
		log.Fatal("-t (dest ttl) must not be negative")
	}

	srcAddrs := strings.Split(*src, ",")
	log.Println("src addrs:")
	for _, addr := range srcAddrs {
		log.Println(addr)
	}
	descAddrs := strings.Split(*dest, ",")
	log.Println("dest addrs:")
	for _, addr := range descAddrs {
		log.Println(addr)
	}

	redisSrc := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:            srcAddrs,
		PoolSize:         100,
		PoolTimeout:      30 * time.Second,
		DisableIndentity: true,
	})
	redisDest := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:            descAddrs,
		PoolSize:         100,
		PoolTimeout:      30 * time.Second,
		DisableIndentity: true, // < 7 err: Unknown subcommand or wrong number of arguments for 'setinfo'
	})

	pause := time.Duration(*interval) * time.Millisecond
	expiry := time.Duration(*ttl) * time.Hour

	ctx := context.Background()
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex // guards failed
		failed int        // number of keys that could not be copied
	)

	scanErr := redisSrc.ForEachMaster(ctx, func(nodeCtx context.Context, client *redis.Client) error {
		var cursor uint64
		for {
			time.Sleep(pause)
			keys, nextCursor, err := client.Scan(nodeCtx, cursor, *match, *count).Result()
			if err == redis.Nil {
				log.Println("no more keys")
				return nil
			}
			if err != nil {
				log.Println("scan err: ", err.Error())
				return err
			}
			log.Printf("scan: cusor %d @ %s, keys: %d\n", cursor, client.Options().Addr, len(keys))
			cursor = nextCursor

			if len(keys) > 0 {
				wg.Add(1)
				// The copy goroutine is only waited for after ForEachMaster
				// has returned, so it uses the top-level ctx rather than
				// the context handed to this callback.
				go func(batch []string) {
					defer wg.Done()
					if n := copyHashKeys(ctx, client, redisDest, batch, expiry); n > 0 {
						mu.Lock()
						failed += n
						mu.Unlock()
					}
				}(keys)
			}

			// Check for the end of the iteration even when the page was
			// empty; otherwise an empty final page would restart the scan
			// from cursor 0.
			if cursor == 0 {
				log.Println("cursor end")
				return nil
			}
		}
	})

	wg.Wait()
	log.Println("all routine done")

	if scanErr != nil {
		log.Fatalf("scan aborted: %s", scanErr.Error())
	}
	if failed > 0 {
		log.Fatalf("%d keys failed to copy", failed)
	}
}

// copyHashKeys reads each key in keys from src with HGETALL and writes it to
// dest with HMSET, applying ttl with EXPIRE when ttl is non-zero. A failure on
// one key is logged and counted, and the remaining keys are still processed.
// It returns the number of keys that failed.
func copyHashKeys(ctx context.Context, src *redis.Client, dest *redis.ClusterClient, keys []string, ttl time.Duration) int {
	failed := 0
	for _, k := range keys {
		value, err := src.HGetAll(ctx, k).Result()
		if err == redis.Nil {
			continue
		}
		if err != nil {
			log.Printf("get key: %s err: %s", k, err.Error())
			failed++
			continue
		}
		// An empty result means the key no longer exists (e.g. it expired
		// after the scan); HMSET with no fields would be rejected.
		if len(value) == 0 {
			continue
		}
		if err := dest.HMSet(ctx, k, value).Err(); err != nil {
			log.Println("set err: ", err.Error(), k, value)
			failed++
			continue
		}
		if ttl != 0 {
			if err := dest.Expire(ctx, k, ttl).Err(); err != nil {
				log.Printf("expire key: %s err: %s", k, err.Error())
				failed++
			}
		}
	}
	return failed
}
|