"regexp"
"runtime"
"strconv"
+ "strings"
"sync"
"time"
)
// Replace the current trash queue.
rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
+ // Untrash moves blocks from trash back into store
+ rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
+
// Any request which does not match any of these routes gets
// 400 Bad Request.
rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
Failed int `json:"copies_failed"`
}
for _, vol := range KeepVM.AllWritable() {
- if err := vol.Delete(hash); err == nil {
+ if err := vol.Trash(hash); err == nil {
result.Deleted++
} else if os.IsNotExist(err) {
continue
trashq.ReplaceQueue(tlist)
}
+// UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
+func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
+ // Reject unauthorized requests.
+ if !IsDataManagerToken(GetApiToken(req)) {
+ http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+ return
+ }
+
+ hash := mux.Vars(req)["hash"]
+
+ if len(KeepVM.AllWritable()) == 0 {
+ http.Error(resp, "No writable volumes", http.StatusNotFound)
+ return
+ }
+
+ var untrashedOn, failedOn []string
+ var numNotFound int
+ for _, vol := range KeepVM.AllWritable() {
+ err := vol.Untrash(hash)
+
+ if os.IsNotExist(err) {
+ numNotFound++
+ } else if err != nil {
+ log.Printf("Error untrashing %v on volume %v", hash, vol.String())
+ failedOn = append(failedOn, vol.String())
+ } else {
+ log.Printf("Untrashed %v on volume %v", hash, vol.String())
+ untrashedOn = append(untrashedOn, vol.String())
+ }
+ }
+
+ if numNotFound == len(KeepVM.AllWritable()) {
+ http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
+ return
+ }
+
+ if len(failedOn) == len(KeepVM.AllWritable()) {
+ http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
+ } else {
+ respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
+ if len(failedOn) > 0 {
+ respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
+ }
+ resp.Write([]byte(respBody))
+ }
+}
+
// ==============================
// GetBlock and PutBlock implement lower-level code for handling
// blocks by rooting through volumes connected to the local machine.
// If we already have this data, it's intact on disk, and we
// can update its timestamp, return success. If we have
// different data with the same hash, return failure.
- if err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
- return 0, err
+ if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
+ return n, err
}
// Choose a Keep volume to write to.
return 0, GenericError
}
-// CompareAndTouch returns nil if one of the volumes already has the
-// given content and it successfully updates the relevant block's
-// modification time in order to protect it from premature garbage
-// collection.
-func CompareAndTouch(hash string, buf []byte) error {
+// CompareAndTouch returns the current replication level if one of the
+// volumes already has the given content and it successfully updates
+// the relevant block's modification time in order to protect it from
+// premature garbage collection. Otherwise, it returns a non-nil
+// error.
+func CompareAndTouch(hash string, buf []byte) (int, error) {
var bestErr error = NotFoundError
for _, vol := range KeepVM.AllWritable() {
if err := vol.Compare(hash, buf); err == CollisionError {
// both, so there's no point writing it even
// on a different volume.)
log.Printf("%s: Compare(%s): %s", vol, hash, err)
- return err
+ return 0, err
} else if os.IsNotExist(err) {
// Block does not exist. This is the only
// "normal" error: we don't log anything.
continue
}
// Compare and Touch both worked --> done.
- return nil
+ return vol.Replication(), nil
}
- return bestErr
+ return 0, bestErr
}
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)