9 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
10 "git.curoverse.com/arvados.git/sdk/go/keepclient"
21 var srcConfigFile, dstConfigFile, srcKeepServicesJSON, dstKeepServicesJSON, prefix string
23 var srcBlobSigningKey string
29 "Source configuration filename with full path that contains "+
30 "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the source keep servers, "+
31 "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, ARVADOS_EXTERNAL_CLIENT and ARVADOS_BLOB_SIGNING_KEY.")
37 "Destination configuration filename with full path that contains "+
38 "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the destination keep servers, "+
39 "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, ARVADOS_EXTERNAL_CLIENT and ARVADOS_BLOB_SIGNING_KEY.")
43 "src-keep-services-json",
45 "An optional list of available source keepservices. "+
46 "If not provided, this list is obtained from api server configured in src-config-file.")
50 "dst-keep-services-json",
52 "An optional list of available destination keepservices. "+
53 "If not provided, this list is obtained from api server configured in dst-config-file.")
59 "Number of replications to write to the destination. If replications not specified, "+
60 "default replication level configured on destination server will be used.")
70 srcConfig, srcBlobSigningKey, err := loadConfig(srcConfigFile)
72 log.Fatalf("Error loading src configuration from file: %s", err.Error())
75 dstConfig, _, err := loadConfig(dstConfigFile)
77 log.Fatalf("Error loading dst configuration from file: %s", err.Error())
80 // setup src and dst keepclients
81 kcSrc, err := setupKeepClient(srcConfig, srcKeepServicesJSON, false, 0)
83 log.Fatalf("Error configuring src keepclient: %s", err.Error())
86 kcDst, err := setupKeepClient(dstConfig, dstKeepServicesJSON, true, replications)
88 log.Fatalf("Error configuring dst keepclient: %s", err.Error())
91 // Copy blocks not found in dst from src
92 err = performKeepRsync(kcSrc, kcDst, srcBlobSigningKey, prefix)
94 log.Fatalf("Error while syncing data: %s", err.Error())
98 type apiConfig struct {
105 // Load the API config and blob signing key from the given config file
106 func loadConfig(configFile string) (config apiConfig, blobSigningKey string, err error) {
107 if configFile == "" {
108 return config, blobSigningKey, errors.New("config file not specified")
111 config, blobSigningKey, err = readConfigFromFile(configFile)
113 return config, blobSigningKey, fmt.Errorf("Error reading config file: %v", err)
119 var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
121 // Read config from file
122 func readConfigFromFile(filename string) (config apiConfig, blobSigningKey string, err error) {
123 if !strings.Contains(filename, "/") {
124 filename = os.Getenv("HOME") + "/.config/arvados/" + filename
127 content, err := ioutil.ReadFile(filename)
130 return config, "", err
133 lines := strings.Split(string(content), "\n")
134 for _, line := range lines {
139 kv := strings.SplitN(line, "=", 2)
140 key := strings.TrimSpace(kv[0])
141 value := strings.TrimSpace(kv[1])
144 case "ARVADOS_API_TOKEN":
145 config.APIToken = value
146 case "ARVADOS_API_HOST":
147 config.APIHost = value
148 case "ARVADOS_API_HOST_INSECURE":
149 config.APIHostInsecure = matchTrue.MatchString(value)
150 case "ARVADOS_EXTERNAL_CLIENT":
151 config.ExternalClient = matchTrue.MatchString(value)
152 case "ARVADOS_BLOB_SIGNING_KEY":
153 blobSigningKey = value
159 // setup keepclient using the config provided
160 func setupKeepClient(config apiConfig, keepServicesJSON string, isDst bool, replications int) (kc *keepclient.KeepClient, err error) {
161 arv := arvadosclient.ArvadosClient{
162 ApiToken: config.APIToken,
163 ApiServer: config.APIHost,
164 ApiInsecure: config.APIHostInsecure,
165 Client: &http.Client{Transport: &http.Transport{
166 TLSClientConfig: &tls.Config{InsecureSkipVerify: config.APIHostInsecure}}},
167 External: config.ExternalClient,
170 // if keepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
171 if keepServicesJSON == "" {
172 kc, err = keepclient.MakeKeepClient(&arv)
177 kc = keepclient.New(&arv)
178 err = kc.LoadKeepServicesFromJSON(keepServicesJSON)
185 // Get default replications value from destination, if it is not already provided
186 if replications == 0 {
187 value, err := arv.Discovery("defaultCollectionReplication")
189 replications = int(value.(float64))
195 kc.Want_replicas = replications
201 // Get unique block locators from src and dst
202 // Copy any blocks missing in dst
203 func performKeepRsync(kcSrc, kcDst *keepclient.KeepClient, blobSigningKey, prefix string) error {
204 // Get unique locators from src
205 srcIndex, err := getUniqueLocators(kcSrc, prefix)
210 // Get unique locators from dst
211 dstIndex, err := getUniqueLocators(kcDst, prefix)
216 // Get list of locators found in src, but missing in dst
217 toBeCopied := getMissingLocators(srcIndex, dstIndex)
219 // Copy each missing block to dst
220 err = copyBlocksToDst(toBeCopied, kcSrc, kcDst, blobSigningKey)
225 // Get list of unique locators from the specified cluster
226 func getUniqueLocators(kc *keepclient.KeepClient, prefix string) (map[string]bool, error) {
227 uniqueLocators := map[string]bool{}
229 // Get index and dedup
230 for uuid := range kc.LocalRoots() {
231 reader, err := kc.GetIndex(uuid, prefix)
233 return uniqueLocators, err
235 scanner := bufio.NewScanner(reader)
237 uniqueLocators[strings.Split(scanner.Text(), " ")[0]] = true
241 return uniqueLocators, nil
244 // Get list of locators that are in src but not in dst
245 func getMissingLocators(srcLocators, dstLocators map[string]bool) []string {
246 var missingLocators []string
247 for locator := range srcLocators {
248 if _, ok := dstLocators[locator]; !ok {
249 missingLocators = append(missingLocators, locator)
252 return missingLocators
255 // Copy blocks from src to dst; only those that are missing in dst are copied
256 func copyBlocksToDst(toBeCopied []string, kcSrc, kcDst *keepclient.KeepClient, blobSigningKey string) error {
258 total := len(toBeCopied)
260 for _, locator := range toBeCopied {
261 log.Printf("Getting block %d of %d: %v", done+1, total, locator)
263 getLocator := locator
264 expiresAt := time.Now().AddDate(0, 0, 1)
265 if blobSigningKey != "" {
266 getLocator = keepclient.SignLocator(getLocator, kcSrc.Arvados.ApiToken, expiresAt, []byte(blobSigningKey))
269 reader, _, _, err := kcSrc.Get(getLocator)
271 return fmt.Errorf("Error getting block: %v %v", locator, err)
273 data, err := ioutil.ReadAll(reader)
275 return fmt.Errorf("Error reading block data: %v %v", locator, err)
278 log.Printf("Writing block %d of %d: %v", done+1, total, locator)
279 _, _, err = kcDst.PutB(data)
281 return fmt.Errorf("Error putting block data: %v %v", locator, err)
285 log.Printf("%.2f%% done", float64(done)/float64(total)*100)
288 log.Printf("Successfully copied to destination %d blocks.", total)