Merge branch '8087-arv-cli-request-body-from-file' of https://github.com/wtsi-hgi...
[arvados.git] / services / keepstore / handlers.go
index 2b96dbc582f8b584e731401f3e467dae635ea837..043ab69b17c255aa463fe8259a777cec682453f5 100644 (file)
@@ -20,6 +20,7 @@ import (
        "regexp"
        "runtime"
        "strconv"
+       "strings"
        "sync"
        "time"
 )
@@ -53,6 +54,9 @@ func MakeRESTRouter() *mux.Router {
        // Replace the current trash queue.
        rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
 
+       // Untrash moves blocks from trash back into store
+       rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
+
        // Any request which does not match any of these routes gets
        // 400 Bad Request.
        rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
@@ -120,7 +124,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       err = PutBlock(buf, hash)
+       replication, err := PutBlock(buf, hash)
        bufs.Put(buf)
 
        if err != nil {
@@ -137,6 +141,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
                expiry := time.Now().Add(blobSignatureTTL)
                returnHash = SignLocator(returnHash, apiToken, expiry)
        }
+       resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
        resp.Write([]byte(returnHash + "\n"))
 }
 
@@ -294,7 +299,7 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
                Failed  int `json:"copies_failed"`
        }
        for _, vol := range KeepVM.AllWritable() {
-               if err := vol.Delete(hash); err == nil {
+               if err := vol.Trash(hash); err == nil {
                        result.Deleted++
                } else if os.IsNotExist(err) {
                        continue
@@ -429,6 +434,53 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
        trashq.ReplaceQueue(tlist)
 }
 
+// UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
+func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
+       // Reject unauthorized requests.
+       if !IsDataManagerToken(GetApiToken(req)) {
+               http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+               return
+       }
+
+       hash := mux.Vars(req)["hash"]
+
+       if len(KeepVM.AllWritable()) == 0 {
+               http.Error(resp, "No writable volumes", http.StatusNotFound)
+               return
+       }
+
+       var untrashedOn, failedOn []string
+       var numNotFound int
+       for _, vol := range KeepVM.AllWritable() {
+               err := vol.Untrash(hash)
+
+               if os.IsNotExist(err) {
+                       numNotFound++
+               } else if err != nil {
+                       log.Printf("Error untrashing %v on volume %v", hash, vol.String())
+                       failedOn = append(failedOn, vol.String())
+               } else {
+                       log.Printf("Untrashed %v on volume %v", hash, vol.String())
+                       untrashedOn = append(untrashedOn, vol.String())
+               }
+       }
+
+       if numNotFound == len(KeepVM.AllWritable()) {
+               http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
+               return
+       }
+
+       if len(failedOn) == len(KeepVM.AllWritable()) {
+               http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
+       } else {
+               respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
+               if len(failedOn) > 0 {
+                       respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
+               }
+               resp.Write([]byte(respBody))
+       }
+}
+
 // ==============================
 // GetBlock and PutBlock implement lower-level code for handling
 // blocks by rooting through volumes connected to the local machine.
@@ -517,40 +569,40 @@ func GetBlock(hash string) ([]byte, error) {
 //          all writes failed). The text of the error message should
 //          provide as much detail as possible.
 //
-func PutBlock(block []byte, hash string) error {
+func PutBlock(block []byte, hash string) (int, error) {
        // Check that BLOCK's checksum matches HASH.
        blockhash := fmt.Sprintf("%x", md5.Sum(block))
        if blockhash != hash {
                log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
-               return RequestHashError
+               return 0, RequestHashError
        }
 
        // If we already have this data, it's intact on disk, and we
        // can update its timestamp, return success. If we have
        // different data with the same hash, return failure.
-       if err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
-               return err
+       if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
+               return n, err
        }
 
        // Choose a Keep volume to write to.
        // If this volume fails, try all of the volumes in order.
        if vol := KeepVM.NextWritable(); vol != nil {
                if err := vol.Put(hash, block); err == nil {
-                       return nil // success!
+                       return vol.Replication(), nil // success!
                }
        }
 
        writables := KeepVM.AllWritable()
        if len(writables) == 0 {
                log.Print("No writable volumes.")
-               return FullError
+               return 0, FullError
        }
 
        allFull := true
        for _, vol := range writables {
                err := vol.Put(hash, block)
                if err == nil {
-                       return nil // success!
+                       return vol.Replication(), nil // success!
                }
                if err != FullError {
                        // The volume is not full but the
@@ -563,17 +615,18 @@ func PutBlock(block []byte, hash string) error {
 
        if allFull {
                log.Print("All volumes are full.")
-               return FullError
+               return 0, FullError
        }
        // Already logged the non-full errors.
-       return GenericError
+       return 0, GenericError
 }
 
-// CompareAndTouch returns nil if one of the volumes already has the
-// given content and it successfully updates the relevant block's
-// modification time in order to protect it from premature garbage
-// collection.
-func CompareAndTouch(hash string, buf []byte) error {
+// CompareAndTouch returns the current replication level if one of the
+// volumes already has the given content and it successfully updates
+// the relevant block's modification time in order to protect it from
+// premature garbage collection. Otherwise, it returns a non-nil
+// error.
+func CompareAndTouch(hash string, buf []byte) (int, error) {
        var bestErr error = NotFoundError
        for _, vol := range KeepVM.AllWritable() {
                if err := vol.Compare(hash, buf); err == CollisionError {
@@ -583,7 +636,7 @@ func CompareAndTouch(hash string, buf []byte) error {
                        // both, so there's no point writing it even
                        // on a different volume.)
                        log.Printf("%s: Compare(%s): %s", vol, hash, err)
-                       return err
+                       return 0, err
                } else if os.IsNotExist(err) {
                        // Block does not exist. This is the only
                        // "normal" error: we don't log anything.
@@ -601,9 +654,9 @@ func CompareAndTouch(hash string, buf []byte) error {
                        continue
                }
                // Compare and Touch both worked --> done.
-               return nil
+               return vol.Replication(), nil
        }
-       return bestErr
+       return 0, bestErr
 }
 
 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)