	"fmt"
	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
	"git.curoverse.com/arvados.git/sdk/go/keepclient"
15 // keep-rsync arguments
17 srcConfig arvadosclient.APIConfig
18 dstConfig arvadosclient.APIConfig
20 srcKeepServicesJSON string
21 dstKeepServicesJSON string
27 var srcConfigFile string
28 var dstConfigFile string
34 "Source configuration filename with full path that contains "+
35 "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the source keep servers, "+
36 "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, ARVADOS_EXTERNAL_CLIENT and ARVADOS_BLOB_SIGNING_KEY.")
42 "Destination configuration filename with full path that contains "+
43 "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the destination keep servers, "+
44 "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, ARVADOS_EXTERNAL_CLIENT and ARVADOS_BLOB_SIGNING_KEY.")
48 "src-keep-services-json",
50 "An optional list of available source keepservices. "+
51 "If not provided, this list is obtained from api server configured in src-config-file.")
55 "dst-keep-services-json",
57 "An optional list of available destination keepservices. "+
58 "If not provided, this list is obtained from api server configured in dst-config-file.")
64 "Number of replications to write to the destination.")
77 if srcConfigFile == "" {
78 log.Fatal("-src-config-file must be specified.")
80 srcConfig, err = readConfigFromFile(srcConfigFile)
82 log.Fatal("Error reading source configuration: %s", err.Error())
85 if dstConfigFile == "" {
86 log.Fatal("-dst-config-file must be specified.")
88 dstConfig, err = readConfigFromFile(dstConfigFile)
90 log.Fatal("Error reading destination configuration: %s", err.Error())
93 // Initialize keep-rsync
94 err = initializeKeepRsync()
96 log.Fatal("Error configuring keep-rsync: %s", err.Error())
99 // Copy blocks not found in dst from src
100 err = performKeepRsync()
102 log.Fatal("Error while syncing data: %s", err.Error())
// matchTrue matches configuration values that mean "true": the whole value
// must be 1, yes or true, compared case-insensitively.
var matchTrue = regexp.MustCompile(`^(?i:1|yes|true)$`)
108 // Reads config from file
109 func readConfigFromFile(filename string) (arvadosclient.APIConfig, error) {
110 var config arvadosclient.APIConfig
112 content, err := ioutil.ReadFile(filename)
117 lines := strings.Split(string(content), "\n")
118 for _, line := range lines {
122 kv := strings.Split(line, "=")
125 case "ARVADOS_API_TOKEN":
126 config.APIToken = kv[1]
127 case "ARVADOS_API_HOST":
128 config.APIHost = kv[1]
129 case "ARVADOS_API_HOST_INSECURE":
130 config.APIHostInsecure = matchTrue.MatchString(kv[1])
131 case "ARVADOS_EXTERNAL_CLIENT":
132 config.ExternalClient = matchTrue.MatchString(kv[1])
133 case "ARVADOS_BLOB_SIGNING_KEY":
134 blobSigningKey = kv[1]
140 // keep-rsync source and destination clients
142 arvSrc arvadosclient.ArvadosClient
143 arvDst arvadosclient.ArvadosClient
144 kcSrc *keepclient.KeepClient
145 kcDst *keepclient.KeepClient
148 // Initializes keep-rsync using the config provided
149 func initializeKeepRsync() (err error) {
150 // arvSrc from srcConfig
151 arvSrc, err = arvadosclient.New(srcConfig)
156 // arvDst from dstConfig
157 arvDst, err = arvadosclient.New(dstConfig)
162 // Get default replications value from destination, if it is not already provided
163 if replications == 0 {
164 value, err := arvDst.Discovery("defaultCollectionReplication")
166 replications = int(value.(float64))
172 // if srcKeepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
173 if srcKeepServicesJSON == "" {
174 kcSrc, err = keepclient.MakeKeepClient(&arvSrc)
179 kcSrc, err = keepclient.MakeKeepClientFromJSON(&arvSrc, srcKeepServicesJSON)
185 // if dstKeepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
186 if dstKeepServicesJSON == "" {
187 kcDst, err = keepclient.MakeKeepClient(&arvDst)
192 kcDst, err = keepclient.MakeKeepClientFromJSON(&arvDst, dstKeepServicesJSON)
197 kcDst.Want_replicas = replications
202 // Get unique block locators from src and dst
203 // Copy any blocks missing in dst
204 func performKeepRsync() error {
205 // Get unique locators from src
206 srcIndex, err := getUniqueLocators(kcSrc, prefix)
211 // Get unique locators from dst
212 dstIndex, err := getUniqueLocators(kcDst, prefix)
217 // Get list of locators found in src, but missing in dst
218 toBeCopied := getMissingLocators(srcIndex, dstIndex)
220 // Copy each missing block to dst
221 err = copyBlocksToDst(toBeCopied)
226 // Get list of unique locators from the specified cluster
227 func getUniqueLocators(kc *keepclient.KeepClient, indexPrefix string) (map[string]bool, error) {
228 var indexBytes []byte
230 for uuid := range kc.LocalRoots() {
231 reader, err := kc.GetIndex(uuid, indexPrefix)
237 readBytes, err = ioutil.ReadAll(reader)
242 indexBytes = append(indexBytes, readBytes...)
245 // Got index; Now dedup it
246 locators := bytes.Split(indexBytes, []byte("\n"))
248 uniqueLocators := map[string]bool{}
249 for _, loc := range locators {
254 locator := string(bytes.Split(loc, []byte(" "))[0])
255 if _, ok := uniqueLocators[locator]; !ok {
256 uniqueLocators[locator] = true
259 return uniqueLocators, nil
// getMissingLocators lists the keys of srcLocators that are not keys of
// dstLocators. A key present in dstLocators counts as present whatever its
// value. The result is in map iteration order (unspecified) and is nil when
// nothing is missing.
func getMissingLocators(srcLocators map[string]bool, dstLocators map[string]bool) []string {
	var missing []string
	for loc := range srcLocators {
		if _, present := dstLocators[loc]; present {
			continue
		}
		missing = append(missing, loc)
	}
	return missing
}
273 // Copy blocks from src to dst; only those that are missing in dst are copied
274 func copyBlocksToDst(toBeCopied []string) error {
276 total := len(toBeCopied)
278 for _, locator := range toBeCopied {
279 log.Printf("Getting block %d of %d", done+1, total)
281 log.Printf("Getting block: %v", locator)
283 getLocator := locator
284 expiresAt := time.Now().AddDate(0, 0, 1)
285 if blobSigningKey != "" {
286 getLocator = keepclient.SignLocator(getLocator, arvSrc.ApiToken, expiresAt, []byte(blobSigningKey))
289 reader, _, _, err := kcSrc.Get(getLocator)
291 log.Printf("Error getting block: %q %v", locator, err)
294 data, err := ioutil.ReadAll(reader)
296 log.Printf("Error reading block data: %q %v", locator, err)
300 log.Printf("Copying block: %q", locator)
301 _, _, err = kcDst.PutB(data)
303 log.Printf("Error putting block data: %q %v", locator, err)
308 log.Printf("%.2f%% done", float64(done)/float64(total)*100)
311 log.Printf("Successfully copied to destination %d blocks.", total)