7 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8 "git.curoverse.com/arvados.git/sdk/go/keepclient"
16 // keep-rsync arguments
18 srcConfig arvadosclient.APIConfig
19 dstConfig arvadosclient.APIConfig
21 srcKeepServicesJSON string
22 dstKeepServicesJSON string
27 var srcConfigFile string
28 var dstConfigFile string
35 "Source configuration filename with full path that contains "+
36 "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the source keep servers, "+
37 "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, ARVADOS_EXTERNAL_CLIENT and ARVADOS_BLOB_SIGNING_KEY.")
43 "Destination configuration filename with full path that contains "+
44 "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the destination keep servers, "+
45 "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, ARVADOS_EXTERNAL_CLIENT and ARVADOS_BLOB_SIGNING_KEY.")
49 "src-keep-services-json",
51 "An optional list of available source keepservices. "+
52 "If not provided, this list is obtained from api server configured in src-config-file.")
56 "dst-keep-services-json",
58 "An optional list of available destination keepservices. "+
59 "If not provided, this list is obtained from api server configured in dst-config-file.")
65 "Number of replications to write to the destination.")
79 log.Fatal("Error loading configuration from files: %s", err.Error())
82 // Initialize keep-rsync
83 err = initializeKeepRsync()
85 log.Fatal("Error configuring keep-rsync: %s", err.Error())
88 // Copy blocks not found in dst from src
89 err = performKeepRsync()
91 log.Fatal("Error while syncing data: %s", err.Error())
95 // Load src and dst config from given files
96 func loadConfig() error {
97 if srcConfigFile == "" {
98 return errors.New("-src-config-file must be specified")
103 srcConfig, err = readConfigFromFile(srcConfigFile)
105 log.Printf("Error reading source configuration: %s", err.Error())
109 if dstConfigFile == "" {
110 return errors.New("-dst-config-file must be specified")
112 dstConfig, err = readConfigFromFile(dstConfigFile)
114 log.Printf("Error reading destination configuration: %s", err.Error())
120 var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
122 // Reads config from file
123 func readConfigFromFile(filename string) (arvadosclient.APIConfig, error) {
124 var config arvadosclient.APIConfig
126 content, err := ioutil.ReadFile(filename)
131 lines := strings.Split(string(content), "\n")
132 for _, line := range lines {
136 kv := strings.Split(line, "=")
139 case "ARVADOS_API_TOKEN":
140 config.APIToken = kv[1]
141 case "ARVADOS_API_HOST":
142 config.APIHost = kv[1]
143 case "ARVADOS_API_HOST_INSECURE":
144 config.APIHostInsecure = matchTrue.MatchString(kv[1])
145 case "ARVADOS_EXTERNAL_CLIENT":
146 config.ExternalClient = matchTrue.MatchString(kv[1])
147 case "ARVADOS_BLOB_SIGNING_KEY":
148 blobSigningKey = kv[1]
154 // keep-rsync source and destination clients
156 arvSrc arvadosclient.ArvadosClient
157 arvDst arvadosclient.ArvadosClient
158 kcSrc *keepclient.KeepClient
159 kcDst *keepclient.KeepClient
162 // Initializes keep-rsync using the config provided
163 func initializeKeepRsync() (err error) {
164 // arvSrc from srcConfig
165 arvSrc, err = arvadosclient.New(srcConfig)
170 // arvDst from dstConfig
171 arvDst, err = arvadosclient.New(dstConfig)
176 // Get default replications value from destination, if it is not already provided
177 if replications == 0 {
178 value, err := arvDst.Discovery("defaultCollectionReplication")
180 replications = int(value.(float64))
186 // if srcKeepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
187 if srcKeepServicesJSON == "" {
188 kcSrc, err = keepclient.MakeKeepClient(&arvSrc)
193 kcSrc, err = keepclient.MakeKeepClientFromJSON(&arvSrc, srcKeepServicesJSON)
199 // if dstKeepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
200 if dstKeepServicesJSON == "" {
201 kcDst, err = keepclient.MakeKeepClient(&arvDst)
206 kcDst, err = keepclient.MakeKeepClientFromJSON(&arvDst, dstKeepServicesJSON)
211 kcDst.Want_replicas = replications
216 // Get unique block locators from src and dst
217 // Copy any blocks missing in dst
218 func performKeepRsync() error {
219 // Get unique locators from src
220 srcIndex, err := getUniqueLocators(kcSrc, prefix)
225 // Get unique locators from dst
226 dstIndex, err := getUniqueLocators(kcDst, prefix)
231 // Get list of locators found in src, but missing in dst
232 toBeCopied := getMissingLocators(srcIndex, dstIndex)
234 // Copy each missing block to dst
235 err = copyBlocksToDst(toBeCopied)
240 // Get list of unique locators from the specified cluster
241 func getUniqueLocators(kc *keepclient.KeepClient, indexPrefix string) (map[string]bool, error) {
242 var indexBytes []byte
244 for uuid := range kc.LocalRoots() {
245 reader, err := kc.GetIndex(uuid, indexPrefix)
251 readBytes, err = ioutil.ReadAll(reader)
256 indexBytes = append(indexBytes, readBytes...)
259 // Got index; Now dedup it
260 locators := bytes.Split(indexBytes, []byte("\n"))
262 uniqueLocators := map[string]bool{}
263 for _, loc := range locators {
268 locator := string(bytes.Split(loc, []byte(" "))[0])
269 if _, ok := uniqueLocators[locator]; !ok {
270 uniqueLocators[locator] = true
273 return uniqueLocators, nil
// getMissingLocators returns every locator that is a key of srcLocators but
// not a key of dstLocators. A key counts as present in dstLocators whatever
// its value. The order follows map iteration and is unspecified; the result
// is nil when nothing is missing.
func getMissingLocators(srcLocators map[string]bool, dstLocators map[string]bool) []string {
	var missing []string
	for loc := range srcLocators {
		if _, found := dstLocators[loc]; found {
			continue
		}
		missing = append(missing, loc)
	}
	return missing
}
287 // Copy blocks from src to dst; only those that are missing in dst are copied
288 func copyBlocksToDst(toBeCopied []string) error {
290 total := len(toBeCopied)
292 for _, locator := range toBeCopied {
293 log.Printf("Getting block %d of %d", done+1, total)
295 log.Printf("Getting block: %v", locator)
297 getLocator := locator
298 expiresAt := time.Now().AddDate(0, 0, 1)
299 if blobSigningKey != "" {
300 getLocator = keepclient.SignLocator(getLocator, arvSrc.ApiToken, expiresAt, []byte(blobSigningKey))
303 reader, _, _, err := kcSrc.Get(getLocator)
305 log.Printf("Error getting block: %q %v", locator, err)
308 data, err := ioutil.ReadAll(reader)
310 log.Printf("Error reading block data: %q %v", locator, err)
314 log.Printf("Copying block: %q", locator)
315 _, _, err = kcDst.PutB(data)
317 log.Printf("Error putting block data: %q %v", locator, err)
322 log.Printf("%.2f%% done", float64(done)/float64(total)*100)
325 log.Printf("Successfully copied to destination %d blocks.", total)