Merge branch 'master' into 7167-keep-rsync
[arvados.git] / tools / keep-rsync / keep-rsync.go
1 package main
2
3 import (
4         "bufio"
5         "crypto/tls"
6         "errors"
7         "flag"
8         "fmt"
9         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
10         "git.curoverse.com/arvados.git/sdk/go/keepclient"
11         "io/ioutil"
12         "log"
13         "net/http"
14         "os"
15         "regexp"
16         "strings"
17         "time"
18 )
19
20 func main() {
21         var srcConfigFile, dstConfigFile, srcKeepServicesJSON, dstKeepServicesJSON, prefix string
22         var replications int
23         var srcBlobSigningKey string
24
25         flag.StringVar(
26                 &srcConfigFile,
27                 "src-config-file",
28                 "",
29                 "Source configuration filename. May be either a pathname to a config file, or (for example) 'foo' as shorthand for $HOME/.config/arvados/foo.conf")
30
31         flag.StringVar(
32                 &dstConfigFile,
33                 "dst-config-file",
34                 "",
35                 "Destination configuration filename. May be either a pathname to a config file, or (for example) 'foo' as shorthand for $HOME/.config/arvados/foo.conf")
36
37         flag.StringVar(
38                 &srcKeepServicesJSON,
39                 "src-keep-services-json",
40                 "",
41                 "An optional list of available source keepservices. "+
42                         "If not provided, this list is obtained from api server configured in src-config-file.")
43
44         flag.StringVar(
45                 &dstKeepServicesJSON,
46                 "dst-keep-services-json",
47                 "",
48                 "An optional list of available destination keepservices. "+
49                         "If not provided, this list is obtained from api server configured in dst-config-file.")
50
51         flag.IntVar(
52                 &replications,
53                 "replications",
54                 0,
55                 "Number of replications to write to the destination. If replications not specified, "+
56                         "default replication level configured on destination server will be used.")
57
58         flag.StringVar(
59                 &prefix,
60                 "prefix",
61                 "",
62                 "Index prefix")
63
64         flag.Parse()
65
66         srcConfig, srcBlobSigningKey, err := loadConfig(srcConfigFile)
67         if err != nil {
68                 log.Fatalf("Error loading src configuration from file: %s", err.Error())
69         }
70
71         dstConfig, _, err := loadConfig(dstConfigFile)
72         if err != nil {
73                 log.Fatalf("Error loading dst configuration from file: %s", err.Error())
74         }
75
76         // setup src and dst keepclients
77         kcSrc, err := setupKeepClient(srcConfig, srcKeepServicesJSON, false, 0)
78         if err != nil {
79                 log.Fatalf("Error configuring src keepclient: %s", err.Error())
80         }
81
82         kcDst, err := setupKeepClient(dstConfig, dstKeepServicesJSON, true, replications)
83         if err != nil {
84                 log.Fatalf("Error configuring dst keepclient: %s", err.Error())
85         }
86
87         // Copy blocks not found in dst from src
88         err = performKeepRsync(kcSrc, kcDst, srcBlobSigningKey, prefix)
89         if err != nil {
90                 log.Fatalf("Error while syncing data: %s", err.Error())
91         }
92 }
93
94 type apiConfig struct {
95         APIToken        string
96         APIHost         string
97         APIHostInsecure bool
98         ExternalClient  bool
99 }
100
101 // Load src and dst config from given files
102 func loadConfig(configFile string) (config apiConfig, blobSigningKey string, err error) {
103         if configFile == "" {
104                 return config, blobSigningKey, errors.New("config file not specified")
105         }
106
107         config, blobSigningKey, err = readConfigFromFile(configFile)
108         if err != nil {
109                 return config, blobSigningKey, fmt.Errorf("Error reading config file: %v", err)
110         }
111
112         return
113 }
114
115 var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
116
117 // Read config from file
118 func readConfigFromFile(filename string) (config apiConfig, blobSigningKey string, err error) {
119         if !strings.Contains(filename, "/") {
120                 filename = os.Getenv("HOME") + "/.config/arvados/" + filename
121                 if !strings.HasSuffix(filename, ".conf") {
122                         filename = filename + ".conf"
123                 }
124         }
125
126         content, err := ioutil.ReadFile(filename)
127
128         if err != nil {
129                 return config, "", err
130         }
131
132         lines := strings.Split(string(content), "\n")
133         for _, line := range lines {
134                 if line == "" {
135                         continue
136                 }
137
138                 kv := strings.SplitN(line, "=", 2)
139                 key := strings.TrimSpace(kv[0])
140                 value := strings.TrimSpace(kv[1])
141
142                 switch key {
143                 case "ARVADOS_API_TOKEN":
144                         config.APIToken = value
145                 case "ARVADOS_API_HOST":
146                         config.APIHost = value
147                 case "ARVADOS_API_HOST_INSECURE":
148                         config.APIHostInsecure = matchTrue.MatchString(value)
149                 case "ARVADOS_EXTERNAL_CLIENT":
150                         config.ExternalClient = matchTrue.MatchString(value)
151                 case "ARVADOS_BLOB_SIGNING_KEY":
152                         blobSigningKey = value
153                 }
154         }
155         return
156 }
157
158 // setup keepclient using the config provided
159 func setupKeepClient(config apiConfig, keepServicesJSON string, isDst bool, replications int) (kc *keepclient.KeepClient, err error) {
160         arv := arvadosclient.ArvadosClient{
161                 ApiToken:    config.APIToken,
162                 ApiServer:   config.APIHost,
163                 ApiInsecure: config.APIHostInsecure,
164                 Client: &http.Client{Transport: &http.Transport{
165                         TLSClientConfig: &tls.Config{InsecureSkipVerify: config.APIHostInsecure}}},
166                 External: config.ExternalClient,
167         }
168
169         // if keepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
170         if keepServicesJSON == "" {
171                 kc, err = keepclient.MakeKeepClient(&arv)
172                 if err != nil {
173                         return nil, err
174                 }
175         } else {
176                 kc = keepclient.New(&arv)
177                 err = kc.LoadKeepServicesFromJSON(keepServicesJSON)
178                 if err != nil {
179                         return kc, err
180                 }
181         }
182
183         if isDst {
184                 // Get default replications value from destination, if it is not already provided
185                 if replications == 0 {
186                         value, err := arv.Discovery("defaultCollectionReplication")
187                         if err == nil {
188                                 replications = int(value.(float64))
189                         } else {
190                                 return nil, err
191                         }
192                 }
193
194                 kc.Want_replicas = replications
195         }
196
197         return kc, nil
198 }
199
200 // Get unique block locators from src and dst
201 // Copy any blocks missing in dst
202 func performKeepRsync(kcSrc, kcDst *keepclient.KeepClient, blobSigningKey, prefix string) error {
203         // Get unique locators from src
204         srcIndex, err := getUniqueLocators(kcSrc, prefix)
205         if err != nil {
206                 return err
207         }
208
209         // Get unique locators from dst
210         dstIndex, err := getUniqueLocators(kcDst, prefix)
211         if err != nil {
212                 return err
213         }
214
215         // Get list of locators found in src, but missing in dst
216         toBeCopied := getMissingLocators(srcIndex, dstIndex)
217
218         // Copy each missing block to dst
219         log.Printf("Before keep-rsync, there are %d blocks in src and %d blocks in dst. Start copying %d blocks from src not found in dst.",
220                 len(srcIndex), len(dstIndex), len(toBeCopied))
221
222         err = copyBlocksToDst(toBeCopied, kcSrc, kcDst, blobSigningKey)
223
224         return err
225 }
226
227 // Get list of unique locators from the specified cluster
228 func getUniqueLocators(kc *keepclient.KeepClient, prefix string) (map[string]bool, error) {
229         uniqueLocators := map[string]bool{}
230
231         // Get index and dedup
232         for uuid := range kc.LocalRoots() {
233                 reader, err := kc.GetIndex(uuid, prefix)
234                 if err != nil {
235                         return uniqueLocators, err
236                 }
237                 scanner := bufio.NewScanner(reader)
238                 for scanner.Scan() {
239                         uniqueLocators[strings.Split(scanner.Text(), " ")[0]] = true
240                 }
241         }
242
243         return uniqueLocators, nil
244 }
245
246 // Get list of locators that are in src but not in dst
247 func getMissingLocators(srcLocators, dstLocators map[string]bool) []string {
248         var missingLocators []string
249         for locator := range srcLocators {
250                 if _, ok := dstLocators[locator]; !ok {
251                         missingLocators = append(missingLocators, locator)
252                 }
253         }
254         return missingLocators
255 }
256
257 // Copy blocks from src to dst; only those that are missing in dst are copied
258 func copyBlocksToDst(toBeCopied []string, kcSrc, kcDst *keepclient.KeepClient, blobSigningKey string) error {
259         done := 0
260         total := len(toBeCopied)
261
262         startedAt := time.Now()
263         var blockTime int64
264         for _, locator := range toBeCopied {
265                 log.Printf("Getting block %d of %d: %v", done+1, total, locator)
266
267                 getLocator := locator
268                 expiresAt := time.Now().AddDate(0, 0, 1)
269                 if blobSigningKey != "" {
270                         getLocator = keepclient.SignLocator(getLocator, kcSrc.Arvados.ApiToken, expiresAt, []byte(blobSigningKey))
271                 }
272
273                 reader, len, _, err := kcSrc.Get(getLocator)
274                 if err != nil {
275                         return fmt.Errorf("Error getting block: %v %v", locator, err)
276                 }
277
278                 if done == 0 {
279                         log.Printf("Copying data block %d of %d (%.2f%% done): %v", done+1, total,
280                                 float64(done)/float64(total)*100, locator)
281                 } else {
282                         log.Printf("Copying data block %d of %d (%.2f%% done, ETA %v): %v", done+1, total,
283                                 float64(done)/float64(total)*100, time.Duration(blockTime*int64(total-done)), locator)
284                 }
285                 _, _, err = kcDst.PutHR(getLocator[:32], reader, len)
286                 if err != nil {
287                         return fmt.Errorf("Error copying data block: %v %v", locator, err)
288                 }
289
290                 if done == 0 {
291                         blockTime = int64(time.Now().Sub(startedAt))
292                 }
293
294                 done++
295         }
296
297         log.Printf("Successfully copied to destination %d blocks.", total)
298         return nil
299 }