// List mounts: UUID, readonly, tier, device ID, ...
rest.HandleFunc(`/mounts`, rtr.Mounts).Methods("GET")
+ rest.HandleFunc(`/mounts/{uuid}/blocks`, rtr.MountBlocks).Methods("GET")
+ rest.HandleFunc(`/mounts/{uuid}/blocks/{prefix:[0-9a-f]{0,32}}`, rtr.MountBlocks).Methods("GET")
// Replace the current pull queue.
rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
}
}
+// MountBlocks responds to "GET /mounts/{uuid}/blocks" requests.
+func (rtr *router) MountBlocks(resp http.ResponseWriter, req *http.Request) {
+ if !IsSystemAuth(GetAPIToken(req)) {
+ http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+ return
+ }
+
+ uuid := mux.Vars(req)["uuid"]
+ prefix := mux.Vars(req)["prefix"]
+ if v := KeepVM.Lookup(uuid, false); v == nil {
+ http.Error(resp, "mount not found", http.StatusNotFound)
+ } else if err := v.IndexTo(prefix, resp); err != nil {
+ http.Error(resp, err.Error(), http.StatusInternalServerError)
+ } else {
+ resp.Write([]byte{'\n'})
+ }
+}
+
// PoolStatus struct
type PoolStatus struct {
Alloc uint64 `json:"BytesAllocated"`
type PullRequest struct {
Locator string `json:"locator"`
Servers []string `json:"servers"`
+
+ // Destination mount, or "" for "anywhere"
+ MountUUID string
}
// PullHandler processes "PUT /pull" requests for the data manager.
type TrashRequest struct {
Locator string `json:"locator"`
BlockMtime int64 `json:"block_mtime"`
+
+ // Target mount, or "" for "everywhere"
+ MountUUID string
}
// TrashHandler processes /trash requests.
--- /dev/null
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "net/http"
+ "net/http/httptest"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&MountsSuite{})
+
+type MountsSuite struct {
+ vm VolumeManager
+ rtr http.Handler
+}
+
+func (s *MountsSuite) SetUpTest(c *check.C) {
+ s.vm = MakeTestVolumeManager(2)
+ KeepVM = s.vm
+ s.rtr = MakeRESTRouter()
+ theConfig.systemAuthToken = arvadostest.DataManagerToken
+}
+
+func (s *MountsSuite) TearDownTest(c *check.C) {
+ s.vm.Close()
+ KeepVM = nil
+ theConfig = DefaultConfig()
+ theConfig.Start()
+}
+
+func (s *MountsSuite) TestMounts(c *check.C) {
+ vols := s.vm.AllWritable()
+ vols[0].Put(context.Background(), TestHash, TestBlock)
+ vols[1].Put(context.Background(), TestHash2, TestBlock2)
+
+ resp := s.call("GET", "/mounts", "")
+ c.Check(resp.Code, check.Equals, http.StatusOK)
+ var mntList []struct {
+ UUID string
+ }
+ err := json.Unmarshal(resp.Body.Bytes(), &mntList)
+ c.Check(err, check.IsNil)
+ c.Check(len(mntList), check.Equals, 2)
+
+ // Bad auth
+ for _, tok := range []string{"", "xyzzy"} {
+ resp = s.call("GET", "/mounts/"+mntList[1].UUID+"/blocks", tok)
+ c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
+ c.Check(resp.Body.String(), check.Equals, "Unauthorized\n")
+ }
+
+ tok := arvadostest.DataManagerToken
+
+ // Nonexistent mount UUID
+ resp = s.call("GET", "/mounts/X/blocks", tok)
+ c.Check(resp.Code, check.Equals, http.StatusNotFound)
+ c.Check(resp.Body.String(), check.Equals, "mount not found\n")
+
+ // Complete index of first mount
+ resp = s.call("GET", "/mounts/"+mntList[0].UUID+"/blocks", tok)
+ c.Check(resp.Code, check.Equals, http.StatusOK)
+ c.Check(resp.Body.String(), check.Matches, TestHash+`\+[0-9]+ [0-9]+\n\n`)
+
+ // Partial index of first mount (one block matches prefix)
+ resp = s.call("GET", "/mounts/"+mntList[0].UUID+"/blocks/"+TestHash[:2], tok)
+ c.Check(resp.Code, check.Equals, http.StatusOK)
+ c.Check(resp.Body.String(), check.Matches, TestHash+`\+[0-9]+ [0-9]+\n\n`)
+
+ // Complete index of second mount (note trailing slash)
+ resp = s.call("GET", "/mounts/"+mntList[1].UUID+"/blocks/", tok)
+ c.Check(resp.Code, check.Equals, http.StatusOK)
+ c.Check(resp.Body.String(), check.Matches, TestHash2+`\+[0-9]+ [0-9]+\n\n`)
+
+ // Partial index of second mount (no blocks match prefix)
+ resp = s.call("GET", "/mounts/"+mntList[1].UUID+"/blocks/"+TestHash[:2], tok)
+ c.Check(resp.Code, check.Equals, http.StatusOK)
+ c.Check(resp.Body.String(), check.Equals, "\n")
+}
+
+func (s *MountsSuite) call(method, path, tok string) *httptest.ResponseRecorder {
+ resp := httptest.NewRecorder()
+ req, _ := http.NewRequest(method, path, nil)
+ if tok != "" {
+ req.Header.Set("Authorization", "OAuth2 "+tok)
+ }
+ s.rtr.ServeHTTP(resp, req)
+ return resp
+}
"context"
"crypto/rand"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
"io"
"io/ioutil"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+
log "github.com/Sirupsen/logrus"
)
nextItem := pullq.NextItem
for item := range nextItem {
pullRequest := item.(PullRequest)
- err := PullItemAndProcess(item.(PullRequest), GenerateRandomAPIToken(), keepClient)
+ err := PullItemAndProcess(item.(PullRequest), keepClient)
pullq.DoneItem <- struct{}{}
if err == nil {
log.Printf("Pull %s success", pullRequest)
// Using this token & signature, retrieve the given block.
// Write to storage
//
-func PullItemAndProcess(pullRequest PullRequest, token string, keepClient *keepclient.KeepClient) (err error) {
- keepClient.Arvados.ApiToken = token
+func PullItemAndProcess(pullRequest PullRequest, keepClient *keepclient.KeepClient) error {
+ var vol Volume
+ if uuid := pullRequest.MountUUID; uuid != "" {
+ vol = KeepVM.Lookup(pullRequest.MountUUID, true)
+ if vol == nil {
+ return fmt.Errorf("pull req has nonexistent mount: %v", pullRequest)
+ }
+ }
+
+ keepClient.Arvados.ApiToken = randomToken
serviceRoots := make(map[string]string)
for _, addr := range pullRequest.Servers {
// Generate signature with a random token
expiresAt := time.Now().Add(60 * time.Second)
- signedLocator := SignLocator(pullRequest.Locator, token, expiresAt)
+ signedLocator := SignLocator(pullRequest.Locator, randomToken, expiresAt)
reader, contentLen, _, err := GetContent(signedLocator, keepClient)
if err != nil {
- return
+ return err
}
if reader == nil {
return fmt.Errorf("No reader found for : %s", signedLocator)
return fmt.Errorf("Content not found for: %s", signedLocator)
}
- err = PutContent(readContent, pullRequest.Locator)
- return
+ writePulledBlock(vol, readContent, pullRequest.Locator)
+ return nil
}
// Fetch the content for the given locator using keepclient.
-var GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
- reader io.ReadCloser, contentLength int64, url string, err error) {
- reader, blocklen, url, err := keepClient.Get(signedLocator)
- return reader, blocklen, url, err
+var GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (io.ReadCloser, int64, string, error) {
+ return keepClient.Get(signedLocator)
}
-const alphaNumeric = "0123456789abcdefghijklmnopqrstuvwxyz"
+var writePulledBlock = func(volume Volume, data []byte, locator string) {
+ var err error
+ if volume != nil {
+ err = volume.Put(context.Background(), locator, data)
+ } else {
+ _, err = PutBlock(context.Background(), data, locator)
+ }
+ if err != nil {
+ log.Printf("error writing pulled block %q: %s", locator, err)
+ }
+}
-// GenerateRandomAPIToken generates a random api token
-func GenerateRandomAPIToken() string {
+var randomToken = func() string {
+ const alphaNumeric = "0123456789abcdefghijklmnopqrstuvwxyz"
var bytes = make([]byte, 36)
rand.Read(bytes)
for i, b := range bytes {
bytes[i] = alphaNumeric[b%byte(len(alphaNumeric))]
}
return (string(bytes))
-}
-
-// Put block
-var PutContent = func(content []byte, locator string) (err error) {
- _, err = PutBlock(context.Background(), content, locator)
- return
-}
+}()
import (
"bytes"
"errors"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/arvadostest"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
"io"
"net/http"
"os"
"strings"
"testing"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
)
var keepClient *keepclient.KeepClient
// putting an item on the pullq so that the errors can be verified.
func performPullWorkerIntegrationTest(testData PullWorkIntegrationTestData, pullRequest PullRequest, t *testing.T) {
- // Override PutContent to mock PutBlock functionality
- defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent)
- PutContent = func(content []byte, locator string) (err error) {
+ // Override writePulledBlock to mock PutBlock functionality
+ defer func(orig func(Volume, []byte, string)) { writePulledBlock = orig }(writePulledBlock)
+ writePulledBlock = func(v Volume, content []byte, locator string) {
if string(content) != testData.Content {
- t.Errorf("PutContent invoked with unexpected data. Expected: %s; Found: %s", testData.Content, content)
+ t.Errorf("writePulledBlock invoked with unexpected data. Expected: %s; Found: %s", testData.Content, content)
}
- return
}
// Override GetContent to mock keepclient Get functionality
return rdr, int64(len(testData.Content)), "", nil
}
- keepClient.Arvados.ApiToken = GenerateRandomAPIToken()
- err := PullItemAndProcess(pullRequest, keepClient.Arvados.ApiToken, keepClient)
+ err := PullItemAndProcess(pullRequest, keepClient)
if len(testData.GetError) > 0 {
if (err == nil) || (!strings.Contains(err.Error(), testData.GetError)) {
import (
"bytes"
"errors"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- . "gopkg.in/check.v1"
"io"
"net/http"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ . "gopkg.in/check.v1"
)
var _ = Suite(&PullWorkerTestSuite{})
return rc, int64(len(testData.readContent)), "", nil
}
- // Override PutContent to mock PutBlock functionality
- defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent)
- PutContent = func(content []byte, locator string) (err error) {
+ // Override writePulledBlock to mock PutBlock functionality
+ defer func(orig func(Volume, []byte, string)) { writePulledBlock = orig }(writePulledBlock)
+ writePulledBlock = func(v Volume, content []byte, locator string) {
if testData.putError {
- err = errors.New("Error putting data")
- putError = err
- return err
+ putError = errors.New("Error putting data")
+ return
}
putContent = content
- return nil
}
c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
return
}
- for _, volume := range KeepVM.AllWritable() {
+ var volumes []Volume
+ if uuid := trashRequest.MountUUID; uuid == "" {
+ volumes = KeepVM.AllWritable()
+ } else if v := KeepVM.Lookup(uuid, true); v == nil {
+ log.Printf("warning: trash request for nonexistent mount: %v", trashRequest)
+ return
+ } else {
+ volumes = []Volume{v}
+ }
+
+ for _, volume := range volumes {
mtime, err := volume.Mtime(trashRequest.Locator)
if err != nil {
log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
// Mounts returns all mounts (volume attachments).
Mounts() []*VolumeMount
+ // Lookup returns the volume under the given mount
+ // UUID. Returns nil if the mount does not exist. If
+ // write==true, returns nil if the volume is not writable.
+ Lookup(uuid string, write bool) Volume
+
// AllReadable returns all volumes.
AllReadable() []Volume
return vm.mounts
}
+func (vm *RRVolumeManager) Lookup(uuid string, needWrite bool) Volume {
+ for _, mnt := range vm.mounts {
+ if mnt.UUID == uuid && (!needWrite || !mnt.ReadOnly) {
+ return mnt.volume
+ }
+ }
+ return nil
+}
+
// AllReadable returns an array of all readable volumes
func (vm *RRVolumeManager) AllReadable() []Volume {
return vm.readables