7167: rename the newly added StopKeepServers as StopKeepWithParams; it now sounds...
[arvados.git] / tools / keep-rsync / keep-rsync.go
1 package main
2
3 import (
4         "bufio"
5         "crypto/tls"
6         "errors"
7         "flag"
8         "fmt"
9         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
10         "git.curoverse.com/arvados.git/sdk/go/keepclient"
11         "io/ioutil"
12         "log"
13         "net/http"
14         "os"
15         "regexp"
16         "strings"
17         "time"
18 )
19
20 func main() {
21         var srcConfigFile, dstConfigFile, srcKeepServicesJSON, dstKeepServicesJSON, prefix string
22         var replications int
23         var srcBlobSigningKey string
24
25         flag.StringVar(
26                 &srcConfigFile,
27                 "src",
28                 "",
29                 "Source configuration filename. May be either a pathname to a config file, or (for example) 'foo' as shorthand for $HOME/.config/arvados/foo.conf")
30
31         flag.StringVar(
32                 &dstConfigFile,
33                 "dst",
34                 "",
35                 "Destination configuration filename. May be either a pathname to a config file, or (for example) 'foo' as shorthand for $HOME/.config/arvados/foo.conf")
36
37         flag.StringVar(
38                 &srcKeepServicesJSON,
39                 "src-keep-services-json",
40                 "",
41                 "An optional list of available source keepservices. "+
42                         "If not provided, this list is obtained from api server configured in src-config-file.")
43
44         flag.StringVar(
45                 &dstKeepServicesJSON,
46                 "dst-keep-services-json",
47                 "",
48                 "An optional list of available destination keepservices. "+
49                         "If not provided, this list is obtained from api server configured in dst-config-file.")
50
51         flag.IntVar(
52                 &replications,
53                 "replications",
54                 0,
55                 "Number of replications to write to the destination. If replications not specified, "+
56                         "default replication level configured on destination server will be used.")
57
58         flag.StringVar(
59                 &prefix,
60                 "prefix",
61                 "",
62                 "Index prefix")
63
64         flag.Parse()
65
66         srcConfig, srcBlobSigningKey, err := loadConfig(srcConfigFile)
67         if err != nil {
68                 log.Fatalf("Error loading src configuration from file: %s", err.Error())
69         }
70
71         dstConfig, _, err := loadConfig(dstConfigFile)
72         if err != nil {
73                 log.Fatalf("Error loading dst configuration from file: %s", err.Error())
74         }
75
76         // setup src and dst keepclients
77         kcSrc, err := setupKeepClient(srcConfig, srcKeepServicesJSON, false, 0)
78         if err != nil {
79                 log.Fatalf("Error configuring src keepclient: %s", err.Error())
80         }
81
82         kcDst, err := setupKeepClient(dstConfig, dstKeepServicesJSON, true, replications)
83         if err != nil {
84                 log.Fatalf("Error configuring dst keepclient: %s", err.Error())
85         }
86
87         // Copy blocks not found in dst from src
88         err = performKeepRsync(kcSrc, kcDst, srcBlobSigningKey, prefix)
89         if err != nil {
90                 log.Fatalf("Error while syncing data: %s", err.Error())
91         }
92 }
93
94 type apiConfig struct {
95         APIToken        string
96         APIHost         string
97         APIHostInsecure bool
98         ExternalClient  bool
99 }
100
101 // Load src and dst config from given files
102 func loadConfig(configFile string) (config apiConfig, blobSigningKey string, err error) {
103         if configFile == "" {
104                 return config, blobSigningKey, errors.New("config file not specified")
105         }
106
107         config, blobSigningKey, err = readConfigFromFile(configFile)
108         if err != nil {
109                 return config, blobSigningKey, fmt.Errorf("Error reading config file: %v", err)
110         }
111
112         return
113 }
114
115 var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
116
117 // Read config from file
118 func readConfigFromFile(filename string) (config apiConfig, blobSigningKey string, err error) {
119         if !strings.Contains(filename, "/") {
120                 filename = os.Getenv("HOME") + "/.config/arvados/" + filename + ".conf"
121         }
122
123         content, err := ioutil.ReadFile(filename)
124
125         if err != nil {
126                 return config, "", err
127         }
128
129         lines := strings.Split(string(content), "\n")
130         for _, line := range lines {
131                 if line == "" {
132                         continue
133                 }
134
135                 kv := strings.SplitN(line, "=", 2)
136                 key := strings.TrimSpace(kv[0])
137                 value := strings.TrimSpace(kv[1])
138
139                 switch key {
140                 case "ARVADOS_API_TOKEN":
141                         config.APIToken = value
142                 case "ARVADOS_API_HOST":
143                         config.APIHost = value
144                 case "ARVADOS_API_HOST_INSECURE":
145                         config.APIHostInsecure = matchTrue.MatchString(value)
146                 case "ARVADOS_EXTERNAL_CLIENT":
147                         config.ExternalClient = matchTrue.MatchString(value)
148                 case "ARVADOS_BLOB_SIGNING_KEY":
149                         blobSigningKey = value
150                 }
151         }
152         return
153 }
154
155 // setup keepclient using the config provided
156 func setupKeepClient(config apiConfig, keepServicesJSON string, isDst bool, replications int) (kc *keepclient.KeepClient, err error) {
157         arv := arvadosclient.ArvadosClient{
158                 ApiToken:    config.APIToken,
159                 ApiServer:   config.APIHost,
160                 ApiInsecure: config.APIHostInsecure,
161                 Client: &http.Client{Transport: &http.Transport{
162                         TLSClientConfig: &tls.Config{InsecureSkipVerify: config.APIHostInsecure}}},
163                 External: config.ExternalClient,
164         }
165
166         // if keepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
167         if keepServicesJSON == "" {
168                 kc, err = keepclient.MakeKeepClient(&arv)
169                 if err != nil {
170                         return nil, err
171                 }
172         } else {
173                 kc = keepclient.New(&arv)
174                 err = kc.LoadKeepServicesFromJSON(keepServicesJSON)
175                 if err != nil {
176                         return kc, err
177                 }
178         }
179
180         if isDst {
181                 // Get default replications value from destination, if it is not already provided
182                 if replications == 0 {
183                         value, err := arv.Discovery("defaultCollectionReplication")
184                         if err == nil {
185                                 replications = int(value.(float64))
186                         } else {
187                                 return nil, err
188                         }
189                 }
190
191                 kc.Want_replicas = replications
192         }
193
194         return kc, nil
195 }
196
197 // Get unique block locators from src and dst
198 // Copy any blocks missing in dst
199 func performKeepRsync(kcSrc, kcDst *keepclient.KeepClient, blobSigningKey, prefix string) error {
200         // Get unique locators from src
201         srcIndex, err := getUniqueLocators(kcSrc, prefix)
202         if err != nil {
203                 return err
204         }
205
206         // Get unique locators from dst
207         dstIndex, err := getUniqueLocators(kcDst, prefix)
208         if err != nil {
209                 return err
210         }
211
212         // Get list of locators found in src, but missing in dst
213         toBeCopied := getMissingLocators(srcIndex, dstIndex)
214
215         // Copy each missing block to dst
216         log.Printf("Before keep-rsync, there are %d blocks in src and %d blocks in dst. Start copying %d blocks from src not found in dst.",
217                 len(srcIndex), len(dstIndex), len(toBeCopied))
218
219         err = copyBlocksToDst(toBeCopied, kcSrc, kcDst, blobSigningKey)
220
221         return err
222 }
223
224 // Get list of unique locators from the specified cluster
225 func getUniqueLocators(kc *keepclient.KeepClient, prefix string) (map[string]bool, error) {
226         uniqueLocators := map[string]bool{}
227
228         // Get index and dedup
229         for uuid := range kc.LocalRoots() {
230                 reader, err := kc.GetIndex(uuid, prefix)
231                 if err != nil {
232                         return uniqueLocators, err
233                 }
234                 scanner := bufio.NewScanner(reader)
235                 for scanner.Scan() {
236                         uniqueLocators[strings.Split(scanner.Text(), " ")[0]] = true
237                 }
238         }
239
240         return uniqueLocators, nil
241 }
242
243 // Get list of locators that are in src but not in dst
244 func getMissingLocators(srcLocators, dstLocators map[string]bool) []string {
245         var missingLocators []string
246         for locator := range srcLocators {
247                 if _, ok := dstLocators[locator]; !ok {
248                         missingLocators = append(missingLocators, locator)
249                 }
250         }
251         return missingLocators
252 }
253
254 // Copy blocks from src to dst; only those that are missing in dst are copied
255 func copyBlocksToDst(toBeCopied []string, kcSrc, kcDst *keepclient.KeepClient, blobSigningKey string) error {
256         total := len(toBeCopied)
257
258         startedAt := time.Now()
259         for done, locator := range toBeCopied {
260                 if done == 0 {
261                         log.Printf("Copying data block %d of %d (%.2f%% done): %v", done+1, total,
262                                 float64(done)/float64(total)*100, locator)
263                 } else {
264                         timePerBlock := time.Since(startedAt) / time.Duration(done)
265                         log.Printf("Copying data block %d of %d (%.2f%% done, ETA %v): %v", done+1, total,
266                                 float64(done)/float64(total)*100, timePerBlock*time.Duration(total-done), locator)
267                 }
268
269                 getLocator := locator
270                 expiresAt := time.Now().AddDate(0, 0, 1)
271                 if blobSigningKey != "" {
272                         getLocator = keepclient.SignLocator(getLocator, kcSrc.Arvados.ApiToken, expiresAt, []byte(blobSigningKey))
273                 }
274
275                 reader, len, _, err := kcSrc.Get(getLocator)
276                 if err != nil {
277                         return fmt.Errorf("Error getting block: %v %v", locator, err)
278                 }
279
280                 _, _, err = kcDst.PutHR(getLocator[:32], reader, len)
281                 if err != nil {
282                         return fmt.Errorf("Error copying data block: %v %v", locator, err)
283                 }
284         }
285
286         log.Printf("Successfully copied to destination %d blocks.", total)
287         return nil
288 }