226b597e69f684f898d9bc627a0beb263bc7cf0d
[arvados.git] / tools / keep-rsync / keep-rsync.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "flag"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "git.curoverse.com/arvados.git/sdk/go/keepclient"
9         "io/ioutil"
10         "log"
11         "regexp"
12         "strings"
13         "time"
14 )
15
16 // keep-rsync arguments
17 var (
18         srcConfig           arvadosclient.APIConfig
19         dstConfig           arvadosclient.APIConfig
20         blobSigningKey      string
21         srcKeepServicesJSON string
22         dstKeepServicesJSON string
23         replications        int
24         prefix              string
25 )
26
27 func main() {
28         var srcConfigFile string
29         var dstConfigFile string
30
31         flag.StringVar(
32                 &srcConfigFile,
33                 "src-config-file",
34                 "",
35                 "Source configuration filename with full path that contains "+
36                         "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the source keep servers, "+
37                         "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, ARVADOS_EXTERNAL_CLIENT and ARVADOS_BLOB_SIGNING_KEY.")
38
39         flag.StringVar(
40                 &dstConfigFile,
41                 "dst-config-file",
42                 "",
43                 "Destination configuration filename with full path that contains "+
44                         "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the destination keep servers, "+
45                         "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, ARVADOS_EXTERNAL_CLIENT and ARVADOS_BLOB_SIGNING_KEY.")
46
47         flag.StringVar(
48                 &srcKeepServicesJSON,
49                 "src-keep-services-json",
50                 "",
51                 "An optional list of available source keepservices. "+
52                         "If not provided, this list is obtained from api server configured in src-config-file.")
53
54         flag.StringVar(
55                 &dstKeepServicesJSON,
56                 "dst-keep-services-json",
57                 "",
58                 "An optional list of available destination keepservices. "+
59                         "If not provided, this list is obtained from api server configured in dst-config-file.")
60
61         flag.IntVar(
62                 &replications,
63                 "replications",
64                 0,
65                 "Number of replications to write to the destination.")
66
67         flag.StringVar(
68                 &prefix,
69                 "prefix",
70                 "",
71                 "Index prefix")
72
73         flag.Parse()
74
75         var err error
76
77         // Load config
78         if srcConfigFile == "" {
79                 log.Fatal("-src-config-file must be specified.")
80         }
81         srcConfig, err = readConfigFromFile(srcConfigFile)
82         if err != nil {
83                 log.Fatal("Error reading source configuration: %s", err.Error())
84         }
85
86         if dstConfigFile == "" {
87                 log.Fatal("-dst-config-file must be specified.")
88         }
89         dstConfig, err = readConfigFromFile(dstConfigFile)
90         if err != nil {
91                 log.Fatal("Error reading destination configuration: %s", err.Error())
92         }
93
94         // Initialize keep-rsync
95         err = initializeKeepRsync()
96         if err != nil {
97                 log.Fatal("Error configuring keep-rsync: %s", err.Error())
98         }
99
100         // Copy blocks not found in dst from src
101         err = performKeepRsync()
102         if err != nil {
103                 log.Fatal("Error while syncing data: %s", err.Error())
104         }
105 }
106
107 var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
108
109 // Reads config from file
110 func readConfigFromFile(filename string) (arvadosclient.APIConfig, error) {
111         var config arvadosclient.APIConfig
112
113         content, err := ioutil.ReadFile(filename)
114         if err != nil {
115                 return config, err
116         }
117
118         lines := strings.Split(string(content), "\n")
119         for _, line := range lines {
120                 if line == "" {
121                         continue
122                 }
123                 kv := strings.Split(line, "=")
124
125                 switch kv[0] {
126                 case "ARVADOS_API_TOKEN":
127                         config.APIToken = kv[1]
128                 case "ARVADOS_API_HOST":
129                         config.APIHost = kv[1]
130                 case "ARVADOS_API_HOST_INSECURE":
131                         config.APIHostInsecure = matchTrue.MatchString(kv[1])
132                 case "ARVADOS_EXTERNAL_CLIENT":
133                         config.ExternalClient = matchTrue.MatchString(kv[1])
134                 case "ARVADOS_BLOB_SIGNING_KEY":
135                         blobSigningKey = kv[1]
136                 }
137         }
138         return config, nil
139 }
140
141 // keep-rsync source and destination clients
142 var (
143         arvSrc arvadosclient.ArvadosClient
144         arvDst arvadosclient.ArvadosClient
145         kcSrc  *keepclient.KeepClient
146         kcDst  *keepclient.KeepClient
147 )
148
149 // Initializes keep-rsync using the config provided
150 func initializeKeepRsync() (err error) {
151         // arvSrc from srcConfig
152         arvSrc, err = arvadosclient.New(srcConfig)
153         if err != nil {
154                 return
155         }
156
157         // arvDst from dstConfig
158         arvDst, err = arvadosclient.New(dstConfig)
159         if err != nil {
160                 return
161         }
162
163         // Get default replications value from destination, if it is not already provided
164         if replications == 0 {
165                 value, err := arvDst.Discovery("defaultCollectionReplication")
166                 if err == nil {
167                         replications = int(value.(float64))
168                 } else {
169                         replications = 2
170                 }
171         }
172
173         // if srcKeepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
174         if srcKeepServicesJSON == "" {
175                 kcSrc, err = keepclient.MakeKeepClient(&arvSrc)
176                 if err != nil {
177                         return
178                 }
179         } else {
180                 kcSrc, err = keepclient.MakeKeepClientFromJSON(&arvSrc, srcKeepServicesJSON)
181                 if err != nil {
182                         return
183                 }
184         }
185
186         // if dstKeepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
187         if dstKeepServicesJSON == "" {
188                 kcDst, err = keepclient.MakeKeepClient(&arvDst)
189                 if err != nil {
190                         return
191                 }
192         } else {
193                 kcDst, err = keepclient.MakeKeepClientFromJSON(&arvDst, dstKeepServicesJSON)
194                 if err != nil {
195                         return
196                 }
197         }
198         kcDst.Want_replicas = replications
199
200         return
201 }
202
203 // Get unique block locators from src and dst
204 // Copy any blocks missing in dst
205 func performKeepRsync() error {
206         // Get unique locators from src
207         srcIndex, err := getUniqueLocators(kcSrc, prefix)
208         if err != nil {
209                 return err
210         }
211
212         // Get unique locators from dst
213         dstIndex, err := getUniqueLocators(kcDst, prefix)
214         if err != nil {
215                 return err
216         }
217
218         // Get list of locators found in src, but missing in dst
219         toBeCopied := getMissingLocators(srcIndex, dstIndex)
220
221         // Copy each missing block to dst
222         err = copyBlocksToDst(toBeCopied)
223
224         return err
225 }
226
227 // Get list of unique locators from the specified cluster
228 func getUniqueLocators(kc *keepclient.KeepClient, indexPrefix string) (map[string]bool, error) {
229         var indexBytes []byte
230
231         for uuid := range kc.LocalRoots() {
232                 reader, err := kc.GetIndex(uuid, indexPrefix)
233                 if err != nil {
234                         return nil, err
235                 }
236
237                 var readBytes []byte
238                 readBytes, err = ioutil.ReadAll(reader)
239                 if err != nil {
240                         return nil, err
241                 }
242
243                 indexBytes = append(indexBytes, readBytes...)
244         }
245
246         // Got index; Now dedup it
247         locators := bytes.Split(indexBytes, []byte("\n"))
248
249         uniqueLocators := map[string]bool{}
250         for _, loc := range locators {
251                 if len(loc) == 0 {
252                         continue
253                 }
254
255                 locator := string(bytes.Split(loc, []byte(" "))[0])
256                 if _, ok := uniqueLocators[locator]; !ok {
257                         uniqueLocators[locator] = true
258                 }
259         }
260         return uniqueLocators, nil
261 }
262
263 // Get list of locators that are in src but not in dst
264 func getMissingLocators(srcLocators map[string]bool, dstLocators map[string]bool) []string {
265         var missingLocators []string
266         for locator := range srcLocators {
267                 if _, ok := dstLocators[locator]; !ok {
268                         missingLocators = append(missingLocators, locator)
269                 }
270         }
271         return missingLocators
272 }
273
274 // Copy blocks from src to dst; only those that are missing in dst are copied
275 func copyBlocksToDst(toBeCopied []string) error {
276         done := 0
277         total := len(toBeCopied)
278
279         for _, locator := range toBeCopied {
280                 log.Printf("Getting block %d of %d", done+1, total)
281
282                 log.Printf("Getting block: %v", locator)
283
284                 getLocator := locator
285                 expiresAt := time.Now().AddDate(0, 0, 1)
286                 if blobSigningKey != "" {
287                         getLocator = keepclient.SignLocator(getLocator, arvSrc.ApiToken, expiresAt, []byte(blobSigningKey))
288                 }
289
290                 reader, _, _, err := kcSrc.Get(getLocator)
291                 if err != nil {
292                         log.Printf("Error getting block: %q %v", locator, err)
293                         return err
294                 }
295                 data, err := ioutil.ReadAll(reader)
296                 if err != nil {
297                         log.Printf("Error reading block data: %q %v", locator, err)
298                         return err
299                 }
300
301                 log.Printf("Copying block: %q", locator)
302                 _, rep, err := kcDst.PutB(data)
303                 if err != nil {
304                         log.Printf("Error putting block data: %q %v", locator, err)
305                         return err
306                 }
307                 if rep != replications {
308                         log.Printf("Failed to put enough number of replicas. Wanted: %d; Put: %d", replications, rep)
309                         return errors.New("Failed to put enough number of replicas")
310                 }
311
312                 done++
313                 log.Printf("%.2f%% done", float64(done)/float64(total)*100)
314         }
315
316         log.Printf("Successfully copied to destination %d blocks.", total)
317         return nil
318 }