Merge branch 'master' into 7167-keep-rsync
[arvados.git] / tools / keep-rsync / keep-rsync.go
1 package main
2
3 import (
4         "bytes"
5         "flag"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/keepclient"
8         "io/ioutil"
9         "log"
10         "regexp"
11         "strings"
12 )
13
14 // keep-rsync arguments
15 var (
16         srcConfig           arvadosclient.APIConfig
17         dstConfig           arvadosclient.APIConfig
18         srcKeepServicesJSON string
19         dstKeepServicesJSON string
20         replications        int
21         prefix              string
22 )
23
24 func main() {
25         var srcConfigFile string
26         var dstConfigFile string
27
28         flag.StringVar(
29                 &srcConfigFile,
30                 "src-config-file",
31                 "",
32                 "Source configuration filename with full path that contains "+
33                         "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the source keep servers, "+
34                         "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, and ARVADOS_BLOB_SIGNING_KEY.")
35
36         flag.StringVar(
37                 &dstConfigFile,
38                 "dst-config-file",
39                 "",
40                 "Destination configuration filename with full path that contains "+
41                         "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the destination keep servers, "+
42                         "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, and ARVADOS_BLOB_SIGNING_KEY.")
43
44         flag.StringVar(
45                 &srcKeepServicesJSON,
46                 "src-keep-services-json",
47                 "",
48                 "An optional list of available source keepservices. "+
49                         "If not provided, this list is obtained from api server configured in src-config-file.")
50
51         flag.StringVar(
52                 &dstKeepServicesJSON,
53                 "dst-keep-services-json",
54                 "",
55                 "An optional list of available destination keepservices. "+
56                         "If not provided, this list is obtained from api server configured in dst-config-file.")
57
58         flag.IntVar(
59                 &replications,
60                 "replications",
61                 3,
62                 "Number of replications to write to the destination.")
63
64         flag.StringVar(
65                 &prefix,
66                 "prefix",
67                 "",
68                 "Index prefix")
69
70         flag.Parse()
71
72         var err error
73
74         // Load config
75         if srcConfigFile == "" {
76                 log.Fatal("-src-config-file must be specified.")
77         }
78         srcConfig, err = readConfigFromFile(srcConfigFile)
79         if err != nil {
80                 log.Fatal("Error reading source configuration: %s", err.Error())
81         }
82
83         if dstConfigFile == "" {
84                 log.Fatal("-dst-config-file must be specified.")
85         }
86         dstConfig, err = readConfigFromFile(dstConfigFile)
87         if err != nil {
88                 log.Fatal("Error reading destination configuration: %s", err.Error())
89         }
90
91         // Initialize keep-rsync
92         err = initializeKeepRsync()
93         if err != nil {
94                 log.Fatal("Error configurating keep-rsync: %s", err.Error())
95         }
96
97         // Copy blocks not found in dst from src
98         performKeepRsync()
99 }
100
101 var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
102
103 // Reads config from file
104 func readConfigFromFile(filename string) (arvadosclient.APIConfig, error) {
105         var config arvadosclient.APIConfig
106
107         content, err := ioutil.ReadFile(filename)
108         if err != nil {
109                 return config, err
110         }
111
112         lines := strings.Split(string(content), "\n")
113         for _, line := range lines {
114                 if line == "" {
115                         continue
116                 }
117                 kv := strings.Split(line, "=")
118
119                 switch kv[0] {
120                 case "ARVADOS_API_TOKEN":
121                         config.APIToken = kv[1]
122                 case "ARVADOS_API_HOST":
123                         config.APIHost = kv[1]
124                 case "ARVADOS_API_HOST_INSECURE":
125                         config.APIHostInsecure = matchTrue.MatchString(kv[1])
126                 case "ARVADOS_EXTERNAL_CLIENT":
127                         config.ExternalClient = matchTrue.MatchString(kv[1])
128                 }
129         }
130         return config, nil
131 }
132
133 // keep-rsync source and destination clients
134 var (
135         arvSrc arvadosclient.ArvadosClient
136         arvDst arvadosclient.ArvadosClient
137         kcSrc  *keepclient.KeepClient
138         kcDst  *keepclient.KeepClient
139 )
140
141 // Initializes keep-rsync using the config provided
142 func initializeKeepRsync() (err error) {
143         // arvSrc from srcConfig
144         arvSrc, err = arvadosclient.MakeArvadosClientWithConfig(srcConfig)
145         if err != nil {
146                 return
147         }
148
149         // arvDst from dstConfig
150         arvDst, err = arvadosclient.MakeArvadosClientWithConfig(dstConfig)
151         if err != nil {
152                 return
153         }
154
155         // if srcKeepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
156         if srcKeepServicesJSON == "" {
157                 kcSrc, err = keepclient.MakeKeepClient(&arvSrc)
158                 if err != nil {
159                         return
160                 }
161         } else {
162                 kcSrc, err = keepclient.MakeKeepClientFromJSON(&arvSrc, srcKeepServicesJSON)
163                 if err != nil {
164                         return
165                 }
166         }
167
168         // if dstKeepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
169         if dstKeepServicesJSON == "" {
170                 kcDst, err = keepclient.MakeKeepClient(&arvDst)
171                 if err != nil {
172                         return
173                 }
174         } else {
175                 kcDst, err = keepclient.MakeKeepClientFromJSON(&arvDst, dstKeepServicesJSON)
176                 if err != nil {
177                         return
178                 }
179         }
180         kcDst.Want_replicas = replications
181
182         return
183 }
184
185 // Get unique block locators from src and dst
186 // Copy any blocks missing in dst
187 func performKeepRsync() error {
188         // Get unique locators from src
189         srcIndex, err := getUniqueLocators(kcSrc, prefix)
190         if err != nil {
191                 return err
192         }
193
194         // Get unique locators from dst
195         dstIndex, err := getUniqueLocators(kcDst, prefix)
196         if err != nil {
197                 return err
198         }
199
200         // Get list of locators found in src, but missing in dst
201         toBeCopied := getMissingLocators(srcIndex, dstIndex)
202
203         // Copy each missing block to dst
204         copyBlocksToDst(toBeCopied)
205
206         return nil
207 }
208
209 // Get list of unique locators from the specified cluster
210 func getUniqueLocators(kc *keepclient.KeepClient, prefix string) (map[string]bool, error) {
211         var indexBytes []byte
212
213         for uuid := range kc.LocalRoots() {
214                 reader, err := kc.GetIndex(uuid, prefix)
215                 if err != nil {
216                         return nil, err
217                 }
218
219                 var readBytes []byte
220                 readBytes, err = ioutil.ReadAll(reader)
221                 if err != nil {
222                         return nil, err
223                 }
224
225                 indexBytes = append(indexBytes, readBytes...)
226         }
227
228         // Got index; Now dedup it
229         locators := bytes.Split(indexBytes, []byte("\n"))
230
231         uniqueLocators := map[string]bool{}
232         for _, loc := range locators {
233                 if len(loc) == 0 {
234                         continue
235                 }
236
237                 locator := string(bytes.Split(loc, []byte(" "))[0])
238                 if _, ok := uniqueLocators[locator]; !ok {
239                         uniqueLocators[locator] = true
240                 }
241         }
242         return uniqueLocators, nil
243 }
244
245 // Get list of locators that are in src but not in dst
246 func getMissingLocators(srcLocators map[string]bool, dstLocators map[string]bool) []string {
247         var missingLocators []string
248         for locator := range srcLocators {
249                 if _, ok := dstLocators[locator]; !ok {
250                         missingLocators = append(missingLocators, locator)
251                 }
252         }
253         return missingLocators
254 }
255
256 // Copy blocks from src to dst; only those that are missing in dst are copied
257 func copyBlocksToDst(toBeCopied []string) {
258         done := 0
259         total := len(toBeCopied)
260         var failed []string
261
262         for _, locator := range toBeCopied {
263                 log.Printf("Getting block %d of %d", done+1, total)
264
265                 log.Printf("Getting block: %v", locator)
266
267                 reader, _, _, err := kcSrc.Get(locator)
268                 if err != nil {
269                         log.Printf("Error getting block: %q %v", locator, err)
270                         failed = append(failed, locator)
271                         continue
272                 }
273                 data, err := ioutil.ReadAll(reader)
274                 if err != nil {
275                         log.Printf("Error reading block data: %q %v", locator, err)
276                         failed = append(failed, locator)
277                         continue
278                 }
279
280                 log.Printf("Copying block: %q", locator)
281                 _, rep, err := kcDst.PutB(data)
282                 if err != nil {
283                         log.Printf("Error putting block data: %q %v", locator, err)
284                         failed = append(failed, locator)
285                         continue
286                 }
287                 if rep != replications {
288                         log.Printf("Failed to put enough number of replicas. Wanted: %d; Put: %d", replications, rep)
289                         failed = append(failed, locator)
290                         continue
291                 }
292
293                 done++
294                 log.Printf("%.2f%% done", float64(done)/float64(total)*100)
295         }
296
297         log.Printf("Successfully copied to destination %d and failed %d out of a total of %d", done, len(failed), total)
298         log.Printf("Failed blocks %v", failed)
299 }