7167: Convert most of the globals in keep-sync into locals and update all the code...
[arvados.git] / tools / keep-rsync / keep-rsync.go
1 package main
2
3 import (
4         "bufio"
5         "errors"
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/keepclient"
10         "io/ioutil"
11         "log"
12         "regexp"
13         "strings"
14         "time"
15 )
16
17 // keep-rsync arguments
18 var (
19         blobSigningKey string
20 )
21
22 func main() {
23         var srcConfigFile, dstConfigFile, srcKeepServicesJSON, dstKeepServicesJSON, prefix string
24         var replications int
25
26         flag.StringVar(
27                 &srcConfigFile,
28                 "src-config-file",
29                 "",
30                 "Source configuration filename with full path that contains "+
31                         "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the source keep servers, "+
32                         "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, ARVADOS_EXTERNAL_CLIENT and ARVADOS_BLOB_SIGNING_KEY.")
33
34         flag.StringVar(
35                 &dstConfigFile,
36                 "dst-config-file",
37                 "",
38                 "Destination configuration filename with full path that contains "+
39                         "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the destination keep servers, "+
40                         "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, ARVADOS_EXTERNAL_CLIENT and ARVADOS_BLOB_SIGNING_KEY.")
41
42         flag.StringVar(
43                 &srcKeepServicesJSON,
44                 "src-keep-services-json",
45                 "",
46                 "An optional list of available source keepservices. "+
47                         "If not provided, this list is obtained from api server configured in src-config-file.")
48
49         flag.StringVar(
50                 &dstKeepServicesJSON,
51                 "dst-keep-services-json",
52                 "",
53                 "An optional list of available destination keepservices. "+
54                         "If not provided, this list is obtained from api server configured in dst-config-file.")
55
56         flag.IntVar(
57                 &replications,
58                 "replications",
59                 0,
60                 "Number of replications to write to the destination.")
61
62         flag.StringVar(
63                 &prefix,
64                 "prefix",
65                 "",
66                 "Index prefix")
67
68         flag.Parse()
69
70         srcConfig, dstConfig, err := loadConfig(srcConfigFile, dstConfigFile)
71         if err != nil {
72                 log.Fatalf("Error loading configuration from files: %s", err.Error())
73         }
74
75         // setup src and dst keepclients
76         kcSrc, kcDst, err := setupKeepClients(srcConfig, dstConfig, srcKeepServicesJSON, dstKeepServicesJSON, replications)
77         if err != nil {
78                 log.Fatalf("Error configuring keep-rsync: %s", err.Error())
79         }
80
81         // Copy blocks not found in dst from src
82         err = performKeepRsync(kcSrc, kcDst, prefix)
83         if err != nil {
84                 log.Fatalf("Error while syncing data: %s", err.Error())
85         }
86 }
87
88 // Load src and dst config from given files
89 func loadConfig(srcConfigFile, dstConfigFile string) (srcConfig, dstConfig arvadosclient.APIConfig, err error) {
90         if srcConfigFile == "" {
91                 return srcConfig, dstConfig, errors.New("-src-config-file must be specified")
92         }
93
94         srcConfig, err = readConfigFromFile(srcConfigFile)
95         if err != nil {
96                 return srcConfig, dstConfig, fmt.Errorf("Error reading source configuration: %v", err)
97         }
98
99         if dstConfigFile == "" {
100                 return srcConfig, dstConfig, errors.New("-dst-config-file must be specified")
101         }
102         dstConfig, err = readConfigFromFile(dstConfigFile)
103         if err != nil {
104                 return srcConfig, dstConfig, fmt.Errorf("Error reading destination configuration: %v", err)
105         }
106
107         return srcConfig, dstConfig, err
108 }
109
110 var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
111
112 // Read config from file
113 func readConfigFromFile(filename string) (arvadosclient.APIConfig, error) {
114         var config arvadosclient.APIConfig
115
116         content, err := ioutil.ReadFile(filename)
117         if err != nil {
118                 return config, err
119         }
120
121         lines := strings.Split(string(content), "\n")
122         for _, line := range lines {
123                 if line == "" {
124                         continue
125                 }
126
127                 kv := strings.SplitN(line, "=", 2)
128                 key := strings.TrimSpace(kv[0])
129                 value := strings.TrimSpace(kv[1])
130
131                 switch key {
132                 case "ARVADOS_API_TOKEN":
133                         config.APIToken = value
134                 case "ARVADOS_API_HOST":
135                         config.APIHost = value
136                 case "ARVADOS_API_HOST_INSECURE":
137                         config.APIHostInsecure = matchTrue.MatchString(value)
138                 case "ARVADOS_EXTERNAL_CLIENT":
139                         config.ExternalClient = matchTrue.MatchString(value)
140                 case "ARVADOS_BLOB_SIGNING_KEY":
141                         blobSigningKey = value
142                 }
143         }
144         return config, nil
145 }
146
147 // Initializes keep-rsync using the config provided
148 func setupKeepClients(srcConfig, dstConfig arvadosclient.APIConfig, srcKeepServicesJSON, dstKeepServicesJSON string, replications int) (kcSrc, kcDst *keepclient.KeepClient, err error) {
149         // arvSrc from srcConfig
150         arvSrc, err := arvadosclient.New(srcConfig)
151         if err != nil {
152                 return kcSrc, kcDst, err
153         }
154
155         // arvDst from dstConfig
156         arvDst, err := arvadosclient.New(dstConfig)
157         if err != nil {
158                 return kcSrc, kcDst, err
159         }
160
161         // Get default replications value from destination, if it is not already provided
162         if replications == 0 {
163                 value, err := arvDst.Discovery("defaultCollectionReplication")
164                 if err == nil {
165                         replications = int(value.(float64))
166                 } else {
167                         replications = 2
168                 }
169         }
170
171         // if srcKeepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
172         if srcKeepServicesJSON == "" {
173                 kcSrc, err = keepclient.MakeKeepClient(&arvSrc)
174                 if err != nil {
175                         return nil, nil, err
176                 }
177         } else {
178                 kcSrc, err = keepclient.MakeKeepClientFromJSON(&arvSrc, srcKeepServicesJSON)
179                 if err != nil {
180                         return kcSrc, kcDst, err
181                 }
182         }
183
184         // if dstKeepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
185         if dstKeepServicesJSON == "" {
186                 kcDst, err = keepclient.MakeKeepClient(&arvDst)
187                 if err != nil {
188                         return kcSrc, kcDst, err
189                 }
190         } else {
191                 kcDst, err = keepclient.MakeKeepClientFromJSON(&arvDst, dstKeepServicesJSON)
192                 if err != nil {
193                         return kcSrc, kcDst, err
194                 }
195         }
196         kcDst.Want_replicas = replications
197
198         return kcSrc, kcDst, nil
199 }
200
201 // Get unique block locators from src and dst
202 // Copy any blocks missing in dst
203 func performKeepRsync(kcSrc, kcDst *keepclient.KeepClient, prefix string) error {
204         // Get unique locators from src
205         srcIndex, err := getUniqueLocators(kcSrc, prefix)
206         if err != nil {
207                 return err
208         }
209
210         // Get unique locators from dst
211         dstIndex, err := getUniqueLocators(kcDst, prefix)
212         if err != nil {
213                 return err
214         }
215
216         // Get list of locators found in src, but missing in dst
217         toBeCopied := getMissingLocators(srcIndex, dstIndex)
218
219         // Copy each missing block to dst
220         err = copyBlocksToDst(toBeCopied, kcSrc, kcDst)
221
222         return err
223 }
224
225 // Get list of unique locators from the specified cluster
226 func getUniqueLocators(kc *keepclient.KeepClient, prefix string) (map[string]bool, error) {
227         uniqueLocators := map[string]bool{}
228
229         // Get index and dedup
230         for uuid := range kc.LocalRoots() {
231                 reader, err := kc.GetIndex(uuid, prefix)
232                 if err != nil {
233                         return uniqueLocators, err
234                 }
235                 scanner := bufio.NewScanner(reader)
236                 for scanner.Scan() {
237                         uniqueLocators[strings.Split(scanner.Text(), " ")[0]] = true
238                 }
239         }
240
241         return uniqueLocators, nil
242 }
243
244 // Get list of locators that are in src but not in dst
245 func getMissingLocators(srcLocators, dstLocators map[string]bool) []string {
246         var missingLocators []string
247         for locator := range srcLocators {
248                 if _, ok := dstLocators[locator]; !ok {
249                         missingLocators = append(missingLocators, locator)
250                 }
251         }
252         return missingLocators
253 }
254
255 // Copy blocks from src to dst; only those that are missing in dst are copied
256 func copyBlocksToDst(toBeCopied []string, kcSrc, kcDst *keepclient.KeepClient) error {
257         done := 0
258         total := len(toBeCopied)
259
260         for _, locator := range toBeCopied {
261                 log.Printf("Getting block %d of %d: %v", done+1, total, locator)
262
263                 getLocator := locator
264                 expiresAt := time.Now().AddDate(0, 0, 1)
265                 if blobSigningKey != "" {
266                         getLocator = keepclient.SignLocator(getLocator, kcSrc.Arvados.ApiToken, expiresAt, []byte(blobSigningKey))
267                 }
268
269                 reader, _, _, err := kcSrc.Get(getLocator)
270                 if err != nil {
271                         return fmt.Errorf("Error getting block: %v %v", locator, err)
272                 }
273                 data, err := ioutil.ReadAll(reader)
274                 if err != nil {
275                         return fmt.Errorf("Error reading block data: %v %v", locator, err)
276                 }
277
278                 log.Printf("Writing block%d of %d: %v", locator)
279                 _, _, err = kcDst.PutB(data)
280                 if err != nil {
281                         return fmt.Errorf("Error putting block data: %v %v", locator, err)
282                 }
283
284                 done++
285                 log.Printf("%.2f%% done", float64(done)/float64(total)*100)
286         }
287
288         log.Printf("Successfully copied to destination %d blocks.", total)
289         return nil
290 }