8178: rename Delete api as Trash; add Untrash to volume interface; add UndeleteHandle...
authorradhika <radhika@curoverse.com>
Thu, 21 Jan 2016 18:59:36 +0000 (13:59 -0500)
committerradhika <radhika@curoverse.com>
Tue, 26 Jan 2016 15:21:06 +0000 (10:21 -0500)
services/keepstore/azure_blob_volume.go
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/keepstore.go
services/keepstore/s3_volume.go
services/keepstore/trash_worker.go
services/keepstore/volume.go
services/keepstore/volume_generic_test.go
services/keepstore/volume_test.go
services/keepstore/volume_unix.go
services/keepstore/volume_unix_test.go

index c0033d9186c3a6306ff4251aa3e84b37af6e5109..0071567afabed21084abce843038929a03afbaf8 100644 (file)
@@ -311,8 +311,8 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
        }
 }
 
-// Delete a Keep block.
-func (v *AzureBlobVolume) Delete(loc string) error {
+// Trash a Keep block.
+func (v *AzureBlobVolume) Trash(loc string) error {
        if v.readonly {
                return MethodDisabledError
        }
@@ -335,6 +335,12 @@ func (v *AzureBlobVolume) Delete(loc string) error {
        })
 }
 
+// Untrash a Keep block.
+// TBD
+func (v *AzureBlobVolume) Untrash(loc string) error {
+       return nil
+}
+
 // Status returns a VolumeStatus struct with placeholder data.
 func (v *AzureBlobVolume) Status() *VolumeStatus {
        return &VolumeStatus{
index 3817ea19002d1c18f14c2479a383fb2d1601d763..9526c3cc504772ab3559d27e1b7fe133556fef7c 100644 (file)
@@ -970,3 +970,83 @@ func TestPutReplicationHeader(t *testing.T) {
                t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
        }
 }
+
+func TestUndeleteHandler(t *testing.T) {
+       defer teardown()
+
+       // Set up Keep volumes
+       KeepVM = MakeTestVolumeManager(2)
+       defer KeepVM.Close()
+       vols := KeepVM.AllWritable()
+       vols[0].Put(TestHash, TestBlock)
+
+       // Only the datamanager user should be allowed to undelete blocks
+       dataManagerToken = "DATA MANAGER TOKEN"
+
+       // unauthenticatedReq => UnauthorizedError
+       unauthenticatedReq := &RequestTester{
+               method: "PUT",
+               uri:    "/undelete/" + TestHash,
+       }
+       response := IssueRequest(unauthenticatedReq)
+       ExpectStatusCode(t,
+               "enforcePermissions on, unauthenticated request",
+               UnauthorizedError.HTTPCode,
+               response)
+
+       // notDataManagerReq => UnauthorizedError
+       notDataManagerReq := &RequestTester{
+               method:   "PUT",
+               uri:      "/undelete/" + TestHash,
+               apiToken: knownToken,
+       }
+
+       response = IssueRequest(notDataManagerReq)
+       ExpectStatusCode(t,
+               "permissions on, unauthenticated /index/prefix request",
+               UnauthorizedError.HTTPCode,
+               response)
+
+       // datamanagerWithBadHashReq => StatusBadRequest
+       datamanagerWithBadHashReq := &RequestTester{
+               method:   "PUT",
+               uri:      "/undelete/thisisnotalocator",
+               apiToken: dataManagerToken,
+       }
+       response = IssueRequest(datamanagerWithBadHashReq)
+       ExpectStatusCode(t,
+               "permissions on, authenticated request, non-superuser",
+               http.StatusBadRequest,
+               response)
+
+       // datamanagerWrongMethodReq => StatusBadRequest
+       datamanagerWrongMethodReq := &RequestTester{
+               method:   "GET",
+               uri:      "/undelete/" + TestHash,
+               apiToken: dataManagerToken,
+       }
+       response = IssueRequest(datamanagerWrongMethodReq)
+       ExpectStatusCode(t,
+               "permissions on, authenticated request, non-superuser",
+               http.StatusBadRequest,
+               response)
+
+       // datamanagerReq => StatusOK
+       datamanagerReq := &RequestTester{
+               method:   "PUT",
+               uri:      "/undelete/" + TestHash,
+               apiToken: dataManagerToken,
+       }
+       response = IssueRequest(datamanagerReq)
+       ExpectStatusCode(t,
+               "permissions on, authenticated request, non-superuser",
+               http.StatusOK,
+               response)
+       expected := `Untrashed on volume`
+       match, _ := regexp.MatchString(expected, response.Body.String())
+       if !match {
+               t.Errorf(
+                       "Undelete response mismatched: expected %s, got:\n%s",
+                       expected, response.Body.String())
+       }
+}
index 95af1b48707c6b189982dc18762cb517769bd117..f8e4d7145232bddc0042b62991d490a64d8934bd 100644 (file)
@@ -53,6 +53,9 @@ func MakeRESTRouter() *mux.Router {
        // Replace the current trash queue.
        rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
 
+       // Undelete moves blocks from trash back into store
+       rest.HandleFunc(`/undelete/{hash:[0-9a-f]{32}}`, UndeleteHandler).Methods("PUT")
+
        // Any request which does not match any of these routes gets
        // 400 Bad Request.
        rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
@@ -295,7 +298,7 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
                Failed  int `json:"copies_failed"`
        }
        for _, vol := range KeepVM.AllWritable() {
-               if err := vol.Delete(hash); err == nil {
+               if err := vol.Trash(hash); err == nil {
                        result.Deleted++
                } else if os.IsNotExist(err) {
                        continue
@@ -430,6 +433,36 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
        trashq.ReplaceQueue(tlist)
 }
 
+// UndeleteHandler processes "PUT /undelete/{hash:[0-9a-f]{32}}" requests for the data manager.
+func UndeleteHandler(resp http.ResponseWriter, req *http.Request) {
+       // Reject unauthorized requests.
+       if !IsDataManagerToken(GetApiToken(req)) {
+               http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+               return
+       }
+
+       hash := mux.Vars(req)["hash"]
+       successResp := "Untrashed on volume: "
+       var st int
+       for _, vol := range KeepVM.AllWritable() {
+               if err := vol.Untrash(hash); err == nil {
+                       log.Printf("Untrashed %v on volume %v", hash, vol.String())
+                       st = http.StatusOK
+                       successResp += vol.String()
+                       break
+               } else {
+                       log.Printf("Error untrashing %v on volume %v: %v", hash, vol.String(), err)
+                       st = 500
+               }
+       }
+
+       if st == http.StatusOK {
+               resp.Write([]byte(successResp))
+       }
+
+       resp.WriteHeader(st)
+}
+
 // ==============================
 // GetBlock and PutBlock implement lower-level code for handling
 // blocks by rooting through volumes connected to the local machine.
index 96a887fecb20b278a2c9a763ebfc094b71bf31ac..f6b1f6e164365aac025b32a87c1c6a5305d780f3 100644 (file)
@@ -79,6 +79,7 @@ var (
        SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
        TooLongError        = &KeepError{413, "Block is too large"}
        MethodDisabledError = &KeepError{405, "Method disabled"}
+       ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
 )
 
 func (e *KeepError) Error() string {
@@ -113,6 +114,7 @@ var (
        flagSerializeIO bool
        flagReadonly    bool
        volumes         volumeSet
+       trashLifetime   int
 )
 
 func (vs *volumeSet) String() string {
@@ -200,6 +202,11 @@ func main() {
                "max-buffers",
                maxBuffers,
                fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
+       flag.IntVar(
+               &trashLifetime,
+               "trash-lifetime",
+               0,
+               fmt.Sprintf("Trashed blocks will stay in trash for trash-lifetime interval before they are actually deleted by the system."))
 
        flag.Parse()
 
index 572ee46e71419693b103801c7e01a8a139a0c69e..16afc326b5f563171d44abfac731d9ce377da373 100644 (file)
@@ -257,7 +257,7 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
        return nil
 }
 
-func (v *S3Volume) Delete(loc string) error {
+func (v *S3Volume) Trash(loc string) error {
        if v.readonly {
                return MethodDisabledError
        }
@@ -272,6 +272,11 @@ func (v *S3Volume) Delete(loc string) error {
        return v.Bucket.Del(loc)
 }
 
+// TBD
+func (v *S3Volume) Untrash(loc string) error {
+       return nil
+}
+
 func (v *S3Volume) Status() *VolumeStatus {
        return &VolumeStatus{
                DeviceNum: 1,
index 65e3fbd2849593e44be94921cb7073a5aba3adaa..62f63d57c8edb655b5078ebf637ce6d0ed0475bb 100644 (file)
@@ -47,7 +47,7 @@ func TrashItem(trashRequest TrashRequest) {
                if neverDelete {
                        err = errors.New("did not delete block because neverDelete is true")
                } else {
-                       err = volume.Delete(trashRequest.Locator)
+                       err = volume.Trash(trashRequest.Locator)
                }
 
                if err != nil {
index 7966c41b92bd89958308ec77765f0b7a5a1f0fd9..58710c04b269a57af236fbb36f5a6aaa61d9b256 100644 (file)
@@ -144,20 +144,21 @@ type Volume interface {
        // particular order.
        IndexTo(prefix string, writer io.Writer) error
 
-       // Delete deletes the block data from the underlying storage
-       // device.
+       // Trash moves the block data from the underlying storage
+       // device to trash area. The block then stays in trash for
+       // -trash-lifetime interval before it is actually deleted.
        //
        // loc is as described in Get.
        //
        // If the timestamp for the given locator is newer than
-       // blobSignatureTTL, Delete must not delete the data.
+       // blobSignatureTTL, Trash must not trash the data.
        //
-       // If a Delete operation overlaps with any Touch or Put
+       // If a Trash operation overlaps with any Touch or Put
        // operations on the same locator, the implementation must
        // ensure one of the following outcomes:
        //
        //   - Touch and Put return a non-nil error, or
-       //   - Delete does not delete the block, or
+       //   - Trash does not trash the block, or
        //   - Both of the above.
        //
        // If it is possible for the storage device to be accessed by
@@ -171,9 +172,12 @@ type Volume interface {
        // reliably or fail outright.
        //
        // Corollary: A successful Touch or Put guarantees a block
-       // will not be deleted for at least blobSignatureTTL
+       // will not be trashed for at least blobSignatureTTL
        // seconds.
-       Delete(loc string) error
+       Trash(loc string) error
+
+       // Untrash moves block from trash back into store
+       Untrash(loc string) error
 
        // Status returns a *VolumeStatus representing the current
        // in-use and available storage capacity and an
index 7580a202594426173ca14d54d9fda8c523171e30..e168940fdd660f9c3487fea26e1decdec8ba9098 100644 (file)
@@ -420,7 +420,7 @@ func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
 
        v.Put(TestHash, TestBlock)
 
-       if err := v.Delete(TestHash); err != nil {
+       if err := v.Trash(TestHash); err != nil {
                t.Error(err)
        }
        data, err := v.Get(TestHash)
@@ -449,7 +449,7 @@ func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
        v.Put(TestHash, TestBlock)
        v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
 
-       if err := v.Delete(TestHash); err != nil {
+       if err := v.Trash(TestHash); err != nil {
                t.Error(err)
        }
        if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) {
@@ -463,7 +463,7 @@ func testDeleteNoSuchBlock(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
 
-       if err := v.Delete(TestHash2); err == nil {
+       if err := v.Trash(TestHash2); err == nil {
                t.Errorf("Expected error when attempting to delete a non-existing block")
        }
 }
@@ -535,7 +535,7 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
        }
 
        // Delete a block from a read-only volume should result in error
-       err = v.Delete(TestHash)
+       err = v.Trash(TestHash)
        if err == nil {
                t.Errorf("Expected error when deleting block from a read-only volume")
        }
index d6714365de5bef98ad082b93f595231993bafa48..53ffeef0bba186d7f995e6e6afb00feb194c5e7f 100644 (file)
@@ -183,7 +183,7 @@ func (v *MockVolume) IndexTo(prefix string, w io.Writer) error {
        return nil
 }
 
-func (v *MockVolume) Delete(loc string) error {
+func (v *MockVolume) Trash(loc string) error {
        v.gotCall("Delete")
        <-v.Gate
        if v.Readonly {
@@ -199,6 +199,11 @@ func (v *MockVolume) Delete(loc string) error {
        return os.ErrNotExist
 }
 
+// TBD
+func (v *MockVolume) Untrash(loc string) error {
+       return nil
+}
+
 func (v *MockVolume) Status() *VolumeStatus {
        var used uint64
        for _, block := range v.Store {
index 910cc25d613cb7690f944b418aebf5c205c7aced..da1d390279b3c0afe94a388c002275b0cf44790e 100644 (file)
@@ -363,7 +363,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
 }
 
 // Delete deletes the block data from the unix storage
-func (v *UnixVolume) Delete(loc string) error {
+func (v *UnixVolume) Trash(loc string) error {
        // Touch() must be called before calling Write() on a block.  Touch()
        // also uses lockfile().  This avoids a race condition between Write()
        // and Delete() because either (a) the file will be deleted and Touch()
@@ -405,6 +405,12 @@ func (v *UnixVolume) Delete(loc string) error {
        return os.Remove(p)
 }
 
+// Untrash moves block from trash back into store
+// TBD
+func (v *UnixVolume) Untrash(loc string) error {
+       return nil
+}
+
 // blockDir returns the fully qualified directory name for the directory
 // where loc is (or would be) stored on this volume.
 func (v *UnixVolume) blockDir(loc string) string {
index b216810f8cb0fc1008e4bb7bc99b3c306728d70a..0775e89ed275d14f7e2be510084a52e39af84472 100644 (file)
@@ -166,7 +166,7 @@ func TestUnixVolumeReadonly(t *testing.T) {
                t.Errorf("got err %v, expected MethodDisabledError", err)
        }
 
-       err = v.Delete(TestHash)
+       err = v.Trash(TestHash)
        if err != MethodDisabledError {
                t.Errorf("got err %v, expected MethodDisabledError", err)
        }