6 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7 "git.curoverse.com/arvados.git/sdk/go/keepclient"
14 // keep-rsync arguments
16 srcConfig arvadosclient.APIConfig
17 dstConfig arvadosclient.APIConfig
18 srcKeepServicesJSON string
19 dstKeepServicesJSON string
25 var srcConfigFile string
26 var dstConfigFile string
32 "Source configuration filename with full path that contains "+
33 "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the source keep servers, "+
34 "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, and ARVADOS_BLOB_SIGNING_KEY.")
40 "Destination configuration filename with full path that contains "+
41 "an ARVADOS_API_TOKEN which is a valid datamanager token recognized by the destination keep servers, "+
42 "ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, and ARVADOS_BLOB_SIGNING_KEY.")
46 "src-keep-services-json",
48 "An optional list of available source keepservices. "+
49 "If not provided, this list is obtained from api server configured in src-config-file.")
53 "dst-keep-services-json",
55 "An optional list of available destination keepservices. "+
56 "If not provided, this list is obtained from api server configured in dst-config-file.")
62 "Number of replications to write to the destination.")
75 if srcConfigFile == "" {
76 log.Fatal("-src-config-file must be specified.")
78 srcConfig, err = readConfigFromFile(srcConfigFile)
80 log.Fatal("Error reading source configuration: %s", err.Error())
83 if dstConfigFile == "" {
84 log.Fatal("-dst-config-file must be specified.")
86 dstConfig, err = readConfigFromFile(dstConfigFile)
88 log.Fatal("Error reading destination configuration: %s", err.Error())
91 // Initialize keep-rsync
92 err = initializeKeepRsync()
94 log.Fatal("Error configurating keep-rsync: %s", err.Error())
97 // Copy blocks not found in dst from src
// matchTrue matches configuration values that spell a boolean "true":
// exactly "1", "yes" or "true", compared case-insensitively.
var matchTrue = regexp.MustCompile(`^(?i:1|yes|true)$`)
103 // Reads config from file
104 func readConfigFromFile(filename string) (arvadosclient.APIConfig, error) {
105 var config arvadosclient.APIConfig
107 content, err := ioutil.ReadFile(filename)
112 lines := strings.Split(string(content), "\n")
113 for _, line := range lines {
117 kv := strings.Split(line, "=")
120 case "ARVADOS_API_TOKEN":
121 config.APIToken = kv[1]
122 case "ARVADOS_API_HOST":
123 config.APIHost = kv[1]
124 case "ARVADOS_API_HOST_INSECURE":
125 config.APIHostInsecure = matchTrue.MatchString(kv[1])
126 case "ARVADOS_EXTERNAL_CLIENT":
127 config.ExternalClient = matchTrue.MatchString(kv[1])
133 // keep-rsync source and destination clients
135 arvSrc arvadosclient.ArvadosClient
136 arvDst arvadosclient.ArvadosClient
137 kcSrc *keepclient.KeepClient
138 kcDst *keepclient.KeepClient
141 // Initializes keep-rsync using the config provided
142 func initializeKeepRsync() (err error) {
143 // arvSrc from srcConfig
144 arvSrc, err = arvadosclient.New(srcConfig)
149 // arvDst from dstConfig
150 arvDst, err = arvadosclient.New(dstConfig)
155 // if srcKeepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
156 if srcKeepServicesJSON == "" {
157 kcSrc, err = keepclient.MakeKeepClient(&arvSrc)
162 kcSrc, err = keepclient.MakeKeepClientFromJSON(&arvSrc, srcKeepServicesJSON)
168 // if dstKeepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
169 if dstKeepServicesJSON == "" {
170 kcDst, err = keepclient.MakeKeepClient(&arvDst)
175 kcDst, err = keepclient.MakeKeepClientFromJSON(&arvDst, dstKeepServicesJSON)
180 kcDst.Want_replicas = replications
185 // Get unique block locators from src and dst
186 // Copy any blocks missing in dst
187 func performKeepRsync() error {
188 // Get unique locators from src
189 srcIndex, err := getUniqueLocators(kcSrc, prefix)
194 // Get unique locators from dst
195 dstIndex, err := getUniqueLocators(kcDst, prefix)
200 // Get list of locators found in src, but missing in dst
201 toBeCopied := getMissingLocators(srcIndex, dstIndex)
203 // Copy each missing block to dst
204 copyBlocksToDst(toBeCopied)
209 // Get list of unique locators from the specified cluster
210 func getUniqueLocators(kc *keepclient.KeepClient, prefix string) (map[string]bool, error) {
211 var indexBytes []byte
213 for uuid := range kc.LocalRoots() {
214 reader, err := kc.GetIndex(uuid, prefix)
220 readBytes, err = ioutil.ReadAll(reader)
225 indexBytes = append(indexBytes, readBytes...)
228 // Got index; Now dedup it
229 locators := bytes.Split(indexBytes, []byte("\n"))
231 uniqueLocators := map[string]bool{}
232 for _, loc := range locators {
237 locator := string(bytes.Split(loc, []byte(" "))[0])
238 if _, ok := uniqueLocators[locator]; !ok {
239 uniqueLocators[locator] = true
242 return uniqueLocators, nil
// getMissingLocators returns the keys of srcLocators that are not keys of
// dstLocators. Only key presence is considered, not the stored boolean. The
// order of the result follows map iteration and is therefore unspecified;
// the result is nil when nothing is missing.
func getMissingLocators(srcLocators map[string]bool, dstLocators map[string]bool) []string {
	var absent []string
	for loc := range srcLocators {
		if _, present := dstLocators[loc]; present {
			continue
		}
		absent = append(absent, loc)
	}
	return absent
}
256 // Copy blocks from src to dst; only those that are missing in dst are copied
257 func copyBlocksToDst(toBeCopied []string) {
259 total := len(toBeCopied)
262 for _, locator := range toBeCopied {
263 log.Printf("Getting block %d of %d", done+1, total)
265 log.Printf("Getting block: %v", locator)
267 reader, _, _, err := kcSrc.Get(locator)
269 log.Printf("Error getting block: %q %v", locator, err)
270 failed = append(failed, locator)
273 data, err := ioutil.ReadAll(reader)
275 log.Printf("Error reading block data: %q %v", locator, err)
276 failed = append(failed, locator)
280 log.Printf("Copying block: %q", locator)
281 _, rep, err := kcDst.PutB(data)
283 log.Printf("Error putting block data: %q %v", locator, err)
284 failed = append(failed, locator)
287 if rep != replications {
288 log.Printf("Failed to put enough number of replicas. Wanted: %d; Put: %d", replications, rep)
289 failed = append(failed, locator)
294 log.Printf("%.2f%% done", float64(done)/float64(total)*100)
297 log.Printf("Successfully copied to destination %d and failed %d out of a total of %d", done, len(failed), total)
298 log.Printf("Failed blocks %v", failed)