}
}
-// Delete a Keep block.
-func (v *AzureBlobVolume) Delete(loc string) error {
+// Trash a Keep block.
+func (v *AzureBlobVolume) Trash(loc string) error {
if v.readonly {
return MethodDisabledError
}
})
}
+// Untrash a Keep block.
+//
+// Not implemented yet for Azure blob volumes: it always returns
+// ErrNotImplemented, so that callers do not mistake the missing
+// implementation for a successful untrash.
+func (v *AzureBlobVolume) Untrash(loc string) error {
+	return ErrNotImplemented
+}
+
// Status returns a VolumeStatus struct with placeholder data.
func (v *AzureBlobVolume) Status() *VolumeStatus {
return &VolumeStatus{
t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
}
}
+
+// TestUndeleteHandler exercises "PUT /undelete/{hash}" with five
+// requests:
+//
+//   - no API token                         => UnauthorizedError
+//   - a known token that is not the data
+//     manager token                        => UnauthorizedError
+//   - data manager token, malformed hash   => 400 Bad Request
+//   - data manager token, GET method       => 400 Bad Request
+//   - data manager token, valid PUT        => 200 OK, body names the volume
+func TestUndeleteHandler(t *testing.T) {
+	defer teardown()
+
+	// Set up Keep volumes
+	KeepVM = MakeTestVolumeManager(2)
+	defer KeepVM.Close()
+	vols := KeepVM.AllWritable()
+	vols[0].Put(TestHash, TestBlock)
+
+	// Only the datamanager user should be allowed to undelete blocks
+	dataManagerToken = "DATA MANAGER TOKEN"
+
+	// unauthenticatedReq => UnauthorizedError
+	unauthenticatedReq := &RequestTester{
+		method: "PUT",
+		uri:    "/undelete/" + TestHash,
+	}
+	response := IssueRequest(unauthenticatedReq)
+	ExpectStatusCode(t,
+		"permissions on, unauthenticated request",
+		UnauthorizedError.HTTPCode,
+		response)
+
+	// notDataManagerReq => UnauthorizedError
+	notDataManagerReq := &RequestTester{
+		method:   "PUT",
+		uri:      "/undelete/" + TestHash,
+		apiToken: knownToken,
+	}
+	response = IssueRequest(notDataManagerReq)
+	ExpectStatusCode(t,
+		"permissions on, authenticated request, non-datamanager token",
+		UnauthorizedError.HTTPCode,
+		response)
+
+	// datamanagerWithBadHashReq => StatusBadRequest
+	datamanagerWithBadHashReq := &RequestTester{
+		method:   "PUT",
+		uri:      "/undelete/thisisnotalocator",
+		apiToken: dataManagerToken,
+	}
+	response = IssueRequest(datamanagerWithBadHashReq)
+	ExpectStatusCode(t,
+		"permissions on, datamanager token, invalid locator",
+		http.StatusBadRequest,
+		response)
+
+	// datamanagerWrongMethodReq => StatusBadRequest
+	datamanagerWrongMethodReq := &RequestTester{
+		method:   "GET",
+		uri:      "/undelete/" + TestHash,
+		apiToken: dataManagerToken,
+	}
+	response = IssueRequest(datamanagerWrongMethodReq)
+	ExpectStatusCode(t,
+		"permissions on, datamanager token, wrong method (GET)",
+		http.StatusBadRequest,
+		response)
+
+	// datamanagerReq => StatusOK
+	datamanagerReq := &RequestTester{
+		method:   "PUT",
+		uri:      "/undelete/" + TestHash,
+		apiToken: dataManagerToken,
+	}
+	response = IssueRequest(datamanagerReq)
+	ExpectStatusCode(t,
+		"permissions on, datamanager token, valid request",
+		http.StatusOK,
+		response)
+
+	// The body should name the volume the block was untrashed on.
+	expected := `Untrashed on volume`
+	match, err := regexp.MatchString(expected, response.Body.String())
+	if err != nil {
+		t.Fatalf("bad pattern %q: %v", expected, err)
+	}
+	if !match {
+		t.Errorf(
+			"Undelete response mismatched: expected %s, got:\n%s",
+			expected, response.Body.String())
+	}
+}
// Replace the current trash queue.
rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
+ // Undelete moves blocks from trash back into store
+ rest.HandleFunc(`/undelete/{hash:[0-9a-f]{32}}`, UndeleteHandler).Methods("PUT")
+
// Any request which does not match any of these routes gets
// 400 Bad Request.
rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
Failed int `json:"copies_failed"`
}
for _, vol := range KeepVM.AllWritable() {
- if err := vol.Delete(hash); err == nil {
+ if err := vol.Trash(hash); err == nil {
result.Deleted++
} else if os.IsNotExist(err) {
continue
trashq.ReplaceQueue(tlist)
}
+// UndeleteHandler processes "PUT /undelete/{hash:[0-9a-f]{32}}" requests for the data manager.
+//
+// Requests that do not carry the data manager token are rejected with
+// UnauthorizedError. Otherwise each writable volume is asked, in turn,
+// to untrash the block. The first volume that succeeds ends the search
+// and the response is 200 OK with a body naming that volume. If no
+// volume succeeds (including when there are no writable volumes at
+// all), the response is 500 Internal Server Error with an empty body.
+func UndeleteHandler(resp http.ResponseWriter, req *http.Request) {
+	// Reject unauthorized requests.
+	if !IsDataManagerToken(GetApiToken(req)) {
+		http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+		return
+	}
+
+	hash := mux.Vars(req)["hash"]
+	for _, vol := range KeepVM.AllWritable() {
+		if err := vol.Untrash(hash); err != nil {
+			log.Printf("Error untrashing %v on volume %v: %v", hash, vol.String(), err)
+			continue
+		}
+		log.Printf("Untrashed %v on volume %v", hash, vol.String())
+
+		// The status must be sent before the body: the first Write
+		// implicitly commits a 200 header, after which WriteHeader
+		// has no effect.
+		resp.WriteHeader(http.StatusOK)
+		if _, err := resp.Write([]byte("Untrashed on volume: " + vol.String())); err != nil {
+			log.Printf("Error writing undelete response for %v: %v", hash, err)
+		}
+		return
+	}
+
+	// No volume untrashed the block. This also covers an empty volume
+	// list, where there would otherwise be no valid status code to send.
+	resp.WriteHeader(http.StatusInternalServerError)
+}
+
// ==============================
// GetBlock and PutBlock implement lower-level code for handling
// blocks by rooting through volumes connected to the local machine.
SizeRequiredError = &KeepError{411, "Missing Content-Length"}
TooLongError = &KeepError{413, "Block is too large"}
MethodDisabledError = &KeepError{405, "Method disabled"}
+ ErrNotImplemented = &KeepError{500, "Unsupported configuration"}
)
func (e *KeepError) Error() string {
flagSerializeIO bool
flagReadonly bool
volumes volumeSet
+ trashLifetime int
)
func (vs *volumeSet) String() string {
"max-buffers",
maxBuffers,
fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
+	// -trash-lifetime is stored in trashLifetime and defaults to 0.
+	// The help text contains no format verbs, so it is passed as a
+	// plain string rather than through fmt.Sprintf.
+	flag.IntVar(
+		&trashLifetime,
+		"trash-lifetime",
+		0,
+		"Trashed blocks will stay in trash for trash-lifetime interval before they are actually deleted by the system.")
flag.Parse()
return nil
}
-func (v *S3Volume) Delete(loc string) error {
+// Trash removes the block stored under loc from the bucket by calling
+// v.Bucket.Del. On a read-only volume it returns MethodDisabledError
+// without contacting the bucket.
+//
+// NOTE(review): this body does not consult trashLifetime or
+// blobSignatureTTL before deleting — confirm whether that is intended.
+func (v *S3Volume) Trash(loc string) error {
if v.readonly {
return MethodDisabledError
}
return v.Bucket.Del(loc)
}
+// Untrash is meant to move a block from trash back into the store.
+//
+// Not implemented yet for S3 volumes: it always returns
+// ErrNotImplemented, so that callers do not mistake the missing
+// implementation for a successful untrash.
+func (v *S3Volume) Untrash(loc string) error {
+	return ErrNotImplemented
+}
+
func (v *S3Volume) Status() *VolumeStatus {
return &VolumeStatus{
DeviceNum: 1,
if neverDelete {
err = errors.New("did not delete block because neverDelete is true")
} else {
- err = volume.Delete(trashRequest.Locator)
+ err = volume.Trash(trashRequest.Locator)
}
if err != nil {
// particular order.
IndexTo(prefix string, writer io.Writer) error
- // Delete deletes the block data from the underlying storage
- // device.
+	// Trash moves the block data from the underlying storage
+	// device to the trash area. The block then stays in the trash
+	// for the -trash-lifetime interval before it is actually
+	// deleted.
//
// loc is as described in Get.
//
// If the timestamp for the given locator is newer than
- // blobSignatureTTL, Delete must not delete the data.
+ // blobSignatureTTL, Trash must not trash the data.
//
- // If a Delete operation overlaps with any Touch or Put
+ // If a Trash operation overlaps with any Touch or Put
// operations on the same locator, the implementation must
// ensure one of the following outcomes:
//
// - Touch and Put return a non-nil error, or
- // - Delete does not delete the block, or
+ // - Trash does not trash the block, or
// - Both of the above.
//
// If it is possible for the storage device to be accessed by
// reliably or fail outright.
//
// Corollary: A successful Touch or Put guarantees a block
- // will not be deleted for at least blobSignatureTTL
+ // will not be trashed for at least blobSignatureTTL
// seconds.
- Delete(loc string) error
+ Trash(loc string) error
+
+	// Untrash moves the block from the trash back into the store.
+ Untrash(loc string) error
// Status returns a *VolumeStatus representing the current
// in-use and available storage capacity and an
v.Put(TestHash, TestBlock)
- if err := v.Delete(TestHash); err != nil {
+ if err := v.Trash(TestHash); err != nil {
t.Error(err)
}
data, err := v.Get(TestHash)
v.Put(TestHash, TestBlock)
v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
- if err := v.Delete(TestHash); err != nil {
+ if err := v.Trash(TestHash); err != nil {
t.Error(err)
}
if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) {
v := factory(t)
defer v.Teardown()
- if err := v.Delete(TestHash2); err == nil {
+ if err := v.Trash(TestHash2); err == nil {
t.Errorf("Expected error when attempting to delete a non-existing block")
}
}
}
// Delete a block from a read-only volume should result in error
- err = v.Delete(TestHash)
+ err = v.Trash(TestHash)
if err == nil {
t.Errorf("Expected error when deleting block from a read-only volume")
}
return nil
}
-func (v *MockVolume) Delete(loc string) error {
+func (v *MockVolume) Trash(loc string) error {
v.gotCall("Delete")
<-v.Gate
if v.Readonly {
return os.ErrNotExist
}
+// Untrash is a placeholder for the mock volume: it ignores loc and
+// always returns nil.
+// TBD
+func (v *MockVolume) Untrash(loc string) error {
+ return nil
+}
+
func (v *MockVolume) Status() *VolumeStatus {
var used uint64
for _, block := range v.Store {
}
-// Delete deletes the block data from the unix storage
+// Trash deletes the block data from the unix storage
-func (v *UnixVolume) Delete(loc string) error {
+func (v *UnixVolume) Trash(loc string) error {
// Touch() must be called before calling Write() on a block. Touch()
// also uses lockfile(). This avoids a race condition between Write()
// and Delete() because either (a) the file will be deleted and Touch()
return os.Remove(p)
}
+// Untrash moves block from trash back into store.
+//
+// Not implemented yet for unix volumes: it always returns
+// ErrNotImplemented, so that callers do not mistake the missing
+// implementation for a successful untrash.
+func (v *UnixVolume) Untrash(loc string) error {
+	return ErrNotImplemented
+}
+
// blockDir returns the fully qualified directory name for the directory
// where loc is (or would be) stored on this volume.
func (v *UnixVolume) blockDir(loc string) string {
t.Errorf("got err %v, expected MethodDisabledError", err)
}
- err = v.Delete(TestHash)
+ err = v.Trash(TestHash)
if err != MethodDisabledError {
t.Errorf("got err %v, expected MethodDisabledError", err)
}