2769: implement DELETE.
authorTim Pierce <twp@curoverse.com>
Wed, 23 Jul 2014 17:40:48 +0000 (13:40 -0400)
committerTim Pierce <twp@curoverse.com>
Wed, 30 Jul 2014 17:59:20 +0000 (13:59 -0400)
The DeleteHandler method implements DELETE requests for Keep, checking
that the user is authorized to issue DELETE and then deleting the
requested block from local volumes.

* CanDelete checks that the user with the given token is authorized to
  delete blocks from Keep.
* MockVolume and UnixVolume objects provide a Delete method for deleting
  a block from that volume.
* TestDeleteHandler tests:
** Unauthenticated requests
** Requests from a non-admin user
** Requests from a non-admin user for a nonexistent block
** Requests from a non-admin user for an existing block

services/keep/src/keep/handler_test.go
services/keep/src/keep/handlers.go
services/keep/src/keep/volume.go
services/keep/src/keep/volume_unix.go

index 1b0a90fb4e7096c38d2236629122bd82c8ad3da5..cab203d5a5904c5d40cb32486227061c24d31e82 100644 (file)
@@ -11,10 +11,12 @@ package main
 
 import (
        "bytes"
+       "encoding/json"
        "fmt"
        "github.com/gorilla/mux"
        "net/http"
        "net/http/httptest"
+       "os"
        "regexp"
        "strings"
        "testing"
@@ -411,6 +413,119 @@ func TestIndexHandler(t *testing.T) {
        }
 }
 
+// TestDeleteHandler
+//
+// Cases to test:
+//
+//   With no token and with a non-data-manager token:
+//   * Delete existing block
+//     (test for 403 Forbidden, confirm block not deleted)
+//
+//   With data manager token:
+//
+//   * Delete existing block
+//     (test for 200 OK, response counts, confirm block deleted)
+//
+//   * Delete nonexistent block
+//     (test for 200 OK, response counts)
+//
+//   TODO(twp):
+//
+//   * Delete block on read-only volume
+//     (test for 200 OK, response counts, confirm block not deleted)
+//
+func TestDeleteHandler(t *testing.T) {
+       defer teardown()
+
+       // Set up Keep volumes and populate them.
+       // Include multiple blocks on different volumes, and
+       // some metadata files (which should be omitted from index listings)
+       KeepVM = MakeTestVolumeManager(2)
+       defer func() { KeepVM.Quit() }()
+
+       vols := KeepVM.Volumes()
+       vols[0].Put(TEST_HASH, TEST_BLOCK)
+
+       // Set up a REST router for testing the handlers.
+       rest := MakeRESTRouter()
+
+       var user_token = "NOT DATA MANAGER TOKEN"
+       data_manager_token = "DATA MANAGER TOKEN"
+
+       unauth_req := &RequestTester{
+               method: "DELETE",
+               uri:    "http://localhost:25107/" + TEST_HASH,
+       }
+
+       user_req := &RequestTester{
+               method:    "DELETE",
+               uri:       "http://localhost:25107/" + TEST_HASH,
+               api_token: user_token,
+       }
+
+       superuser_existing_block_req := &RequestTester{
+               method:    "DELETE",
+               uri:       "http://localhost:25107/" + TEST_HASH,
+               api_token: data_manager_token,
+       }
+
+       superuser_nonexistent_block_req := &RequestTester{
+               method:    "DELETE",
+               uri:       "http://localhost:25107/" + TEST_HASH_2,
+               api_token: data_manager_token,
+       }
+
+       // Unauthenticated request returns PermissionError.
+       var response *httptest.ResponseRecorder
+       response = IssueRequest(rest, unauth_req)
+       ExpectStatusCode(t,
+               "unauthenticated request",
+               PermissionError.HTTPCode,
+               response)
+
+       // Authenticated non-admin request returns PermissionError.
+       response = IssueRequest(rest, user_req)
+       ExpectStatusCode(t,
+               "authenticated non-admin request",
+               PermissionError.HTTPCode,
+               response)
+
+       // Authenticated admin request for nonexistent block.
+       type deletecounter struct {
+               Deleted int `json:"copies_deleted"`
+               Failed  int `json:"copies_failed"`
+       }
+       var dc deletecounter
+
+       response = IssueRequest(rest, superuser_nonexistent_block_req)
+       ExpectStatusCode(t,
+               "data manager request, nonexistent block",
+               http.StatusOK,
+               response)
+       // Expect response {"copies_deleted":0,"copies_failed":0}
+       json.NewDecoder(response.Body).Decode(&dc)
+       if dc.Deleted != 0 || dc.Failed != 0 {
+               t.Errorf("superuser_nonexistent_block_req: response = %+v", dc)
+       }
+
+       // Authenticated admin request for existing block.
+       response = IssueRequest(rest, superuser_existing_block_req)
+       ExpectStatusCode(t,
+               "data manager request, existing block",
+               http.StatusOK,
+               response)
+       // Expect response {"copies_deleted":1,"copies_failed":0}
+       json.NewDecoder(response.Body).Decode(&dc)
+       if dc.Deleted != 1 || dc.Failed != 0 {
+               t.Errorf("superuser_existing_block_req: response = %+v", dc)
+       }
+       // Confirm the block has been deleted
+       _, err := vols[0].Get(TEST_HASH)
+       if !os.IsNotExist(err) {
+               t.Error("superuser_existing_block_req: block not deleted")
+       }
+}
+
 // ====================
 // Helper functions
 // ====================
index a5d5e47e09886938701071552adcfc5b2f141f28..234e21a8eadda17eae3c1e1bce4d1bfbaefe404e 100644 (file)
@@ -39,6 +39,7 @@ func MakeRESTRouter() *mux.Router {
                GetBlockHandler).Methods("GET", "HEAD")
 
        rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
+       rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
 
        // For IndexHandler we support:
        //   /index           - returns all locators
@@ -315,6 +316,61 @@ func GetVolumeStatus(volume string) *VolumeStatus {
        return &VolumeStatus{volume, devnum, free, used}
 }
 
+// DeleteHandler processes DELETE requests.
+//
+// DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
+// from all connected volumes.
+//
+// Only the Data Manager, or an Arvados admin with scope "all", are
+// allowed to issue DELETE requests.  If a DELETE request is not
+// authenticated or is issued by a non-admin user, the server returns
+// a PermissionError.
+//
+// Upon completion, DELETE returns HTTP 200 OK with a JSON message body
+// in the format
+//
+//    {"copies_deleted":d,"copies_failed":f}
+//
+// where d and f are integers representing the number of blocks that
+// were successfully and unsuccessfully deleted.
+//
+func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
+       hash := mux.Vars(req)["hash"]
+
+       // Confirm that this user is an admin and has a token with unlimited scope.
+       var tok = GetApiToken(req)
+       if tok == "" || !CanDelete(tok) {
+               http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
+               return
+       }
+
+       // Delete copies of this block from all available volumes.  Report
+       // how many blocks were successfully and unsuccessfully
+       // deleted.
+       var result struct {
+               Deleted int `json:"copies_deleted"`
+               Failed  int `json:"copies_failed"`
+       }
+       for _, vol := range KeepVM.Volumes() {
+               if err := vol.Delete(hash); err == nil {
+                       result.Deleted++
+               } else if os.IsNotExist(err) {
+                       continue
+               } else {
+                       result.Failed++
+                       log.Println("DeleteHandler:", err)
+               }
+       }
+
+       if j, err := json.Marshal(result); err == nil {
+               resp.Write(j)
+       } else {
+               log.Printf("json.Marshal: %s\n", err)
+               log.Printf("result = %v\n", result)
+               http.Error(resp, err.Error(), 500)
+       }
+}
+
 func GetBlock(hash string) ([]byte, error) {
        // Attempt to read the requested hash from a keep volume.
        error_to_caller := NotFoundError
@@ -477,3 +533,23 @@ func IsExpired(timestamp_hex string) bool {
        }
        return time.Unix(ts, 0).Before(time.Now())
 }
+
+// CanDelete returns true if the user identified by api_token is
+// allowed to delete blocks.
+func CanDelete(api_token string) bool {
+       if api_token == "" {
+               return false
+       }
+       // Blocks may be deleted only when Keep has been configured with a
+       // data manager.
+       if data_manager_token == "" {
+               return false
+       }
+       if api_token == data_manager_token {
+               return true
+       }
+       // TODO(twp): look up api_token with the API server
+       // return true if is_admin is true and if the token
+       // has unlimited scope
+       return false
+}
index d1956862b10d74d0da40fe7c8809f5bbdb863d4a..b15ef23a85a403c83128bfde5ce18037682f86ca 100644 (file)
@@ -7,14 +7,15 @@ package main
 import (
        "errors"
        "fmt"
-       "strings"
        "os"
+       "strings"
 )
 
 type Volume interface {
        Get(loc string) ([]byte, error)
        Put(loc string, block []byte) error
        Index(prefix string) string
+       Delete(loc string) error
        Status() *VolumeStatus
        String() string
 }
@@ -61,6 +62,14 @@ func (v *MockVolume) Index(prefix string) string {
        return result
 }
 
+func (v *MockVolume) Delete(loc string) error {
+       if _, ok := v.Store[loc]; ok {
+               delete(v.Store, loc)
+               return nil
+       }
+       return os.ErrNotExist
+}
+
 func (v *MockVolume) Status() *VolumeStatus {
        var used uint64
        for _, block := range v.Store {
index aafc8debf134ec4bf8c44fbb93d2bc6313892e72..ddddd5abf901d81a86d5bf0a74cab8bc57081a08 100644 (file)
@@ -109,8 +109,7 @@ func (v *UnixVolume) Put(loc string, block []byte) error {
 // corrupted data block.
 //
 func (v *UnixVolume) Read(loc string) ([]byte, error) {
-       blockFilename := filepath.Join(v.root, loc[0:3], loc)
-       buf, err := ioutil.ReadFile(blockFilename)
+       buf, err := ioutil.ReadFile(v.blockPath(loc))
        return buf, err
 }
 
@@ -123,22 +122,22 @@ func (v *UnixVolume) Write(loc string, block []byte) error {
        if v.IsFull() {
                return FullError
        }
-       blockDir := filepath.Join(v.root, loc[0:3])
-       if err := os.MkdirAll(blockDir, 0755); err != nil {
+       bdir := v.blockDir(loc)
+       if err := os.MkdirAll(bdir, 0755); err != nil {
                log.Printf("%s: could not create directory %s: %s",
-                       loc, blockDir, err)
+                       loc, bdir, err)
                return err
        }
 
-       tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+loc)
+       tmpfile, tmperr := ioutil.TempFile(bdir, "tmp"+loc)
        if tmperr != nil {
-               log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, loc, tmperr)
+               log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
                return tmperr
        }
-       blockFilename := filepath.Join(blockDir, loc)
+       bpath := v.blockPath(loc)
 
        if _, err := tmpfile.Write(block); err != nil {
-               log.Printf("%s: writing to %s: %s\n", v, blockFilename, err)
+               log.Printf("%s: writing to %s: %s\n", v, bpath, err)
                return err
        }
        if err := tmpfile.Close(); err != nil {
@@ -146,8 +145,8 @@ func (v *UnixVolume) Write(loc string, block []byte) error {
                os.Remove(tmpfile.Name())
                return err
        }
-       if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
-               log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
+       if err := os.Rename(tmpfile.Name(), bpath); err != nil {
+               log.Printf("rename %s %s: %s\n", tmpfile.Name(), bpath, err)
                os.Remove(tmpfile.Name())
                return err
        }
@@ -230,6 +229,22 @@ func (v *UnixVolume) Index(prefix string) (output string) {
        return
 }
 
+func (v *UnixVolume) Delete(loc string) error {
+       return os.Remove(v.blockPath(loc))
+}
+
+// blockDir returns the fully qualified directory name for the directory
+// where loc is (or would be) stored on this volume.
+func (v *UnixVolume) blockDir(loc string) string {
+       return filepath.Join(v.root, loc[0:3])
+}
+
+// blockPath returns the fully qualified pathname for the path to loc
+// on this volume.
+func (v *UnixVolume) blockPath(loc string) string {
+       return filepath.Join(v.blockDir(loc), loc)
+}
+
 // IsFull returns true if the free space on the volume is less than
 // MIN_FREE_KILOBYTES.
 //