7159: Work around CreateBlob race by polling for updates when a brand new blob is...
[arvados.git] / services / keepstore / handlers.go
index 2b96dbc582f8b584e731401f3e467dae635ea837..95af1b48707c6b189982dc18762cb517769bd117 100644 (file)
@@ -120,7 +120,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       err = PutBlock(buf, hash)
+       replication, err := PutBlock(buf, hash)
        bufs.Put(buf)
 
        if err != nil {
@@ -137,6 +137,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
                expiry := time.Now().Add(blobSignatureTTL)
                returnHash = SignLocator(returnHash, apiToken, expiry)
        }
+       resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
        resp.Write([]byte(returnHash + "\n"))
 }
 
@@ -517,40 +518,40 @@ func GetBlock(hash string) ([]byte, error) {
 //          all writes failed). The text of the error message should
 //          provide as much detail as possible.
 //
-func PutBlock(block []byte, hash string) error {
+func PutBlock(block []byte, hash string) (int, error) {
        // Check that BLOCK's checksum matches HASH.
        blockhash := fmt.Sprintf("%x", md5.Sum(block))
        if blockhash != hash {
                log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
-               return RequestHashError
+               return 0, RequestHashError
        }
 
        // If we already have this data, it's intact on disk, and we
        // can update its timestamp, return success. If we have
        // different data with the same hash, return failure.
-       if err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
-               return err
+       if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
+               return n, err
        }
 
        // Choose a Keep volume to write to.
        // If this volume fails, try all of the volumes in order.
        if vol := KeepVM.NextWritable(); vol != nil {
                if err := vol.Put(hash, block); err == nil {
-                       return nil // success!
+                       return vol.Replication(), nil // success!
                }
        }
 
        writables := KeepVM.AllWritable()
        if len(writables) == 0 {
                log.Print("No writable volumes.")
-               return FullError
+               return 0, FullError
        }
 
        allFull := true
        for _, vol := range writables {
                err := vol.Put(hash, block)
                if err == nil {
-                       return nil // success!
+                       return vol.Replication(), nil // success!
                }
                if err != FullError {
                        // The volume is not full but the
@@ -563,17 +564,18 @@ func PutBlock(block []byte, hash string) error {
 
        if allFull {
                log.Print("All volumes are full.")
-               return FullError
+               return 0, FullError
        }
        // Already logged the non-full errors.
-       return GenericError
+       return 0, GenericError
 }
 
-// CompareAndTouch returns nil if one of the volumes already has the
-// given content and it successfully updates the relevant block's
-// modification time in order to protect it from premature garbage
-// collection.
-func CompareAndTouch(hash string, buf []byte) error {
+// CompareAndTouch returns the current replication level if one of the
+// volumes already has the given content and it successfully updates
+// the relevant block's modification time in order to protect it from
+// premature garbage collection. Otherwise, it returns a non-nil
+// error.
+func CompareAndTouch(hash string, buf []byte) (int, error) {
        var bestErr error = NotFoundError
        for _, vol := range KeepVM.AllWritable() {
                if err := vol.Compare(hash, buf); err == CollisionError {
@@ -583,7 +585,7 @@ func CompareAndTouch(hash string, buf []byte) error {
                        // both, so there's no point writing it even
                        // on a different volume.)
                        log.Printf("%s: Compare(%s): %s", vol, hash, err)
-                       return err
+                       return 0, err
                } else if os.IsNotExist(err) {
                        // Block does not exist. This is the only
                        // "normal" error: we don't log anything.
@@ -601,9 +603,9 @@ func CompareAndTouch(hash string, buf []byte) error {
                        continue
                }
                // Compare and Touch both worked --> done.
-               return nil
+               return vol.Replication(), nil
        }
-       return bestErr
+       return 0, bestErr
 }
 
 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)