7167: loadConfig setupKeepclient do only one set at a time.
[arvados.git] / tools / keep-rsync / keep-rsync.go
1 package main
2
3 import (
4         "bufio"
5         "crypto/tls"
6         "errors"
7         "flag"
8         "fmt"
9         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
10         "git.curoverse.com/arvados.git/sdk/go/keepclient"
11         "io/ioutil"
12         "log"
13         "net/http"
14         "os"
15         "regexp"
16         "strings"
17         "time"
18 )
19
20 func main() {
21         var srcConfigFile, dstConfigFile, srcKeepServicesJSON, dstKeepServicesJSON, prefix string
22         var replications int
23         var srcBlobSigningKey string
24
25         flag.StringVar(
26                 &srcConfigFile,
27                 "src-config-file",
28                 "",
29                 "Source configuration filename with full path that contains "+
30                         "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the source keep servers, "+
31                         "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, ARVADOS_EXTERNAL_CLIENT and ARVADOS_BLOB_SIGNING_KEY.")
32
33         flag.StringVar(
34                 &dstConfigFile,
35                 "dst-config-file",
36                 "",
37                 "Destination configuration filename with full path that contains "+
38                         "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the destination keep servers, "+
39                         "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, ARVADOS_EXTERNAL_CLIENT and ARVADOS_BLOB_SIGNING_KEY.")
40
41         flag.StringVar(
42                 &srcKeepServicesJSON,
43                 "src-keep-services-json",
44                 "",
45                 "An optional list of available source keepservices. "+
46                         "If not provided, this list is obtained from api server configured in src-config-file.")
47
48         flag.StringVar(
49                 &dstKeepServicesJSON,
50                 "dst-keep-services-json",
51                 "",
52                 "An optional list of available destination keepservices. "+
53                         "If not provided, this list is obtained from api server configured in dst-config-file.")
54
55         flag.IntVar(
56                 &replications,
57                 "replications",
58                 0,
59                 "Number of replications to write to the destination. If replications not specified, "+
60                         "default replication level configured on destination server will be used.")
61
62         flag.StringVar(
63                 &prefix,
64                 "prefix",
65                 "",
66                 "Index prefix")
67
68         flag.Parse()
69
70         srcConfig, srcBlobSigningKey, err := loadConfig(srcConfigFile)
71         if err != nil {
72                 log.Fatalf("Error loading src configuration from file: %s", err.Error())
73         }
74
75         dstConfig, _, err := loadConfig(dstConfigFile)
76         if err != nil {
77                 log.Fatalf("Error loading dst configuration from file: %s", err.Error())
78         }
79
80         // setup src and dst keepclients
81         kcSrc, err := setupKeepClient(srcConfig, srcKeepServicesJSON, false, 0)
82         if err != nil {
83                 log.Fatalf("Error configuring src keepclient: %s", err.Error())
84         }
85
86         kcDst, err := setupKeepClient(dstConfig, dstKeepServicesJSON, true, replications)
87         if err != nil {
88                 log.Fatalf("Error configuring dst keepclient: %s", err.Error())
89         }
90
91         // Copy blocks not found in dst from src
92         err = performKeepRsync(kcSrc, kcDst, srcBlobSigningKey, prefix)
93         if err != nil {
94                 log.Fatalf("Error while syncing data: %s", err.Error())
95         }
96 }
97
98 type apiConfig struct {
99         APIToken        string
100         APIHost         string
101         APIHostInsecure bool
102         ExternalClient  bool
103 }
104
105 // Load src and dst config from given files
106 func loadConfig(configFile string) (config apiConfig, blobSigningKey string, err error) {
107         if configFile == "" {
108                 return config, blobSigningKey, errors.New("config file not specified")
109         }
110
111         config, blobSigningKey, err = readConfigFromFile(configFile)
112         if err != nil {
113                 return config, blobSigningKey, fmt.Errorf("Error reading config file: %v", err)
114         }
115
116         return
117 }
118
119 var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
120
121 // Read config from file
122 func readConfigFromFile(filename string) (config apiConfig, blobSigningKey string, err error) {
123         if !strings.Contains(filename, "/") {
124                 filename = os.Getenv("HOME") + "/.config/arvados/" + filename
125         }
126
127         content, err := ioutil.ReadFile(filename)
128
129         if err != nil {
130                 return config, "", err
131         }
132
133         lines := strings.Split(string(content), "\n")
134         for _, line := range lines {
135                 if line == "" {
136                         continue
137                 }
138
139                 kv := strings.SplitN(line, "=", 2)
140                 key := strings.TrimSpace(kv[0])
141                 value := strings.TrimSpace(kv[1])
142
143                 switch key {
144                 case "ARVADOS_API_TOKEN":
145                         config.APIToken = value
146                 case "ARVADOS_API_HOST":
147                         config.APIHost = value
148                 case "ARVADOS_API_HOST_INSECURE":
149                         config.APIHostInsecure = matchTrue.MatchString(value)
150                 case "ARVADOS_EXTERNAL_CLIENT":
151                         config.ExternalClient = matchTrue.MatchString(value)
152                 case "ARVADOS_BLOB_SIGNING_KEY":
153                         blobSigningKey = value
154                 }
155         }
156         return
157 }
158
159 // setup keepclient using the config provided
160 func setupKeepClient(config apiConfig, keepServicesJSON string, isDst bool, replications int) (kc *keepclient.KeepClient, err error) {
161         arv := arvadosclient.ArvadosClient{
162                 ApiToken:    config.APIToken,
163                 ApiServer:   config.APIHost,
164                 ApiInsecure: config.APIHostInsecure,
165                 Client: &http.Client{Transport: &http.Transport{
166                         TLSClientConfig: &tls.Config{InsecureSkipVerify: config.APIHostInsecure}}},
167                 External: config.ExternalClient,
168         }
169
170         // if keepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
171         if keepServicesJSON == "" {
172                 kc, err = keepclient.MakeKeepClient(&arv)
173                 if err != nil {
174                         return nil, err
175                 }
176         } else {
177                 kc = keepclient.New(&arv)
178                 err = kc.LoadKeepServicesFromJSON(keepServicesJSON)
179                 if err != nil {
180                         return kc, err
181                 }
182         }
183
184         if isDst {
185                 // Get default replications value from destination, if it is not already provided
186                 if replications == 0 {
187                         value, err := arv.Discovery("defaultCollectionReplication")
188                         if err == nil {
189                                 replications = int(value.(float64))
190                         } else {
191                                 return nil, err
192                         }
193                 }
194
195                 kc.Want_replicas = replications
196         }
197
198         return kc, nil
199 }
200
201 // Get unique block locators from src and dst
202 // Copy any blocks missing in dst
203 func performKeepRsync(kcSrc, kcDst *keepclient.KeepClient, blobSigningKey, prefix string) error {
204         // Get unique locators from src
205         srcIndex, err := getUniqueLocators(kcSrc, prefix)
206         if err != nil {
207                 return err
208         }
209
210         // Get unique locators from dst
211         dstIndex, err := getUniqueLocators(kcDst, prefix)
212         if err != nil {
213                 return err
214         }
215
216         // Get list of locators found in src, but missing in dst
217         toBeCopied := getMissingLocators(srcIndex, dstIndex)
218
219         // Copy each missing block to dst
220         err = copyBlocksToDst(toBeCopied, kcSrc, kcDst, blobSigningKey)
221
222         return err
223 }
224
225 // Get list of unique locators from the specified cluster
226 func getUniqueLocators(kc *keepclient.KeepClient, prefix string) (map[string]bool, error) {
227         uniqueLocators := map[string]bool{}
228
229         // Get index and dedup
230         for uuid := range kc.LocalRoots() {
231                 reader, err := kc.GetIndex(uuid, prefix)
232                 if err != nil {
233                         return uniqueLocators, err
234                 }
235                 scanner := bufio.NewScanner(reader)
236                 for scanner.Scan() {
237                         uniqueLocators[strings.Split(scanner.Text(), " ")[0]] = true
238                 }
239         }
240
241         return uniqueLocators, nil
242 }
243
244 // Get list of locators that are in src but not in dst
245 func getMissingLocators(srcLocators, dstLocators map[string]bool) []string {
246         var missingLocators []string
247         for locator := range srcLocators {
248                 if _, ok := dstLocators[locator]; !ok {
249                         missingLocators = append(missingLocators, locator)
250                 }
251         }
252         return missingLocators
253 }
254
255 // Copy blocks from src to dst; only those that are missing in dst are copied
256 func copyBlocksToDst(toBeCopied []string, kcSrc, kcDst *keepclient.KeepClient, blobSigningKey string) error {
257         done := 0
258         total := len(toBeCopied)
259
260         for _, locator := range toBeCopied {
261                 log.Printf("Getting block %d of %d: %v", done+1, total, locator)
262
263                 getLocator := locator
264                 expiresAt := time.Now().AddDate(0, 0, 1)
265                 if blobSigningKey != "" {
266                         getLocator = keepclient.SignLocator(getLocator, kcSrc.Arvados.ApiToken, expiresAt, []byte(blobSigningKey))
267                 }
268
269                 reader, _, _, err := kcSrc.Get(getLocator)
270                 if err != nil {
271                         return fmt.Errorf("Error getting block: %v %v", locator, err)
272                 }
273                 data, err := ioutil.ReadAll(reader)
274                 if err != nil {
275                         return fmt.Errorf("Error reading block data: %v %v", locator, err)
276                 }
277
278                 log.Printf("Writing block%d of %d: %v", locator)
279                 _, _, err = kcDst.PutB(data)
280                 if err != nil {
281                         return fmt.Errorf("Error putting block data: %v %v", locator, err)
282                 }
283
284                 done++
285                 log.Printf("%.2f%% done", float64(done)/float64(total)*100)
286         }
287
288         log.Printf("Successfully copied to destination %d blocks.", total)
289         return nil
290 }