Merge branch '8087-arv-cli-request-body-from-file' of https://github.com/wtsi-hgi...
[arvados.git] / services / keepstore / handlers.go
index 95af1b48707c6b189982dc18762cb517769bd117..043ab69b17c255aa463fe8259a777cec682453f5 100644 (file)
@@ -20,6 +20,7 @@ import (
        "regexp"
        "runtime"
        "strconv"
+       "strings"
        "sync"
        "time"
 )
@@ -53,6 +54,9 @@ func MakeRESTRouter() *mux.Router {
        // Replace the current trash queue.
        rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
 
+       // Untrash moves blocks from trash back into store
+       rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
+
        // Any request which does not match any of these routes gets
        // 400 Bad Request.
        rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
@@ -295,7 +299,7 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
                Failed  int `json:"copies_failed"`
        }
        for _, vol := range KeepVM.AllWritable() {
-               if err := vol.Delete(hash); err == nil {
+               if err := vol.Trash(hash); err == nil {
                        result.Deleted++
                } else if os.IsNotExist(err) {
                        continue
@@ -430,6 +434,53 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
        trashq.ReplaceQueue(tlist)
 }
 
+// UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
+//
+// It calls Untrash(hash) on every writable volume and reports the outcome:
+//
+//   - 401 (UnauthorizedError) if the request does not carry the data
+//     manager token.
+//   - 404 if there are no writable volumes, or if every writable volume
+//     reported the block as not existing.
+//   - 500 if the block was not untrashed on any volume and at least one
+//     volume returned an error other than "not exist".
+//   - 200 otherwise, with a plain-text body listing the volumes on which
+//     the block was untrashed and, if any, the volumes that failed.
+func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
+       // Reject unauthorized requests.
+       if !IsDataManagerToken(GetApiToken(req)) {
+               http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+               return
+       }
+
+       hash := mux.Vars(req)["hash"]
+
+       // Take a single snapshot of the writable volumes so that the
+       // counters below are compared against the same list that was
+       // iterated, rather than against a fresh AllWritable() call.
+       writable := KeepVM.AllWritable()
+       if len(writable) == 0 {
+               http.Error(resp, "No writable volumes", http.StatusNotFound)
+               return
+       }
+
+       var untrashedOn, failedOn []string
+       var numNotFound int
+       for _, vol := range writable {
+               err := vol.Untrash(hash)
+
+               switch {
+               case os.IsNotExist(err):
+                       numNotFound++
+               case err != nil:
+                       // Include the underlying error so the failure can be diagnosed.
+                       log.Printf("Error untrashing %v on volume %v: %v", hash, vol.String(), err)
+                       failedOn = append(failedOn, vol.String())
+               default:
+                       log.Printf("Untrashed %v on volume %v", hash, vol.String())
+                       untrashedOn = append(untrashedOn, vol.String())
+               }
+       }
+
+       switch {
+       case numNotFound == len(writable):
+               http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
+       case len(failedOn) == len(writable):
+               http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
+       case len(untrashedOn) == 0:
+               // Mixture of "not found" and failures with no success at
+               // all: this must not be reported as a successful untrash.
+               http.Error(resp, "Failed to untrash on: "+strings.Join(failedOn, ","), http.StatusInternalServerError)
+       default:
+               respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
+               if len(failedOn) > 0 {
+                       respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
+               }
+               if _, err := resp.Write([]byte(respBody)); err != nil {
+                       log.Printf("Error writing untrash response for %v: %v", hash, err)
+               }
+       }
+}
+
 // ==============================
 // GetBlock and PutBlock implement lower-level code for handling
 // blocks by rooting through volumes connected to the local machine.