"net/http/httptest"
"os"
"regexp"
+ "sort"
"strings"
"time"
// A RequestTester represents the parameters for an HTTP request to
// be issued on behalf of a unit test.
type RequestTester struct {
- uri string
- apiToken string
- method string
- requestBody []byte
+ uri string // request path, e.g. "/pull" or "/"+hash
+ apiToken string // if non-empty, sent as "Authorization: OAuth2 <apiToken>"
+ method string // HTTP method, e.g. "PUT" or "GET"
+ requestBody []byte // request body; nil for requests without a payload
+ storageClasses string // if non-empty, sent as the X-Keep-Storage-Classes header; keep this field last, tests build RequestTester with unkeyed literals
}
// Test GetBlockHandler on the following situations:
var testcases = []pullTest{
{
"Valid pull list from an ordinary user",
- RequestTester{"/pull", userToken, "PUT", goodJSON},
+ RequestTester{"/pull", userToken, "PUT", goodJSON, ""},
http.StatusUnauthorized,
"Unauthorized\n",
},
{
"Invalid pull request from an ordinary user",
- RequestTester{"/pull", userToken, "PUT", badJSON},
+ RequestTester{"/pull", userToken, "PUT", badJSON, ""},
http.StatusUnauthorized,
"Unauthorized\n",
},
{
"Valid pull request from the data manager",
- RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", goodJSON},
+ RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", goodJSON, ""},
http.StatusOK,
"Received 3 pull requests\n",
},
{
"Invalid pull request from the data manager",
- RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", badJSON},
+ RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", badJSON, ""},
http.StatusBadRequest,
"",
},
var testcases = []trashTest{
{
"Valid trash list from an ordinary user",
- RequestTester{"/trash", userToken, "PUT", goodJSON},
+ RequestTester{"/trash", userToken, "PUT", goodJSON, ""},
http.StatusUnauthorized,
"Unauthorized\n",
},
{
"Invalid trash list from an ordinary user",
- RequestTester{"/trash", userToken, "PUT", badJSON},
+ RequestTester{"/trash", userToken, "PUT", badJSON, ""},
http.StatusUnauthorized,
"Unauthorized\n",
},
{
"Valid trash list from the data manager",
- RequestTester{"/trash", s.cluster.SystemRootToken, "PUT", goodJSON},
+ RequestTester{"/trash", s.cluster.SystemRootToken, "PUT", goodJSON, ""},
http.StatusOK,
"Received 3 trash requests\n",
},
{
"Invalid trash list from the data manager",
- RequestTester{"/trash", s.cluster.SystemRootToken, "PUT", badJSON},
+ RequestTester{"/trash", s.cluster.SystemRootToken, "PUT", badJSON, ""},
http.StatusBadRequest,
"",
},
if rt.apiToken != "" {
req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
}
+ if rt.storageClasses != "" {
+ req.Header.Set("X-Keep-Storage-Classes", rt.storageClasses)
+ }
handler.ServeHTTP(response, req)
return response
}
}
}
+// TestPutStorageClasses issues PUT requests with various
+// X-Keep-Storage-Classes values and checks the
+// X-Keep-Storage-Classes-Confirmed response header. When no classes
+// are requested, any non-empty confirmation is accepted; otherwise
+// the (sorted) confirmation must match the expected class=replication
+// list exactly.
+func (s *HandlerSuite) TestPutStorageClasses(c *check.C) {
+	s.cluster.Volumes = map[string]arvados.Volume{
+		"zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "mock"}, // "default" is implicit
+		"zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "mock", StorageClasses: map[string]bool{"special": true, "extra": true}},
+		"zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "mock", StorageClasses: map[string]bool{"readonly": true}, ReadOnly: true},
+	}
+	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
+
+	// The same request is reused for every trial; only the
+	// storage classes header changes.
+	putReq := RequestTester{
+		method:      "PUT",
+		uri:         "/" + TestHash,
+		requestBody: TestBlock,
+	}
+	trials := []struct {
+		ask    string
+		expect string
+	}{
+		{"", ""},
+		{"default", "default=1"},
+		{"special", "extra=1, special=1"},
+		{"extra, special", "extra=1, special=1"},
+		{"default, special", "default=1, extra=1, special=1"},
+	}
+	for _, tc := range trials {
+		c.Logf("%#v", tc)
+		putReq.storageClasses = tc.ask
+		confirmed := IssueRequest(s.handler, &putReq).Header().Get("X-Keep-Storage-Classes-Confirmed")
+		if tc.expect != "" {
+			c.Check(sortCommaSeparated(confirmed), check.Equals, tc.expect)
+			continue
+		}
+		// any non-empty value is correct
+		c.Check(confirmed, check.Not(check.Equals), "")
+	}
+}
+
+// sortCommaSeparated returns the comma-separated items of s in
+// sorted order, joined with ", ". Whitespace around each item is
+// ignored, matching the way the PUT handler parses the
+// X-Keep-Storage-Classes request header (split on "," then trim), so
+// callers can compare list-valued headers without depending on map
+// iteration order or on the exact spacing after each comma.
+//
+// An empty string yields an empty string.
+func sortCommaSeparated(s string) string {
+	items := strings.Split(s, ",")
+	for i, item := range items {
+		items[i] = strings.TrimSpace(item)
+	}
+	sort.Strings(items)
+	return strings.Join(items, ", ")
+}
+
func (s *HandlerSuite) TestPutResponseHeader(c *check.C) {
c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
return
}
+ var wantStorageClasses []string
+ if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" {
+ wantStorageClasses = strings.Split(hdr, ",")
+ for i, sc := range wantStorageClasses {
+ wantStorageClasses[i] = strings.TrimSpace(sc)
+ }
+ }
+
buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
if err != nil {
http.Error(resp, err.Error(), http.StatusServiceUnavailable)
return
}
- result, err := PutBlock(ctx, rtr.volmgr, buf, hash)
+ result, err := PutBlock(ctx, rtr.volmgr, buf, hash, wantStorageClasses)
bufs.Put(buf)
if err != nil {
}
+// putResult accumulates the outcome of storing one block: which
+// mounts hold it, the replication achieved, and which requested
+// storage classes are still unsatisfied.
type putResult struct {
+ classTodo map[string]bool // requested storage classes not yet satisfied; entries are deleted by Add
+ mountUsed map[*VolumeMount]bool // mounts already counted by Add, so none is counted twice
totalReplication int
- classReplication map[string]int
+ classDone map[string]int // replication achieved so far, keyed by storage class
}
// Number of distinct replicas stored. "2" can mean the block was
-// "default=2; special=1".
+// "default=2, special=1".
func (pr putResult) ClassReplication() string {
s := ""
- for k, v := range pr.classReplication {
+ for k, v := range pr.classDone {
if len(s) > 0 {
s += ", "
}
return s
}
-func newPutResult(mnt *VolumeMount) putResult {
- result := putResult{
- totalReplication: mnt.Replication,
- classReplication: map[string]int{},
+// Add records that the block is now stored on mnt: the mount's
+// replication is added to the total and to every storage class the
+// mount provides, and those classes are removed from the outstanding
+// set. Adding the same mount twice is logged as a probable bug and
+// otherwise ignored.
+func (pr *putResult) Add(mnt *VolumeMount) {
+	if seen := pr.mountUsed[mnt]; seen {
+		logrus.Warnf("BUG? superfluous extra write to mount %s", mnt)
+		return
+	}
+	replicas := mnt.Replication
+	pr.mountUsed[mnt] = true
+	pr.totalReplication += replicas
+	for class := range mnt.StorageClasses {
+		delete(pr.classTodo, class)
+		pr.classDone[class] += replicas
+	}
+}
+
+// Done reports whether the block has been stored at least once and
+// no requested storage class remains outstanding.
+func (pr *putResult) Done() bool {
+	if len(pr.classTodo) > 0 {
+		return false
+	}
+	return pr.totalReplication > 0
+}
+
+// Want reports whether mnt is still worth trying. It returns false
+// once the result is Done or mnt has already been counted. If no
+// storage classes are outstanding (and nothing has been stored yet),
+// any mount qualifies; otherwise mnt qualifies only if it provides
+// at least one class that is still outstanding.
+func (pr *putResult) Want(mnt *VolumeMount) bool {
+ if pr.Done() || pr.mountUsed[mnt] {
+ return false
+ }
+ if len(pr.classTodo) == 0 {
+ // none specified == "any"
+ return true
}
for class := range mnt.StorageClasses {
- result.classReplication[class] += mnt.Replication
+ if pr.classTodo[class] {
+ return true
+ }
+ }
+ return false
+}
+
+// newPutResult returns an empty putResult whose outstanding storage
+// classes are the non-empty entries of classes. A nil or empty
+// slice means no particular class is required. All maps are
+// initialized so Add can be called immediately.
+func newPutResult(classes []string) putResult {
+ pr := putResult{
+ classTodo: make(map[string]bool, len(classes)),
+ classDone: map[string]int{},
+ mountUsed: map[*VolumeMount]bool{},
+ }
+ for _, c := range classes {
+ if c != "" {
+ pr.classTodo[c] = true
+ }
}
- return result
+ return pr
}
// PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
// all writes failed). The text of the error message should
// provide as much detail as possible.
//
-func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (putResult, error) {
+func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putResult, error) {
log := ctxlog.FromContext(ctx)
// Check that BLOCK's checksum matches HASH.
return putResult{}, RequestHashError
}
+ result := newPutResult(wantStorageClasses)
+
// If we already have this data, it's intact on disk, and we
// can update its timestamp, return success. If we have
// different data with the same hash, return failure.
- if result, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
+ if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil {
return result, err
- } else if ctx.Err() != nil {
- return putResult{}, ErrClientDisconnect
+ }
+ if ctx.Err() != nil {
+ return result, ErrClientDisconnect
}
// Choose a Keep volume to write to.
// If this volume fails, try all of the volumes in order.
- if mnt := volmgr.NextWritable(); mnt != nil {
- if err := mnt.Put(ctx, hash, block); err != nil {
- log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
- } else {
- return newPutResult(mnt), nil // success!
+ if mnt := volmgr.NextWritable(); mnt == nil || !result.Want(mnt) {
+ // fall through to "try all volumes" below
+ } else if err := mnt.Put(ctx, hash, block); err != nil {
+ log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
+ } else {
+ result.Add(mnt)
+ if result.Done() {
+ return result, nil
}
}
if ctx.Err() != nil {
allFull := true
for _, mnt := range writables {
+ if !result.Want(mnt) {
+ continue
+ }
err := mnt.Put(ctx, hash, block)
if ctx.Err() != nil {
- return putResult{}, ErrClientDisconnect
+ return result, ErrClientDisconnect
}
switch err {
case nil:
- return newPutResult(mnt), nil // success!
+ result.Add(mnt)
+ if result.Done() {
+ return result, nil
+ }
+ continue
case FullError:
continue
default:
}
}
- if allFull {
- log.Error("all volumes are full")
+ if result.totalReplication > 0 {
+ // Some, but not all, of the storage classes were
+ // satisfied. This qualifies as success.
+ return result, nil
+ } else if allFull {
+ log.Error("all volumes with qualifying storage classes are full")
return putResult{}, FullError
+ } else {
+ // Already logged the non-full errors.
+ return putResult{}, GenericError
}
- // Already logged the non-full errors.
- return putResult{}, GenericError
}
-// CompareAndTouch returns the current replication level if one of the
-// volumes already has the given content and it successfully updates
-// the relevant block's modification time in order to protect it from
-// premature garbage collection. Otherwise, it returns a non-nil
-// error.
-func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (putResult, error) {
+// CompareAndTouch looks for volumes where the given content already
+// exists and its modification time can be updated (i.e., it is
+// protected from garbage collection), and updates result accordingly.
+// It returns when the result is Done() or all volumes have been
+// checked.
+//
+// It returns CollisionError if a volume holds different data under
+// the same hash, and nil in every other case -- including when ctx
+// has been cancelled, so callers must check ctx.Err() themselves.
+// Volumes that result does not Want are skipped.
+func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putResult) error {
log := ctxlog.FromContext(ctx)
- var bestErr error = NotFoundError
for _, mnt := range volmgr.AllWritable() {
+ if !result.Want(mnt) {
+ continue
+ }
err := mnt.Compare(ctx, hash, buf)
if ctx.Err() != nil {
- return putResult{}, ctx.Err()
+ return nil
} else if err == CollisionError {
// Stop if we have a block with same hash but
// different content. (It will be impossible
// both, so there's no point writing it even
// on a different volume.)
- log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
+ log.Errorf("collision in Compare(%s) on volume %s", hash, mnt.Volume)
- return putResult{}, err
+ return CollisionError
} else if os.IsNotExist(err) {
// Block does not exist. This is the only
// "normal" error: we don't log anything.
}
if err := mnt.Touch(hash); err != nil {
log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
- bestErr = err
continue
}
// Compare and Touch both worked --> done.
- return newPutResult(mnt), nil
+ result.Add(mnt)
+ if result.Done() {
+ return nil
+ }
}
- return putResult{}, bestErr
+ return nil
}
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
rrc.ResponseWriter.Write(rrc.Buffer)
return nil
}
- _, err := PutBlock(rrc.Context, rrc.VolumeManager, rrc.Buffer, rrc.Locator[:32])
+ _, err := PutBlock(rrc.Context, rrc.VolumeManager, rrc.Buffer, rrc.Locator[:32], nil)
if rrc.Context.Err() != nil {
// If caller hung up, log that instead of subsequent/misleading errors.
http.Error(rrc.ResponseWriter, rrc.Context.Err().Error(), http.StatusGatewayTimeout)
if volume != nil {
return volume.Put(context.Background(), locator, data)
}
- _, err := PutBlock(context.Background(), volmgr, data, locator)
+ _, err := PutBlock(context.Background(), volmgr, data, locator, nil)
return err
}
func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
testData := PullWorkerTestData{
name: "TestPullWorkerPullList_with_two_locators",
- req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList},
+ req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList, ""},
responseCode: http.StatusOK,
responseBody: "Received 2 pull requests\n",
readContent: "hello",
func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
testData := PullWorkerTestData{
name: "TestPullWorkerPullList_with_one_locator",
- req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList},
+ req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList, ""},
responseCode: http.StatusOK,
responseBody: "Received 1 pull requests\n",
readContent: "hola",
func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_get_one_locator",
- req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList},
+ req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList, ""},
responseCode: http.StatusOK,
responseBody: "Received 1 pull requests\n",
readContent: "unused",
func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_get_two_locators",
- req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList},
+ req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList, ""},
responseCode: http.StatusOK,
responseBody: "Received 2 pull requests\n",
readContent: "unused",
func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_put_one_locator",
- req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList},
+ req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList, ""},
responseCode: http.StatusOK,
responseBody: "Received 1 pull requests\n",
readContent: "hello hello",
func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_put_two_locators",
- req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList},
+ req: RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList, ""},
responseCode: http.StatusOK,
responseBody: "Received 2 pull requests\n",
readContent: "hello again",
func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
testData := PullWorkerTestData{
name: "TestPullWorkerPullList_with_two_locators",
- req: RequestTester{"/pull", "invalidToken", "PUT", firstPullList},
+ req: RequestTester{"/pull", "invalidToken", "PUT", firstPullList, ""},
responseCode: http.StatusUnauthorized,
responseBody: "Unauthorized\n",
readContent: "hello",
// getStatusItem("foo","bar","baz") retrieves /status.json, decodes
// the response body into resp, and returns resp["foo"]["bar"]["baz"].
func getStatusItem(h *handler, keys ...string) interface{} {
- resp := IssueRequest(h, &RequestTester{"/status.json", "", "GET", nil})
+ resp := IssueRequest(h, &RequestTester{"/status.json", "", "GET", nil, ""})
var s interface{}
json.NewDecoder(resp.Body).Decode(&s)
for _, k := range keys {
}
return nil
} else {
- return NotFoundError
+ return os.ErrNotExist
}
}