7167: Convert blobSigningKey also into local variable and make necessary changes...
[arvados.git] / tools / keep-rsync / keep-rsync.go
1 package main
2
3 import (
4         "bufio"
5         "crypto/tls"
6         "errors"
7         "flag"
8         "fmt"
9         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
10         "git.curoverse.com/arvados.git/sdk/go/keepclient"
11         "io/ioutil"
12         "log"
13         "net/http"
14         "regexp"
15         "strings"
16         "time"
17 )
18
19 func main() {
20         var srcConfigFile, dstConfigFile, srcKeepServicesJSON, dstKeepServicesJSON, prefix string
21         var replications int
22         var srcBlobSigningKey string
23
24         flag.StringVar(
25                 &srcConfigFile,
26                 "src-config-file",
27                 "",
28                 "Source configuration filename with full path that contains "+
29                         "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the source keep servers, "+
30                         "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, ARVADOS_EXTERNAL_CLIENT and ARVADOS_BLOB_SIGNING_KEY.")
31
32         flag.StringVar(
33                 &dstConfigFile,
34                 "dst-config-file",
35                 "",
36                 "Destination configuration filename with full path that contains "+
37                         "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the destination keep servers, "+
38                         "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, ARVADOS_EXTERNAL_CLIENT and ARVADOS_BLOB_SIGNING_KEY.")
39
40         flag.StringVar(
41                 &srcKeepServicesJSON,
42                 "src-keep-services-json",
43                 "",
44                 "An optional list of available source keepservices. "+
45                         "If not provided, this list is obtained from api server configured in src-config-file.")
46
47         flag.StringVar(
48                 &dstKeepServicesJSON,
49                 "dst-keep-services-json",
50                 "",
51                 "An optional list of available destination keepservices. "+
52                         "If not provided, this list is obtained from api server configured in dst-config-file.")
53
54         flag.IntVar(
55                 &replications,
56                 "replications",
57                 0,
58                 "Number of replications to write to the destination.")
59
60         flag.StringVar(
61                 &prefix,
62                 "prefix",
63                 "",
64                 "Index prefix")
65
66         flag.Parse()
67
68         srcConfig, dstConfig, srcBlobSigningKey, _, err := loadConfig(srcConfigFile, dstConfigFile)
69         if err != nil {
70                 log.Fatalf("Error loading configuration from files: %s", err.Error())
71         }
72
73         // setup src and dst keepclients
74         kcSrc, kcDst, err := setupKeepClients(srcConfig, dstConfig, srcKeepServicesJSON, dstKeepServicesJSON, replications)
75         if err != nil {
76                 log.Fatalf("Error configuring keep-rsync: %s", err.Error())
77         }
78
79         // Copy blocks not found in dst from src
80         err = performKeepRsync(kcSrc, kcDst, srcBlobSigningKey, prefix)
81         if err != nil {
82                 log.Fatalf("Error while syncing data: %s", err.Error())
83         }
84 }
85
86 // Load src and dst config from given files
87 func loadConfig(srcConfigFile, dstConfigFile string) (srcConfig, dstConfig arvadosclient.APIConfig, srcBlobSigningKey, dstBlobSigningKey string, err error) {
88         if srcConfigFile == "" {
89                 return srcConfig, dstConfig, srcBlobSigningKey, dstBlobSigningKey, errors.New("-src-config-file must be specified")
90         }
91
92         srcConfig, srcBlobSigningKey, err = readConfigFromFile(srcConfigFile)
93         if err != nil {
94                 return srcConfig, dstConfig, srcBlobSigningKey, dstBlobSigningKey, fmt.Errorf("Error reading source configuration: %v", err)
95         }
96
97         if dstConfigFile == "" {
98                 return srcConfig, dstConfig, srcBlobSigningKey, dstBlobSigningKey, errors.New("-dst-config-file must be specified")
99         }
100         dstConfig, dstBlobSigningKey, err = readConfigFromFile(dstConfigFile)
101         if err != nil {
102                 return srcConfig, dstConfig, srcBlobSigningKey, dstBlobSigningKey, fmt.Errorf("Error reading destination configuration: %v", err)
103         }
104
105         return
106 }
107
108 var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
109
110 // Read config from file
111 func readConfigFromFile(filename string) (config arvadosclient.APIConfig, blobSigningKey string, err error) {
112         content, err := ioutil.ReadFile(filename)
113         if err != nil {
114                 return config, "", err
115         }
116
117         lines := strings.Split(string(content), "\n")
118         for _, line := range lines {
119                 if line == "" {
120                         continue
121                 }
122
123                 kv := strings.SplitN(line, "=", 2)
124                 key := strings.TrimSpace(kv[0])
125                 value := strings.TrimSpace(kv[1])
126
127                 switch key {
128                 case "ARVADOS_API_TOKEN":
129                         config.APIToken = value
130                 case "ARVADOS_API_HOST":
131                         config.APIHost = value
132                 case "ARVADOS_API_HOST_INSECURE":
133                         config.APIHostInsecure = matchTrue.MatchString(value)
134                 case "ARVADOS_EXTERNAL_CLIENT":
135                         config.ExternalClient = matchTrue.MatchString(value)
136                 case "ARVADOS_BLOB_SIGNING_KEY":
137                         blobSigningKey = value
138                 }
139         }
140         return
141 }
142
143 // Initializes keep-rsync using the config provided
144 func setupKeepClients(srcConfig, dstConfig arvadosclient.APIConfig, srcKeepServicesJSON, dstKeepServicesJSON string, replications int) (kcSrc, kcDst *keepclient.KeepClient, err error) {
145         // arvSrc from srcConfig
146         arvSrc := arvadosclient.ArvadosClient{
147                 ApiToken:    srcConfig.APIToken,
148                 ApiServer:   srcConfig.APIHost,
149                 ApiInsecure: srcConfig.APIHostInsecure,
150                 Client: &http.Client{Transport: &http.Transport{
151                         TLSClientConfig: &tls.Config{InsecureSkipVerify: srcConfig.APIHostInsecure}}},
152                 External: srcConfig.ExternalClient,
153         }
154
155         // arvDst from dstConfig
156         arvDst := arvadosclient.ArvadosClient{
157                 ApiToken:    dstConfig.APIToken,
158                 ApiServer:   dstConfig.APIHost,
159                 ApiInsecure: dstConfig.APIHostInsecure,
160                 Client: &http.Client{Transport: &http.Transport{
161                         TLSClientConfig: &tls.Config{InsecureSkipVerify: dstConfig.APIHostInsecure}}},
162                 External: dstConfig.ExternalClient,
163         }
164
165         // Get default replications value from destination, if it is not already provided
166         if replications == 0 {
167                 value, err := arvDst.Discovery("defaultCollectionReplication")
168                 if err == nil {
169                         replications = int(value.(float64))
170                 } else {
171                         replications = 2
172                 }
173         }
174
175         // if srcKeepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
176         if srcKeepServicesJSON == "" {
177                 kcSrc, err = keepclient.MakeKeepClient(&arvSrc)
178                 if err != nil {
179                         return nil, nil, err
180                 }
181         } else {
182                 kcSrc, err = keepclient.MakeKeepClientFromJSON(&arvSrc, srcKeepServicesJSON)
183                 if err != nil {
184                         return kcSrc, kcDst, err
185                 }
186         }
187
188         // if dstKeepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
189         if dstKeepServicesJSON == "" {
190                 kcDst, err = keepclient.MakeKeepClient(&arvDst)
191                 if err != nil {
192                         return kcSrc, kcDst, err
193                 }
194         } else {
195                 kcDst, err = keepclient.MakeKeepClientFromJSON(&arvDst, dstKeepServicesJSON)
196                 if err != nil {
197                         return kcSrc, kcDst, err
198                 }
199         }
200         kcDst.Want_replicas = replications
201
202         return kcSrc, kcDst, nil
203 }
204
205 // Get unique block locators from src and dst
206 // Copy any blocks missing in dst
207 func performKeepRsync(kcSrc, kcDst *keepclient.KeepClient, blobSigningKey, prefix string) error {
208         // Get unique locators from src
209         srcIndex, err := getUniqueLocators(kcSrc, prefix)
210         if err != nil {
211                 return err
212         }
213
214         // Get unique locators from dst
215         dstIndex, err := getUniqueLocators(kcDst, prefix)
216         if err != nil {
217                 return err
218         }
219
220         // Get list of locators found in src, but missing in dst
221         toBeCopied := getMissingLocators(srcIndex, dstIndex)
222
223         // Copy each missing block to dst
224         err = copyBlocksToDst(toBeCopied, kcSrc, kcDst, blobSigningKey)
225
226         return err
227 }
228
229 // Get list of unique locators from the specified cluster
230 func getUniqueLocators(kc *keepclient.KeepClient, prefix string) (map[string]bool, error) {
231         uniqueLocators := map[string]bool{}
232
233         // Get index and dedup
234         for uuid := range kc.LocalRoots() {
235                 reader, err := kc.GetIndex(uuid, prefix)
236                 if err != nil {
237                         return uniqueLocators, err
238                 }
239                 scanner := bufio.NewScanner(reader)
240                 for scanner.Scan() {
241                         uniqueLocators[strings.Split(scanner.Text(), " ")[0]] = true
242                 }
243         }
244
245         return uniqueLocators, nil
246 }
247
248 // Get list of locators that are in src but not in dst
249 func getMissingLocators(srcLocators, dstLocators map[string]bool) []string {
250         var missingLocators []string
251         for locator := range srcLocators {
252                 if _, ok := dstLocators[locator]; !ok {
253                         missingLocators = append(missingLocators, locator)
254                 }
255         }
256         return missingLocators
257 }
258
259 // Copy blocks from src to dst; only those that are missing in dst are copied
260 func copyBlocksToDst(toBeCopied []string, kcSrc, kcDst *keepclient.KeepClient, blobSigningKey string) error {
261         done := 0
262         total := len(toBeCopied)
263
264         for _, locator := range toBeCopied {
265                 log.Printf("Getting block %d of %d: %v", done+1, total, locator)
266
267                 getLocator := locator
268                 expiresAt := time.Now().AddDate(0, 0, 1)
269                 if blobSigningKey != "" {
270                         getLocator = keepclient.SignLocator(getLocator, kcSrc.Arvados.ApiToken, expiresAt, []byte(blobSigningKey))
271                 }
272
273                 reader, _, _, err := kcSrc.Get(getLocator)
274                 if err != nil {
275                         return fmt.Errorf("Error getting block: %v %v", locator, err)
276                 }
277                 data, err := ioutil.ReadAll(reader)
278                 if err != nil {
279                         return fmt.Errorf("Error reading block data: %v %v", locator, err)
280                 }
281
282                 log.Printf("Writing block%d of %d: %v", locator)
283                 _, _, err = kcDst.PutB(data)
284                 if err != nil {
285                         return fmt.Errorf("Error putting block data: %v %v", locator, err)
286                 }
287
288                 done++
289                 log.Printf("%.2f%% done", float64(done)/float64(total)*100)
290         }
291
292         log.Printf("Successfully copied to destination %d blocks.", total)
293         return nil
294 }