7 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8 "git.curoverse.com/arvados.git/sdk/go/keepclient"
16 // keep-rsync arguments
// NOTE(review): this excerpt does not show the whole declaration group; other
// variables used later in the file (blobSigningKey, replications, prefix) are
// presumably declared on the lines that are not visible here.
// srcConfig and dstConfig receive the result of readConfigFromFile for the
// source and destination configuration files and are later passed to
// arvadosclient.New.
18 srcConfig arvadosclient.APIConfig
19 dstConfig arvadosclient.APIConfig
// Optional keep service lists. When one of these is empty the matching keep
// client is built with keepclient.MakeKeepClient; otherwise the string is
// handed to keepclient.MakeKeepClientFromJSON.
21 srcKeepServicesJSON string
22 dstKeepServicesJSON string
// NOTE(review): the enclosing function header (presumably main) and several
// statements, including the flag registration calls that the help strings
// below belong to, are not visible in this excerpt; only the visible lines
// are annotated.
// Paths of the source and destination configuration files.
28 var srcConfigFile string
29 var dstConfigFile string
35 "Source configuration filename with full path that contains "+
36 "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the source keep servers, "+
37 "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, ARVADOS_EXTERNAL_CLIENT and ARVADOS_BLOB_SIGNING_KEY.")
43 "Destination configuration filename with full path that contains "+
44 "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the destination keep servers, "+
45 "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, ARVADOS_EXTERNAL_CLIENT and ARVADOS_BLOB_SIGNING_KEY.")
49 "src-keep-services-json",
51 "An optional list of available source keepservices. "+
52 "If not provided, this list is obtained from api server configured in src-config-file.")
56 "dst-keep-services-json",
58 "An optional list of available destination keepservices. "+
59 "If not provided, this list is obtained from api server configured in dst-config-file.")
65 "Number of replications to write to the destination.")
// Both configuration files are required: stop when a path was not given, then
// load each file into srcConfig / dstConfig with readConfigFromFile.
78 if srcConfigFile == "" {
79 log.Fatal("-src-config-file must be specified.")
81 srcConfig, err = readConfigFromFile(srcConfigFile)
// NOTE(review): log.Fatal formats its arguments like log.Print, so the "%s"
// verb is printed literally instead of being replaced by the error text;
// log.Fatalf looks intended here and in the three similar calls below.
83 log.Fatal("Error reading source configuration: %s", err.Error())
86 if dstConfigFile == "" {
87 log.Fatal("-dst-config-file must be specified.")
89 dstConfig, err = readConfigFromFile(dstConfigFile)
91 log.Fatal("Error reading destination configuration: %s", err.Error())
94 // Initialize keep-rsync
95 err = initializeKeepRsync()
97 log.Fatal("Error configuring keep-rsync: %s", err.Error())
100 // Copy blocks not found in dst from src
101 err = performKeepRsync()
103 log.Fatal("Error while syncing data: %s", err.Error())
// matchTrue matches a string that is, in its entirety and ignoring case,
// "1", "yes" or "true". readConfigFromFile uses it to interpret the boolean
// configuration values.
107 var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
109 // readConfigFromFile reads the file named by filename and fills an
// arvadosclient.APIConfig from its "KEY=value" lines.
//
// ARVADOS_API_TOKEN and ARVADOS_API_HOST are assigned as-is;
// ARVADOS_API_HOST_INSECURE and ARVADOS_EXTERNAL_CLIENT become true only when
// the value matches matchTrue. ARVADOS_BLOB_SIGNING_KEY is not part of the
// config: it is stored in the blobSigningKey variable declared outside this
// function.
//
// NOTE(review): the error handling and the return statements are not visible
// in this excerpt.
110 func readConfigFromFile(filename string) (arvadosclient.APIConfig, error) {
111 var config arvadosclient.APIConfig
113 content, err := ioutil.ReadFile(filename)
// The whole file is held in memory and processed one line at a time.
118 lines := strings.Split(string(content), "\n")
119 for _, line := range lines {
// strings.Split cuts at every "=", so kv[1] is only the text between the
// first and second "="; a value that itself contains "=" is truncated.
// NOTE(review): no length check on kv is visible here - confirm that a line
// without "=" cannot reach the kv[1] accesses below (index out of range).
123 kv := strings.Split(line, "=")
126 case "ARVADOS_API_TOKEN":
127 config.APIToken = kv[1]
128 case "ARVADOS_API_HOST":
129 config.APIHost = kv[1]
130 case "ARVADOS_API_HOST_INSECURE":
131 config.APIHostInsecure = matchTrue.MatchString(kv[1])
132 case "ARVADOS_EXTERNAL_CLIENT":
133 config.ExternalClient = matchTrue.MatchString(kv[1])
134 case "ARVADOS_BLOB_SIGNING_KEY":
135 blobSigningKey = kv[1]
141 // keep-rsync source and destination clients
// arvSrc and arvDst are assigned from arvadosclient.New(srcConfig) and
// arvadosclient.New(dstConfig) in initializeKeepRsync; kcSrc and kcDst are
// the keep clients built from them in the same function.
143 arvSrc arvadosclient.ArvadosClient
144 arvDst arvadosclient.ArvadosClient
145 kcSrc *keepclient.KeepClient
146 kcDst *keepclient.KeepClient
149 // initializeKeepRsync builds the clients used by the sync from the loaded
// configuration: arvSrc and arvDst with arvadosclient.New, then kcSrc and
// kcDst with keepclient.MakeKeepClient, or with MakeKeepClientFromJSON when
// the corresponding srcKeepServicesJSON / dstKeepServicesJSON string is
// non-empty. Finally kcDst.Want_replicas is set to replications.
//
// NOTE(review): the error checks that follow each call are not visible in
// this excerpt.
150 func initializeKeepRsync() (err error) {
151 // arvSrc from srcConfig
152 arvSrc, err = arvadosclient.New(srcConfig)
157 // arvDst from dstConfig
158 arvDst, err = arvadosclient.New(dstConfig)
163 // Get default replications value from destination, if it is not already provided
164 if replications == 0 {
// ":=" declares a new err scoped to this if block; it shadows the named
// result err rather than assigning to it.
165 value, err := arvDst.Discovery("defaultCollectionReplication")
// Unchecked type assertion: this panics if value does not hold a float64.
167 replications = int(value.(float64))
173 // if srcKeepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
174 if srcKeepServicesJSON == "" {
175 kcSrc, err = keepclient.MakeKeepClient(&arvSrc)
180 kcSrc, err = keepclient.MakeKeepClientFromJSON(&arvSrc, srcKeepServicesJSON)
186 // if dstKeepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
187 if dstKeepServicesJSON == "" {
188 kcDst, err = keepclient.MakeKeepClient(&arvDst)
193 kcDst, err = keepclient.MakeKeepClientFromJSON(&arvDst, dstKeepServicesJSON)
// The destination client is told how many replicas to write.
198 kcDst.Want_replicas = replications
203 // performKeepRsync gets the unique block locators from src and dst, then
204 // copies any blocks missing in dst.
// Both index lookups pass the prefix variable to getUniqueLocators.
// NOTE(review): the error checks and the final return are not visible in this
// excerpt.
205 func performKeepRsync() error {
206 // Get unique locators from src
207 srcIndex, err := getUniqueLocators(kcSrc, prefix)
212 // Get unique locators from dst
213 dstIndex, err := getUniqueLocators(kcDst, prefix)
218 // Get list of locators found in src, but missing in dst
219 toBeCopied := getMissingLocators(srcIndex, dstIndex)
221 // Copy each missing block to dst
222 err = copyBlocksToDst(toBeCopied)
227 // getUniqueLocators returns the set of locators found in the indexes
// obtained from kc: for every uuid key yielded by kc.LocalRoots() it calls
// kc.GetIndex(uuid, indexPrefix) and reads the whole result. The set is a map
// keyed by locator in which every stored value is true.
//
// NOTE(review): the error handling after GetIndex and ReadAll is not visible
// in this excerpt.
228 func getUniqueLocators(kc *keepclient.KeepClient, indexPrefix string) (map[string]bool, error) {
229 var indexBytes []byte
// Concatenate the raw index bytes returned for each uuid.
// NOTE(review): the indexes are joined without a separator - confirm each one
// ends with a newline, otherwise its last line merges with the next index.
231 for uuid := range kc.LocalRoots() {
232 reader, err := kc.GetIndex(uuid, indexPrefix)
238 readBytes, err = ioutil.ReadAll(reader)
243 indexBytes = append(indexBytes, readBytes...)
246 // Got index; Now dedup it
247 locators := bytes.Split(indexBytes, []byte("\n"))
249 uniqueLocators := map[string]bool{}
250 for _, loc := range locators {
// The locator is the text before the first space on the index line.
255 locator := string(bytes.Split(loc, []byte(" "))[0])
// The map key already deduplicates; the presence check only avoids rewriting
// an entry that exists.
256 if _, ok := uniqueLocators[locator]; !ok {
257 uniqueLocators[locator] = true
260 return uniqueLocators, nil
263 // getMissingLocators returns the locators that are keys of srcLocators but
// not keys of dstLocators. Only key presence is tested; the bool values are
// ignored. The result is nil when nothing is missing, and its order follows
// Go's unspecified map iteration order.
264 func getMissingLocators(srcLocators map[string]bool, dstLocators map[string]bool) []string {
265 var missingLocators []string
266 for locator := range srcLocators {
267 if _, ok := dstLocators[locator]; !ok {
268 missingLocators = append(missingLocators, locator)
271 return missingLocators
274 // copyBlocksToDst copies each block named in toBeCopied from src to dst:
// the block is fetched with kcSrc.Get, read fully into memory and written
// with kcDst.PutB. It returns an error when the number of replicas reported
// by PutB differs from replications.
//
// NOTE(review): the declaration and increment of done, and whatever follows
// the three "Error ..." log lines (presumably a return), are not visible in
// this excerpt.
275 func copyBlocksToDst(toBeCopied []string) error {
277 total := len(toBeCopied)
279 for _, locator := range toBeCopied {
280 log.Printf("Getting block %d of %d", done+1, total)
282 log.Printf("Getting block: %v", locator)
// When a blob signing key is configured, the locator is signed with the
// source API token and an expiry one day from now before it is requested.
284 getLocator := locator
285 expiresAt := time.Now().AddDate(0, 0, 1)
286 if blobSigningKey != "" {
287 getLocator = keepclient.SignLocator(getLocator, arvSrc.ApiToken, expiresAt, []byte(blobSigningKey))
// NOTE(review): no reader.Close() is visible in this excerpt - confirm the
// reader returned by Get is closed once the data has been read.
290 reader, _, _, err := kcSrc.Get(getLocator)
292 log.Printf("Error getting block: %q %v", locator, err)
295 data, err := ioutil.ReadAll(reader)
297 log.Printf("Error reading block data: %q %v", locator, err)
301 log.Printf("Copying block: %q", locator)
302 _, rep, err := kcDst.PutB(data)
304 log.Printf("Error putting block data: %q %v", locator, err)
// Any replica count other than the requested one is treated as a failure.
307 if rep != replications {
308 log.Printf("Failed to put enough number of replicas. Wanted: %d; Put: %d", replications, rep)
309 return errors.New("Failed to put enough number of replicas")
// Progress is logged as the percentage of done over total.
313 log.Printf("%.2f%% done", float64(done)/float64(total)*100)
316 log.Printf("Successfully copied to destination %d blocks.", total)