X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/558574e31f56db08c82ca7c2b955e74df04242dd..56edfaae396bd3e2c69d19425d887abe7e3bc0d5:/services/keepstore/handlers.go diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go index 2b96dbc582..95af1b4870 100644 --- a/services/keepstore/handlers.go +++ b/services/keepstore/handlers.go @@ -120,7 +120,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) { return } - err = PutBlock(buf, hash) + replication, err := PutBlock(buf, hash) bufs.Put(buf) if err != nil { @@ -137,6 +137,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) { expiry := time.Now().Add(blobSignatureTTL) returnHash = SignLocator(returnHash, apiToken, expiry) } + resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication)) resp.Write([]byte(returnHash + "\n")) } @@ -517,40 +518,40 @@ func GetBlock(hash string) ([]byte, error) { // all writes failed). The text of the error message should // provide as much detail as possible. // -func PutBlock(block []byte, hash string) error { +func PutBlock(block []byte, hash string) (int, error) { // Check that BLOCK's checksum matches HASH. blockhash := fmt.Sprintf("%x", md5.Sum(block)) if blockhash != hash { log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash) - return RequestHashError + return 0, RequestHashError } // If we already have this data, it's intact on disk, and we // can update its timestamp, return success. If we have // different data with the same hash, return failure. - if err := CompareAndTouch(hash, block); err == nil || err == CollisionError { - return err + if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError { + return n, err } // Choose a Keep volume to write to. // If this volume fails, try all of the volumes in order. if vol := KeepVM.NextWritable(); vol != nil { if err := vol.Put(hash, block); err == nil { - return nil // success! + return vol.Replication(), nil // success! } } writables := KeepVM.AllWritable() if len(writables) == 0 { log.Print("No writable volumes.") - return FullError + return 0, FullError } allFull := true for _, vol := range writables { err := vol.Put(hash, block) if err == nil { - return nil // success! + return vol.Replication(), nil // success! } if err != FullError { // The volume is not full but the @@ -563,17 +564,18 @@ func PutBlock(block []byte, hash string) error { if allFull { log.Print("All volumes are full.") - return FullError + return 0, FullError } // Already logged the non-full errors. - return GenericError + return 0, GenericError } -// CompareAndTouch returns nil if one of the volumes already has the -// given content and it successfully updates the relevant block's -// modification time in order to protect it from premature garbage -// collection. -func CompareAndTouch(hash string, buf []byte) error { +// CompareAndTouch returns the current replication level if one of the +// volumes already has the given content and it successfully updates +// the relevant block's modification time in order to protect it from +// premature garbage collection. Otherwise, it returns a non-nil +// error. +func CompareAndTouch(hash string, buf []byte) (int, error) { var bestErr error = NotFoundError for _, vol := range KeepVM.AllWritable() { if err := vol.Compare(hash, buf); err == CollisionError { @@ -583,7 +585,7 @@ func CompareAndTouch(hash string, buf []byte) error { // both, so there's no point writing it even // on a different volume.) log.Printf("%s: Compare(%s): %s", vol, hash, err) - return err + return 0, err } else if os.IsNotExist(err) { // Block does not exist. This is the only // "normal" error: we don't log anything. @@ -601,9 +603,9 @@ func CompareAndTouch(hash string, buf []byte) error { continue } // Compare and Touch both worked --> done. - return nil + return vol.Replication(), nil } - return bestErr + return 0, bestErr } var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)