6 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7 "git.curoverse.com/arvados.git/sdk/go/keepclient"
15 // keep-rsync arguments
17 srcConfig arvadosclient.APIConfig
18 dstConfig arvadosclient.APIConfig
20 srcKeepServicesJSON string
21 dstKeepServicesJSON string
27 var srcConfigFile string
28 var dstConfigFile string
34 "Source configuration filename with full path that contains "+
35 "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the source keep servers, "+
36 "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, ARVADOS_EXTERNAL_CLIENT and ARVADOS_BLOB_SIGNING_KEY.")
42 "Destination configuration filename with full path that contains "+
43 "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the destination keep servers, "+
44 "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, ARVADOS_EXTERNAL_CLIENT and ARVADOS_BLOB_SIGNING_KEY.")
48 "src-keep-services-json",
50 "An optional list of available source keepservices. "+
51 "If not provided, this list is obtained from api server configured in src-config-file.")
55 "dst-keep-services-json",
57 "An optional list of available destination keepservices. "+
58 "If not provided, this list is obtained from api server configured in dst-config-file.")
64 "Number of replications to write to the destination.")
77 if srcConfigFile == "" {
78 log.Fatal("-src-config-file must be specified.")
80 srcConfig, err = readConfigFromFile(srcConfigFile)
82 log.Fatal("Error reading source configuration: %s", err.Error())
85 if dstConfigFile == "" {
86 log.Fatal("-dst-config-file must be specified.")
88 dstConfig, err = readConfigFromFile(dstConfigFile)
90 log.Fatal("Error reading destination configuration: %s", err.Error())
93 // Initialize keep-rsync
94 err = initializeKeepRsync()
96 log.Fatal("Error configuring keep-rsync: %s", err.Error())
99 // Copy blocks not found in dst from src
103 var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
105 // Reads config from file
106 func readConfigFromFile(filename string) (arvadosclient.APIConfig, error) {
107 var config arvadosclient.APIConfig
109 content, err := ioutil.ReadFile(filename)
114 lines := strings.Split(string(content), "\n")
115 for _, line := range lines {
119 kv := strings.Split(line, "=")
122 case "ARVADOS_API_TOKEN":
123 config.APIToken = kv[1]
124 case "ARVADOS_API_HOST":
125 config.APIHost = kv[1]
126 case "ARVADOS_API_HOST_INSECURE":
127 config.APIHostInsecure = matchTrue.MatchString(kv[1])
128 case "ARVADOS_EXTERNAL_CLIENT":
129 config.ExternalClient = matchTrue.MatchString(kv[1])
130 case "ARVADOS_BLOB_SIGNING_KEY":
131 blobSigningKey = kv[1]
137 // keep-rsync source and destination clients
139 arvSrc arvadosclient.ArvadosClient
140 arvDst arvadosclient.ArvadosClient
141 kcSrc *keepclient.KeepClient
142 kcDst *keepclient.KeepClient
// initializeKeepRsync builds the package-level source and destination clients
// (arvSrc, arvDst, kcSrc, kcDst) from srcConfig and dstConfig, and settles the
// replication count used for writes to the destination.
// The named result err is assigned by each construction call below.
145 // Initializes keep-rsync using the config provided
146 func initializeKeepRsync() (err error) {
147 // arvSrc from srcConfig
148 arvSrc, err = arvadosclient.New(srcConfig)
153 // arvDst from dstConfig
154 arvDst, err = arvadosclient.New(dstConfig)
// A replications value of 0 is treated as "not provided"; in that case the
// destination's "defaultCollectionReplication" discovery value is requested.
159 // Get default replications value from destination, if it is not already provided
160 if replications == 0 {
// NOTE(review): ":=" inside this if block declares a new err that shadows the
// named result, so a Discovery failure is not visible outside the block.
161 value, err := arvDst.Discovery("defaultCollectionReplication")
// NOTE(review): the type assertion is unchecked and panics if value is not a
// float64. The handling of a Discovery error is not visible in this view --
// confirm before changing.
163 replications = int(value.(float64))
// An empty srcKeepServicesJSON selects MakeKeepClient; the JSON variant below
// is presumably the else branch -- confirm against the full file.
169 // if srcKeepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
170 if srcKeepServicesJSON == "" {
171 kcSrc, err = keepclient.MakeKeepClient(&arvSrc)
176 kcSrc, err = keepclient.MakeKeepClientFromJSON(&arvSrc, srcKeepServicesJSON)
// Same selection for the destination client.
182 // if dstKeepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
183 if dstKeepServicesJSON == "" {
184 kcDst, err = keepclient.MakeKeepClient(&arvDst)
189 kcDst, err = keepclient.MakeKeepClientFromJSON(&arvDst, dstKeepServicesJSON)
// The destination client is told how many replicas each write should produce.
194 kcDst.Want_replicas = replications
199 // Get unique block locators from src and dst
200 // Copy any blocks missing in dst
201 func performKeepRsync() error {
202 // Get unique locators from src
203 srcIndex, err := getUniqueLocators(kcSrc, prefix)
208 // Get unique locators from dst
209 dstIndex, err := getUniqueLocators(kcDst, prefix)
214 // Get list of locators found in src, but missing in dst
215 toBeCopied := getMissingLocators(srcIndex, dstIndex)
217 // Copy each missing block to dst
218 copyBlocksToDst(toBeCopied)
223 // Get list of unique locators from the specified cluster
224 func getUniqueLocators(kc *keepclient.KeepClient, prefix string) (map[string]bool, error) {
225 var indexBytes []byte
227 for uuid := range kc.LocalRoots() {
228 reader, err := kc.GetIndex(uuid, prefix)
234 readBytes, err = ioutil.ReadAll(reader)
239 indexBytes = append(indexBytes, readBytes...)
242 // Got index; Now dedup it
243 locators := bytes.Split(indexBytes, []byte("\n"))
245 uniqueLocators := map[string]bool{}
246 for _, loc := range locators {
251 locator := string(bytes.Split(loc, []byte(" "))[0])
252 if _, ok := uniqueLocators[locator]; !ok {
253 uniqueLocators[locator] = true
256 return uniqueLocators, nil
// getMissingLocators returns the locators that are keys of srcLocators but
// not keys of dstLocators. Only key presence is considered; the stored bool
// is ignored. The order of the result is unspecified, and the result is nil
// when nothing is missing.
func getMissingLocators(srcLocators map[string]bool, dstLocators map[string]bool) []string {
	var missing []string
	for candidate := range srcLocators {
		if _, present := dstLocators[candidate]; present {
			continue
		}
		missing = append(missing, candidate)
	}
	return missing
}
270 // Copy blocks from src to dst; only those that are missing in dst are copied
271 func copyBlocksToDst(toBeCopied []string) {
273 total := len(toBeCopied)
276 for _, locator := range toBeCopied {
277 log.Printf("Getting block %d of %d", done+1, total)
279 log.Printf("Getting block: %v", locator)
281 getLocator := locator
282 expiresAt := time.Now().AddDate(0, 0, 1)
283 if blobSigningKey != "" {
284 getLocator = keepclient.SignLocator(getLocator, arvSrc.ApiToken, expiresAt, []byte(blobSigningKey))
287 reader, _, _, err := kcSrc.Get(getLocator)
289 log.Printf("Error getting block: %q %v", locator, err)
290 failed = append(failed, locator)
293 data, err := ioutil.ReadAll(reader)
295 log.Printf("Error reading block data: %q %v", locator, err)
296 failed = append(failed, locator)
300 log.Printf("Copying block: %q", locator)
301 _, rep, err := kcDst.PutB(data)
303 log.Printf("Error putting block data: %q %v", locator, err)
304 failed = append(failed, locator)
307 if rep != replications {
308 log.Printf("Failed to put enough number of replicas. Wanted: %d; Put: %d", replications, rep)
309 failed = append(failed, locator)
314 log.Printf("%.2f%% done", float64(done)/float64(total)*100)
317 log.Printf("Successfully copied to destination %d and failed %d out of a total of %d", done, len(failed), total)
318 log.Printf("Failed blocks %v", failed)