11644: Accept index/pull/trash requests for a specific mount.
authorTom Clegg <tom@curoverse.com>
Mon, 15 May 2017 19:36:02 +0000 (15:36 -0400)
committerTom Clegg <tom@curoverse.com>
Mon, 15 May 2017 21:35:23 +0000 (17:35 -0400)
services/keepstore/handlers.go
services/keepstore/mounts_test.go [new file with mode: 0644]
services/keepstore/pull_worker.go
services/keepstore/pull_worker_integration_test.go
services/keepstore/pull_worker_test.go
services/keepstore/trash_worker.go
services/keepstore/volume.go

index 789d97b24c9ab54779e984132a327fe3ce92610b..33561426f912d08448d7297a192efe4d3b12cff0 100644 (file)
@@ -62,6 +62,8 @@ func MakeRESTRouter() *router {
 
        // List mounts: UUID, readonly, tier, device ID, ...
        rest.HandleFunc(`/mounts`, rtr.Mounts).Methods("GET")
+       rest.HandleFunc(`/mounts/{uuid}/blocks`, rtr.MountBlocks).Methods("GET")
+       rest.HandleFunc(`/mounts/{uuid}/blocks/{prefix:[0-9a-f]{0,32}}`, rtr.MountBlocks).Methods("GET")
 
        // Replace the current pull queue.
        rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
@@ -261,6 +263,24 @@ func (rtr *router) Mounts(resp http.ResponseWriter, req *http.Request) {
        }
 }
 
+// MountBlocks responds to "GET /mounts/{uuid}/blocks" requests.
+func (rtr *router) MountBlocks(resp http.ResponseWriter, req *http.Request) {
+       if !IsSystemAuth(GetAPIToken(req)) {
+               http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+               return
+       }
+
+       uuid := mux.Vars(req)["uuid"]
+       prefix := mux.Vars(req)["prefix"]
+       if v := KeepVM.Lookup(uuid, false); v == nil {
+               http.Error(resp, "mount not found", http.StatusNotFound)
+       } else if err := v.IndexTo(prefix, resp); err != nil {
+               http.Error(resp, err.Error(), http.StatusInternalServerError)
+       } else {
+               resp.Write([]byte{'\n'})
+       }
+}
+
 // PoolStatus struct
 type PoolStatus struct {
        Alloc uint64 `json:"BytesAllocated"`
@@ -474,6 +494,9 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
 type PullRequest struct {
        Locator string   `json:"locator"`
        Servers []string `json:"servers"`
+
+       // Destination mount, or "" for "anywhere"
+       MountUUID string
 }
 
 // PullHandler processes "PUT /pull" requests for the data manager.
@@ -510,6 +533,9 @@ func PullHandler(resp http.ResponseWriter, req *http.Request) {
 type TrashRequest struct {
        Locator    string `json:"locator"`
        BlockMtime int64  `json:"block_mtime"`
+
+       // Target mount, or "" for "everywhere"
+       MountUUID string
 }
 
 // TrashHandler processes /trash requests.
diff --git a/services/keepstore/mounts_test.go b/services/keepstore/mounts_test.go
new file mode 100644 (file)
index 0000000..bfe298b
--- /dev/null
@@ -0,0 +1,91 @@
+package main
+
+import (
+       "context"
+       "encoding/json"
+       "net/http"
+       "net/http/httptest"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&MountsSuite{})
+
+type MountsSuite struct {
+       vm  VolumeManager
+       rtr http.Handler
+}
+
+func (s *MountsSuite) SetUpTest(c *check.C) {
+       s.vm = MakeTestVolumeManager(2)
+       KeepVM = s.vm
+       s.rtr = MakeRESTRouter()
+       theConfig.systemAuthToken = arvadostest.DataManagerToken
+}
+
+func (s *MountsSuite) TearDownTest(c *check.C) {
+       s.vm.Close()
+       KeepVM = nil
+       theConfig = DefaultConfig()
+       theConfig.Start()
+}
+
+func (s *MountsSuite) TestMounts(c *check.C) {
+       vols := s.vm.AllWritable()
+       vols[0].Put(context.Background(), TestHash, TestBlock)
+       vols[1].Put(context.Background(), TestHash2, TestBlock2)
+
+       resp := s.call("GET", "/mounts", "")
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       var mntList []struct {
+               UUID string
+       }
+       err := json.Unmarshal(resp.Body.Bytes(), &mntList)
+       c.Check(err, check.IsNil)
+       c.Check(len(mntList), check.Equals, 2)
+
+       // Bad auth
+       for _, tok := range []string{"", "xyzzy"} {
+               resp = s.call("GET", "/mounts/"+mntList[1].UUID+"/blocks", tok)
+               c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
+               c.Check(resp.Body.String(), check.Equals, "Unauthorized\n")
+       }
+
+       tok := arvadostest.DataManagerToken
+
+       // Nonexistent mount UUID
+       resp = s.call("GET", "/mounts/X/blocks", tok)
+       c.Check(resp.Code, check.Equals, http.StatusNotFound)
+       c.Check(resp.Body.String(), check.Equals, "mount not found\n")
+
+       // Complete index of first mount
+       resp = s.call("GET", "/mounts/"+mntList[0].UUID+"/blocks", tok)
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       c.Check(resp.Body.String(), check.Matches, TestHash+`\+[0-9]+ [0-9]+\n\n`)
+
+       // Partial index of first mount (one block matches prefix)
+       resp = s.call("GET", "/mounts/"+mntList[0].UUID+"/blocks/"+TestHash[:2], tok)
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       c.Check(resp.Body.String(), check.Matches, TestHash+`\+[0-9]+ [0-9]+\n\n`)
+
+       // Complete index of second mount (note trailing slash)
+       resp = s.call("GET", "/mounts/"+mntList[1].UUID+"/blocks/", tok)
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       c.Check(resp.Body.String(), check.Matches, TestHash2+`\+[0-9]+ [0-9]+\n\n`)
+
+       // Partial index of second mount (no blocks match prefix)
+       resp = s.call("GET", "/mounts/"+mntList[1].UUID+"/blocks/"+TestHash[:2], tok)
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       c.Check(resp.Body.String(), check.Equals, "\n")
+}
+
+func (s *MountsSuite) call(method, path, tok string) *httptest.ResponseRecorder {
+       resp := httptest.NewRecorder()
+       req, _ := http.NewRequest(method, path, nil)
+       if tok != "" {
+               req.Header.Set("Authorization", "OAuth2 "+tok)
+       }
+       s.rtr.ServeHTTP(resp, req)
+       return resp
+}
index 3c6278d478d3d897982b2b6f8c9a166c505e6433..150b5ca2349a6e2765099feb39abc1a3f90ebedd 100644 (file)
@@ -4,11 +4,12 @@ import (
        "context"
        "crypto/rand"
        "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "io"
        "io/ioutil"
        "time"
 
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+
        log "github.com/Sirupsen/logrus"
 )
 
@@ -23,7 +24,7 @@ func RunPullWorker(pullq *WorkQueue, keepClient *keepclient.KeepClient) {
        nextItem := pullq.NextItem
        for item := range nextItem {
                pullRequest := item.(PullRequest)
-               err := PullItemAndProcess(item.(PullRequest), GenerateRandomAPIToken(), keepClient)
+               err := PullItemAndProcess(item.(PullRequest), keepClient)
                pullq.DoneItem <- struct{}{}
                if err == nil {
                        log.Printf("Pull %s success", pullRequest)
@@ -40,8 +41,16 @@ func RunPullWorker(pullq *WorkQueue, keepClient *keepclient.KeepClient) {
 //             Using this token & signature, retrieve the given block.
 //             Write to storage
 //
-func PullItemAndProcess(pullRequest PullRequest, token string, keepClient *keepclient.KeepClient) (err error) {
-       keepClient.Arvados.ApiToken = token
+func PullItemAndProcess(pullRequest PullRequest, keepClient *keepclient.KeepClient) error {
+       var vol Volume
+       if uuid := pullRequest.MountUUID; uuid != "" {
+               vol = KeepVM.Lookup(pullRequest.MountUUID, true)
+               if vol == nil {
+                       return fmt.Errorf("pull req has nonexistent mount: %v", pullRequest)
+               }
+       }
+
+       keepClient.Arvados.ApiToken = randomToken
 
        serviceRoots := make(map[string]string)
        for _, addr := range pullRequest.Servers {
@@ -51,11 +60,11 @@ func PullItemAndProcess(pullRequest PullRequest, token string, keepClient *keepc
 
        // Generate signature with a random token
        expiresAt := time.Now().Add(60 * time.Second)
-       signedLocator := SignLocator(pullRequest.Locator, token, expiresAt)
+       signedLocator := SignLocator(pullRequest.Locator, randomToken, expiresAt)
 
        reader, contentLen, _, err := GetContent(signedLocator, keepClient)
        if err != nil {
-               return
+               return err
        }
        if reader == nil {
                return fmt.Errorf("No reader found for : %s", signedLocator)
@@ -71,31 +80,33 @@ func PullItemAndProcess(pullRequest PullRequest, token string, keepClient *keepc
                return fmt.Errorf("Content not found for: %s", signedLocator)
        }
 
-       err = PutContent(readContent, pullRequest.Locator)
-       return
+       writePulledBlock(vol, readContent, pullRequest.Locator)
+       return nil
 }
 
 // Fetch the content for the given locator using keepclient.
-var GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
-       reader io.ReadCloser, contentLength int64, url string, err error) {
-       reader, blocklen, url, err := keepClient.Get(signedLocator)
-       return reader, blocklen, url, err
+var GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (io.ReadCloser, int64, string, error) {
+       return keepClient.Get(signedLocator)
 }
 
-const alphaNumeric = "0123456789abcdefghijklmnopqrstuvwxyz"
+var writePulledBlock = func(volume Volume, data []byte, locator string) {
+       var err error
+       if volume != nil {
+               err = volume.Put(context.Background(), locator, data)
+       } else {
+               _, err = PutBlock(context.Background(), data, locator)
+       }
+       if err != nil {
+               log.Printf("error writing pulled block %q: %s", locator, err)
+       }
+}
 
-// GenerateRandomAPIToken generates a random api token
-func GenerateRandomAPIToken() string {
+var randomToken = func() string {
+       const alphaNumeric = "0123456789abcdefghijklmnopqrstuvwxyz"
        var bytes = make([]byte, 36)
        rand.Read(bytes)
        for i, b := range bytes {
                bytes[i] = alphaNumeric[b%byte(len(alphaNumeric))]
        }
        return (string(bytes))
-}
-
-// Put block
-var PutContent = func(content []byte, locator string) (err error) {
-       _, err = PutBlock(context.Background(), content, locator)
-       return
-}
+}()
index 77b4c75f4a702d4afd701d3601f9a54062b2ba2f..7ba72672cfc95831b9e547b2d5430fbdea385821 100644 (file)
@@ -3,14 +3,15 @@ package main
 import (
        "bytes"
        "errors"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "io"
        "net/http"
        "os"
        "strings"
        "testing"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
 )
 
 var keepClient *keepclient.KeepClient
@@ -109,13 +110,12 @@ func TestPullWorkerIntegration_GetExistingLocator(t *testing.T) {
 // putting an item on the pullq so that the errors can be verified.
 func performPullWorkerIntegrationTest(testData PullWorkIntegrationTestData, pullRequest PullRequest, t *testing.T) {
 
-       // Override PutContent to mock PutBlock functionality
-       defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent)
-       PutContent = func(content []byte, locator string) (err error) {
+       // Override writePulledBlock to mock PutBlock functionality
+       defer func(orig func(Volume, []byte, string)) { writePulledBlock = orig }(writePulledBlock)
+       writePulledBlock = func(v Volume, content []byte, locator string) {
                if string(content) != testData.Content {
-                       t.Errorf("PutContent invoked with unexpected data. Expected: %s; Found: %s", testData.Content, content)
+                       t.Errorf("writePulledBlock invoked with unexpected data. Expected: %s; Found: %s", testData.Content, content)
                }
-               return
        }
 
        // Override GetContent to mock keepclient Get functionality
@@ -131,8 +131,7 @@ func performPullWorkerIntegrationTest(testData PullWorkIntegrationTestData, pull
                return rdr, int64(len(testData.Content)), "", nil
        }
 
-       keepClient.Arvados.ApiToken = GenerateRandomAPIToken()
-       err := PullItemAndProcess(pullRequest, keepClient.Arvados.ApiToken, keepClient)
+       err := PullItemAndProcess(pullRequest, keepClient)
 
        if len(testData.GetError) > 0 {
                if (err == nil) || (!strings.Contains(err.Error(), testData.GetError)) {
index 43a6de68443f693acb85696a0f5938836a34b2f7..d8dc695e29e376ee4f7f1fecbac41cab99c15c44 100644 (file)
@@ -3,12 +3,13 @@ package main
 import (
        "bytes"
        "errors"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       . "gopkg.in/check.v1"
        "io"
        "net/http"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       . "gopkg.in/check.v1"
 )
 
 var _ = Suite(&PullWorkerTestSuite{})
@@ -259,16 +260,14 @@ func performTest(testData PullWorkerTestData, c *C) {
                return rc, int64(len(testData.readContent)), "", nil
        }
 
-       // Override PutContent to mock PutBlock functionality
-       defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent)
-       PutContent = func(content []byte, locator string) (err error) {
+       // Override writePulledBlock to mock PutBlock functionality
+       defer func(orig func(Volume, []byte, string)) { writePulledBlock = orig }(writePulledBlock)
+       writePulledBlock = func(v Volume, content []byte, locator string) {
                if testData.putError {
-                       err = errors.New("Error putting data")
-                       putError = err
-                       return err
+                       putError = errors.New("Error putting data")
+                       return
                }
                putContent = content
-               return nil
        }
 
        c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
index 696c3e53a60abbd352efc035f5a0bb1afaec737f..a6f09c14776c37b22f57cc925917b5bb89062911 100644 (file)
@@ -35,7 +35,17 @@ func TrashItem(trashRequest TrashRequest) {
                return
        }
 
-       for _, volume := range KeepVM.AllWritable() {
+       var volumes []Volume
+       if uuid := trashRequest.MountUUID; uuid == "" {
+               volumes = KeepVM.AllWritable()
+       } else if v := KeepVM.Lookup(uuid, true); v == nil {
+               log.Printf("warning: trash request for nonexistent mount: %v", trashRequest)
+               return
+       } else {
+               volumes = []Volume{v}
+       }
+
+       for _, volume := range volumes {
                mtime, err := volume.Mtime(trashRequest.Locator)
                if err != nil {
                        log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
index 7e001a2470943e439e24cd6a149a4410754982ee..0b4bec4f8b2b96acb6a72ae86c9f564f86f8b768 100644 (file)
@@ -246,6 +246,11 @@ type VolumeManager interface {
        // Mounts returns all mounts (volume attachments).
        Mounts() []*VolumeMount
 
+       // Lookup returns the volume under the given mount
+       // UUID. Returns nil if the mount does not exist. If
+       // write==true, returns nil if the volume is not writable.
+       Lookup(uuid string, write bool) Volume
+
        // AllReadable returns all volumes.
        AllReadable() []Volume
 
@@ -335,6 +340,15 @@ func (vm *RRVolumeManager) Mounts() []*VolumeMount {
        return vm.mounts
 }
 
+func (vm *RRVolumeManager) Lookup(uuid string, needWrite bool) Volume {
+       for _, mnt := range vm.mounts {
+               if mnt.UUID == uuid && (!needWrite || !mnt.ReadOnly) {
+                       return mnt.volume
+               }
+       }
+       return nil
+}
+
 // AllReadable returns an array of all readable volumes
 func (vm *RRVolumeManager) AllReadable() []Volume {
        return vm.readables