7167: some more error tests such as error getting block from src and error putting...
[arvados.git] / tools / keep-rsync / keep-rsync.go
1 package main
2
3 import (
4         "bytes"
5         "flag"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/keepclient"
8         "io/ioutil"
9         "log"
10         "regexp"
11         "strings"
12         "time"
13 )
14
15 // keep-rsync arguments
16 var (
17         srcConfig           arvadosclient.APIConfig
18         dstConfig           arvadosclient.APIConfig
19         blobSigningKey      string
20         srcKeepServicesJSON string
21         dstKeepServicesJSON string
22         replications        int
23         prefix              string
24 )
25
26 func main() {
27         var srcConfigFile string
28         var dstConfigFile string
29
30         flag.StringVar(
31                 &srcConfigFile,
32                 "src-config-file",
33                 "",
34                 "Source configuration filename with full path that contains "+
35                         "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the source keep servers, "+
36                         "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, ARVADOS_EXTERNAL_CLIENT and ARVADOS_BLOB_SIGNING_KEY.")
37
38         flag.StringVar(
39                 &dstConfigFile,
40                 "dst-config-file",
41                 "",
42                 "Destination configuration filename with full path that contains "+
43                         "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the destination keep servers, "+
44                         "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, ARVADOS_EXTERNAL_CLIENT and ARVADOS_BLOB_SIGNING_KEY.")
45
46         flag.StringVar(
47                 &srcKeepServicesJSON,
48                 "src-keep-services-json",
49                 "",
50                 "An optional list of available source keepservices. "+
51                         "If not provided, this list is obtained from api server configured in src-config-file.")
52
53         flag.StringVar(
54                 &dstKeepServicesJSON,
55                 "dst-keep-services-json",
56                 "",
57                 "An optional list of available destination keepservices. "+
58                         "If not provided, this list is obtained from api server configured in dst-config-file.")
59
60         flag.IntVar(
61                 &replications,
62                 "replications",
63                 0,
64                 "Number of replications to write to the destination.")
65
66         flag.StringVar(
67                 &prefix,
68                 "prefix",
69                 "",
70                 "Index prefix")
71
72         flag.Parse()
73
74         var err error
75
76         // Load config
77         if srcConfigFile == "" {
78                 log.Fatal("-src-config-file must be specified.")
79         }
80         srcConfig, err = readConfigFromFile(srcConfigFile)
81         if err != nil {
82                 log.Fatal("Error reading source configuration: %s", err.Error())
83         }
84
85         if dstConfigFile == "" {
86                 log.Fatal("-dst-config-file must be specified.")
87         }
88         dstConfig, err = readConfigFromFile(dstConfigFile)
89         if err != nil {
90                 log.Fatal("Error reading destination configuration: %s", err.Error())
91         }
92
93         // Initialize keep-rsync
94         err = initializeKeepRsync()
95         if err != nil {
96                 log.Fatal("Error configuring keep-rsync: %s", err.Error())
97         }
98
99         // Copy blocks not found in dst from src
100         err = performKeepRsync()
101         if err != nil {
102                 log.Fatal("Error while syncing data: %s", err.Error())
103         }
104 }
105
106 var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
107
108 // Reads config from file
109 func readConfigFromFile(filename string) (arvadosclient.APIConfig, error) {
110         var config arvadosclient.APIConfig
111
112         content, err := ioutil.ReadFile(filename)
113         if err != nil {
114                 return config, err
115         }
116
117         lines := strings.Split(string(content), "\n")
118         for _, line := range lines {
119                 if line == "" {
120                         continue
121                 }
122                 kv := strings.Split(line, "=")
123
124                 switch kv[0] {
125                 case "ARVADOS_API_TOKEN":
126                         config.APIToken = kv[1]
127                 case "ARVADOS_API_HOST":
128                         config.APIHost = kv[1]
129                 case "ARVADOS_API_HOST_INSECURE":
130                         config.APIHostInsecure = matchTrue.MatchString(kv[1])
131                 case "ARVADOS_EXTERNAL_CLIENT":
132                         config.ExternalClient = matchTrue.MatchString(kv[1])
133                 case "ARVADOS_BLOB_SIGNING_KEY":
134                         blobSigningKey = kv[1]
135                 }
136         }
137         return config, nil
138 }
139
140 // keep-rsync source and destination clients
141 var (
142         arvSrc arvadosclient.ArvadosClient
143         arvDst arvadosclient.ArvadosClient
144         kcSrc  *keepclient.KeepClient
145         kcDst  *keepclient.KeepClient
146 )
147
148 // Initializes keep-rsync using the config provided
149 func initializeKeepRsync() (err error) {
150         // arvSrc from srcConfig
151         arvSrc, err = arvadosclient.New(srcConfig)
152         if err != nil {
153                 return
154         }
155
156         // arvDst from dstConfig
157         arvDst, err = arvadosclient.New(dstConfig)
158         if err != nil {
159                 return
160         }
161
162         // Get default replications value from destination, if it is not already provided
163         if replications == 0 {
164                 value, err := arvDst.Discovery("defaultCollectionReplication")
165                 if err == nil {
166                         replications = int(value.(float64))
167                 } else {
168                         replications = 2
169                 }
170         }
171
172         // if srcKeepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
173         if srcKeepServicesJSON == "" {
174                 kcSrc, err = keepclient.MakeKeepClient(&arvSrc)
175                 if err != nil {
176                         return
177                 }
178         } else {
179                 kcSrc, err = keepclient.MakeKeepClientFromJSON(&arvSrc, srcKeepServicesJSON)
180                 if err != nil {
181                         return
182                 }
183         }
184
185         // if dstKeepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
186         if dstKeepServicesJSON == "" {
187                 kcDst, err = keepclient.MakeKeepClient(&arvDst)
188                 if err != nil {
189                         return
190                 }
191         } else {
192                 kcDst, err = keepclient.MakeKeepClientFromJSON(&arvDst, dstKeepServicesJSON)
193                 if err != nil {
194                         return
195                 }
196         }
197         kcDst.Want_replicas = replications
198
199         return
200 }
201
202 // Get unique block locators from src and dst
203 // Copy any blocks missing in dst
204 func performKeepRsync() error {
205         // Get unique locators from src
206         srcIndex, err := getUniqueLocators(kcSrc, prefix)
207         if err != nil {
208                 return err
209         }
210
211         // Get unique locators from dst
212         dstIndex, err := getUniqueLocators(kcDst, prefix)
213         if err != nil {
214                 return err
215         }
216
217         // Get list of locators found in src, but missing in dst
218         toBeCopied := getMissingLocators(srcIndex, dstIndex)
219
220         // Copy each missing block to dst
221         err = copyBlocksToDst(toBeCopied)
222
223         return err
224 }
225
226 // Get list of unique locators from the specified cluster
227 func getUniqueLocators(kc *keepclient.KeepClient, indexPrefix string) (map[string]bool, error) {
228         var indexBytes []byte
229
230         for uuid := range kc.LocalRoots() {
231                 reader, err := kc.GetIndex(uuid, indexPrefix)
232                 if err != nil {
233                         return nil, err
234                 }
235
236                 var readBytes []byte
237                 readBytes, err = ioutil.ReadAll(reader)
238                 if err != nil {
239                         return nil, err
240                 }
241
242                 indexBytes = append(indexBytes, readBytes...)
243         }
244
245         // Got index; Now dedup it
246         locators := bytes.Split(indexBytes, []byte("\n"))
247
248         uniqueLocators := map[string]bool{}
249         for _, loc := range locators {
250                 if len(loc) == 0 {
251                         continue
252                 }
253
254                 locator := string(bytes.Split(loc, []byte(" "))[0])
255                 if _, ok := uniqueLocators[locator]; !ok {
256                         uniqueLocators[locator] = true
257                 }
258         }
259         return uniqueLocators, nil
260 }
261
262 // Get list of locators that are in src but not in dst
263 func getMissingLocators(srcLocators map[string]bool, dstLocators map[string]bool) []string {
264         var missingLocators []string
265         for locator := range srcLocators {
266                 if _, ok := dstLocators[locator]; !ok {
267                         missingLocators = append(missingLocators, locator)
268                 }
269         }
270         return missingLocators
271 }
272
273 // Copy blocks from src to dst; only those that are missing in dst are copied
274 func copyBlocksToDst(toBeCopied []string) error {
275         done := 0
276         total := len(toBeCopied)
277
278         for _, locator := range toBeCopied {
279                 log.Printf("Getting block %d of %d", done+1, total)
280
281                 log.Printf("Getting block: %v", locator)
282
283                 getLocator := locator
284                 expiresAt := time.Now().AddDate(0, 0, 1)
285                 if blobSigningKey != "" {
286                         getLocator = keepclient.SignLocator(getLocator, arvSrc.ApiToken, expiresAt, []byte(blobSigningKey))
287                 }
288
289                 reader, _, _, err := kcSrc.Get(getLocator)
290                 if err != nil {
291                         log.Printf("Error getting block: %q %v", locator, err)
292                         return err
293                 }
294                 data, err := ioutil.ReadAll(reader)
295                 if err != nil {
296                         log.Printf("Error reading block data: %q %v", locator, err)
297                         return err
298                 }
299
300                 log.Printf("Copying block: %q", locator)
301                 _, _, err = kcDst.PutB(data)
302                 if err != nil {
303                         log.Printf("Error putting block data: %q %v", locator, err)
304                         return err
305                 }
306
307                 done++
308                 log.Printf("%.2f%% done", float64(done)/float64(total)*100)
309         }
310
311         log.Printf("Successfully copied to destination %d blocks.", total)
312         return nil
313 }