Merge branch '7167-keep-rsync-test-setup' into 7167-keep-rsync
[arvados.git] / tools / keep-rsync / keep-rsync.go
1 package main
2
3 import (
4         "bytes"
5         "flag"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/keepclient"
8         "io/ioutil"
9         "log"
10         "regexp"
11         "strings"
12         "time"
13 )
14
15 // keep-rsync arguments
16 var (
17         srcConfig           arvadosclient.APIConfig
18         dstConfig           arvadosclient.APIConfig
19         blobSigningKey      string
20         srcKeepServicesJSON string
21         dstKeepServicesJSON string
22         replications        int
23         prefix              string
24 )
25
26 func main() {
27         var srcConfigFile string
28         var dstConfigFile string
29
30         flag.StringVar(
31                 &srcConfigFile,
32                 "src-config-file",
33                 "",
34                 "Source configuration filename with full path that contains "+
35                         "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the source keep servers, "+
36                         "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, ARVADOS_EXTERNAL_CLIENT and ARVADOS_BLOB_SIGNING_KEY.")
37
38         flag.StringVar(
39                 &dstConfigFile,
40                 "dst-config-file",
41                 "",
42                 "Destination configuration filename with full path that contains "+
43                         "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the destination keep servers, "+
44                         "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, ARVADOS_EXTERNAL_CLIENT and ARVADOS_BLOB_SIGNING_KEY.")
45
46         flag.StringVar(
47                 &srcKeepServicesJSON,
48                 "src-keep-services-json",
49                 "",
50                 "An optional list of available source keepservices. "+
51                         "If not provided, this list is obtained from api server configured in src-config-file.")
52
53         flag.StringVar(
54                 &dstKeepServicesJSON,
55                 "dst-keep-services-json",
56                 "",
57                 "An optional list of available destination keepservices. "+
58                         "If not provided, this list is obtained from api server configured in dst-config-file.")
59
60         flag.IntVar(
61                 &replications,
62                 "replications",
63                 0,
64                 "Number of replications to write to the destination.")
65
66         flag.StringVar(
67                 &prefix,
68                 "prefix",
69                 "",
70                 "Index prefix")
71
72         flag.Parse()
73
74         var err error
75
76         // Load config
77         if srcConfigFile == "" {
78                 log.Fatal("-src-config-file must be specified.")
79         }
80         srcConfig, err = readConfigFromFile(srcConfigFile)
81         if err != nil {
82                 log.Fatal("Error reading source configuration: %s", err.Error())
83         }
84
85         if dstConfigFile == "" {
86                 log.Fatal("-dst-config-file must be specified.")
87         }
88         dstConfig, err = readConfigFromFile(dstConfigFile)
89         if err != nil {
90                 log.Fatal("Error reading destination configuration: %s", err.Error())
91         }
92
93         // Initialize keep-rsync
94         err = initializeKeepRsync()
95         if err != nil {
96                 log.Fatal("Error configuring keep-rsync: %s", err.Error())
97         }
98
99         // Copy blocks not found in dst from src
100         performKeepRsync()
101 }
102
103 var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
104
105 // Reads config from file
106 func readConfigFromFile(filename string) (arvadosclient.APIConfig, error) {
107         var config arvadosclient.APIConfig
108
109         content, err := ioutil.ReadFile(filename)
110         if err != nil {
111                 return config, err
112         }
113
114         lines := strings.Split(string(content), "\n")
115         for _, line := range lines {
116                 if line == "" {
117                         continue
118                 }
119                 kv := strings.Split(line, "=")
120
121                 switch kv[0] {
122                 case "ARVADOS_API_TOKEN":
123                         config.APIToken = kv[1]
124                 case "ARVADOS_API_HOST":
125                         config.APIHost = kv[1]
126                 case "ARVADOS_API_HOST_INSECURE":
127                         config.APIHostInsecure = matchTrue.MatchString(kv[1])
128                 case "ARVADOS_EXTERNAL_CLIENT":
129                         config.ExternalClient = matchTrue.MatchString(kv[1])
130                 case "ARVADOS_BLOB_SIGNING_KEY":
131                         blobSigningKey = kv[1]
132                 }
133         }
134         return config, nil
135 }
136
137 // keep-rsync source and destination clients
138 var (
139         arvSrc arvadosclient.ArvadosClient
140         arvDst arvadosclient.ArvadosClient
141         kcSrc  *keepclient.KeepClient
142         kcDst  *keepclient.KeepClient
143 )
144
145 // Initializes keep-rsync using the config provided
146 func initializeKeepRsync() (err error) {
147         // arvSrc from srcConfig
148         arvSrc, err = arvadosclient.New(srcConfig)
149         if err != nil {
150                 return
151         }
152
153         // arvDst from dstConfig
154         arvDst, err = arvadosclient.New(dstConfig)
155         if err != nil {
156                 return
157         }
158
159         // Get default replications value from destination, if it is not already provided
160         if replications == 0 {
161                 value, err := arvDst.Discovery("defaultCollectionReplication")
162                 if err == nil {
163                         replications = int(value.(float64))
164                 } else {
165                         replications = 2
166                 }
167         }
168
169         // if srcKeepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
170         if srcKeepServicesJSON == "" {
171                 kcSrc, err = keepclient.MakeKeepClient(&arvSrc)
172                 if err != nil {
173                         return
174                 }
175         } else {
176                 kcSrc, err = keepclient.MakeKeepClientFromJSON(&arvSrc, srcKeepServicesJSON)
177                 if err != nil {
178                         return
179                 }
180         }
181
182         // if dstKeepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
183         if dstKeepServicesJSON == "" {
184                 kcDst, err = keepclient.MakeKeepClient(&arvDst)
185                 if err != nil {
186                         return
187                 }
188         } else {
189                 kcDst, err = keepclient.MakeKeepClientFromJSON(&arvDst, dstKeepServicesJSON)
190                 if err != nil {
191                         return
192                 }
193         }
194         kcDst.Want_replicas = replications
195
196         return
197 }
198
199 // Get unique block locators from src and dst
200 // Copy any blocks missing in dst
201 func performKeepRsync() error {
202         // Get unique locators from src
203         srcIndex, err := getUniqueLocators(kcSrc, prefix)
204         if err != nil {
205                 return err
206         }
207
208         // Get unique locators from dst
209         dstIndex, err := getUniqueLocators(kcDst, prefix)
210         if err != nil {
211                 return err
212         }
213
214         // Get list of locators found in src, but missing in dst
215         toBeCopied := getMissingLocators(srcIndex, dstIndex)
216
217         // Copy each missing block to dst
218         copyBlocksToDst(toBeCopied)
219
220         return nil
221 }
222
223 // Get list of unique locators from the specified cluster
224 func getUniqueLocators(kc *keepclient.KeepClient, indexPrefix string) (map[string]bool, error) {
225         var indexBytes []byte
226
227         for uuid := range kc.LocalRoots() {
228                 reader, err := kc.GetIndex(uuid, indexPrefix)
229                 if err != nil {
230                         return nil, err
231                 }
232
233                 var readBytes []byte
234                 readBytes, err = ioutil.ReadAll(reader)
235                 if err != nil {
236                         return nil, err
237                 }
238
239                 indexBytes = append(indexBytes, readBytes...)
240         }
241
242         // Got index; Now dedup it
243         locators := bytes.Split(indexBytes, []byte("\n"))
244
245         uniqueLocators := map[string]bool{}
246         for _, loc := range locators {
247                 if len(loc) == 0 {
248                         continue
249                 }
250
251                 locator := string(bytes.Split(loc, []byte(" "))[0])
252                 if _, ok := uniqueLocators[locator]; !ok {
253                         uniqueLocators[locator] = true
254                 }
255         }
256         return uniqueLocators, nil
257 }
258
259 // Get list of locators that are in src but not in dst
260 func getMissingLocators(srcLocators map[string]bool, dstLocators map[string]bool) []string {
261         var missingLocators []string
262         for locator := range srcLocators {
263                 if _, ok := dstLocators[locator]; !ok {
264                         missingLocators = append(missingLocators, locator)
265                 }
266         }
267         return missingLocators
268 }
269
270 // Copy blocks from src to dst; only those that are missing in dst are copied
271 func copyBlocksToDst(toBeCopied []string) {
272         done := 0
273         total := len(toBeCopied)
274         var failed []string
275
276         for _, locator := range toBeCopied {
277                 log.Printf("Getting block %d of %d", done+1, total)
278
279                 log.Printf("Getting block: %v", locator)
280
281                 getLocator := locator
282                 expiresAt := time.Now().AddDate(0, 0, 1)
283                 if blobSigningKey != "" {
284                         getLocator = keepclient.SignLocator(getLocator, arvSrc.ApiToken, expiresAt, []byte(blobSigningKey))
285                 }
286
287                 reader, _, _, err := kcSrc.Get(getLocator)
288                 if err != nil {
289                         log.Printf("Error getting block: %q %v", locator, err)
290                         failed = append(failed, locator)
291                         continue
292                 }
293                 data, err := ioutil.ReadAll(reader)
294                 if err != nil {
295                         log.Printf("Error reading block data: %q %v", locator, err)
296                         failed = append(failed, locator)
297                         continue
298                 }
299
300                 log.Printf("Copying block: %q", locator)
301                 _, rep, err := kcDst.PutB(data)
302                 if err != nil {
303                         log.Printf("Error putting block data: %q %v", locator, err)
304                         failed = append(failed, locator)
305                         continue
306                 }
307                 if rep != replications {
308                         log.Printf("Failed to put enough number of replicas. Wanted: %d; Put: %d", replications, rep)
309                         failed = append(failed, locator)
310                         continue
311                 }
312
313                 done++
314                 log.Printf("%.2f%% done", float64(done)/float64(total)*100)
315         }
316
317         log.Printf("Successfully copied to destination %d and failed %d out of a total of %d", done, len(failed), total)
318         log.Printf("Failed blocks %v", failed)
319 }