Merge branch 'master' into 6260-test-datamanager
author  radhika <radhika@curoverse.com>
Mon, 14 Sep 2015 19:23:21 +0000 (15:23 -0400)
committer  radhika <radhika@curoverse.com>
Mon, 14 Sep 2015 19:23:21 +0000 (15:23 -0400)
Conflicts:
services/keepstore/keepstore.go

25 files changed:
services/arv-git-httpd/server_test.go
services/keepstore/bufferpool_test.go
services/keepstore/collision.go [new file with mode: 0644]
services/keepstore/collision_test.go [new file with mode: 0644]
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/keepstore.go
services/keepstore/keepstore_test.go
services/keepstore/logging_router.go
services/keepstore/mock_mutex_for_test.go [new file with mode: 0644]
services/keepstore/perms.go
services/keepstore/perms_test.go
services/keepstore/pull_worker.go
services/keepstore/pull_worker_integration_test.go
services/keepstore/pull_worker_test.go
services/keepstore/trash_worker.go
services/keepstore/trash_worker_test.go
services/keepstore/volume.go
services/keepstore/volume_generic_test.go [new file with mode: 0644]
services/keepstore/volume_test.go
services/keepstore/volume_unix.go
services/keepstore/volume_unix_test.go
services/keepstore/work_queue.go
services/nodemanager/arvnodeman/computenode/driver/azure.py
services/nodemanager/setup.py

diff --git a/services/arv-git-httpd/server_test.go b/services/arv-git-httpd/server_test.go
index 2d1aeb78491df27c9f85d3cbe612218038b6c39c..ea8dc045f9775839ac89cbf6078bcf448650c9a7 100644 (file)
@@ -66,7 +66,7 @@ func (s *GitSuite) TestNoPermission(c *check.C) {
 func (s *GitSuite) TestExpiredToken(c *check.C) {
        for _, repo := range []string{"active/foo.git", "active/foo/.git"} {
                err := s.RunGit(c, expiredToken, "fetch", repo)
-               c.Assert(err, check.ErrorMatches, `.* 500 while accessing.*`)
+               c.Assert(err, check.ErrorMatches, `.* (500 while accessing|requested URL returned error: 500).*`)
        }
 }
 
@@ -80,7 +80,7 @@ func (s *GitSuite) TestInvalidToken(c *check.C) {
 func (s *GitSuite) TestShortToken(c *check.C) {
        for _, repo := range []string{"active/foo.git", "active/foo/.git"} {
                err := s.RunGit(c, "s3cr3t", "fetch", repo)
-               c.Assert(err, check.ErrorMatches, `.* 500 while accessing.*`)
+               c.Assert(err, check.ErrorMatches, `.* (500 while accessing|requested URL returned error: 500).*`)
        }
 }
 
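The widened ErrorMatches patterns above accept both the older git-over-HTTP wording ("... 500 while accessing ...") and the newer curl-style wording ("The requested URL returned error: 500"). A minimal sketch of the two message shapes involved (approximated for illustration, not captured from a real run):

    package main

    import (
        "fmt"
        "regexp"
    )

    func main() {
        // Approximate message shapes for illustration only; real git/curl
        // output may differ in detail.
        pattern := `.* (500 while accessing|requested URL returned error: 500).*`
        oldStyle := "error: The requested URL returned error: 500 while accessing https://example/repo.git/info/refs"
        newStyle := "error: RPC failed; HTTP 500 curl 22 The requested URL returned error: 500"
        fmt.Println(regexp.MustCompile(pattern).MatchString(oldStyle)) // true
        fmt.Println(regexp.MustCompile(pattern).MatchString(newStyle)) // true
    }
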
diff --git a/services/keepstore/bufferpool_test.go b/services/keepstore/bufferpool_test.go
index 95d118e221de6b8516654f8a133b871da00c5cd2..8726a19150c7faf2f309cc6b3ba647d9aa40dd54 100644 (file)
@@ -18,12 +18,12 @@ type BufferPoolSuite struct{}
 // Initialize a default-sized buffer pool for the benefit of test
 // suites that don't run main().
 func init() {
-       bufs = newBufferPool(maxBuffers, BLOCKSIZE)
+       bufs = newBufferPool(maxBuffers, BlockSize)
 }
 
 // Restore sane default after bufferpool's own tests
 func (s *BufferPoolSuite) TearDownTest(c *C) {
-       bufs = newBufferPool(maxBuffers, BLOCKSIZE)
+       bufs = newBufferPool(maxBuffers, BlockSize)
 }
 
 func (s *BufferPoolSuite) TestBufferPoolBufSize(c *C) {
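The bufferPool implementation itself is not part of this merge; the hunks here only show it being constructed with newBufferPool(maxBuffers, BlockSize) and buffers being returned with bufs.Put(...) further down in handlers.go. As a rough mental model only (the Get method and the semaphore-based design are assumptions, not code from this commit), a pool with those entry points could look like:

    package main

    // Hypothetical sketch, not the actual keepstore bufferPool: a pool that
    // hands out at most `count` buffers of `size` bytes at any one time.
    type bufferPool struct {
        slots chan struct{} // counting semaphore: one token per outstanding buffer
        size  int
    }

    func newBufferPool(count int, size int) *bufferPool {
        return &bufferPool{slots: make(chan struct{}, count), size: size}
    }

    // Get blocks until a slot is free, then returns a fresh buffer.
    func (p *bufferPool) Get() []byte {
        p.slots <- struct{}{}
        return make([]byte, p.size)
    }

    // Put releases the caller's slot; the buffer itself is left to the GC.
    func (p *bufferPool) Put(buf []byte) {
        <-p.slots
    }

    func main() {
        bufs := newBufferPool(1, 64)
        b := bufs.Get()
        bufs.Put(b) // must Put before the next Get, or Get would block forever
    }

Under that model, TestPutNeedsOnlyOneBuffer in handler_test.go below makes sense: with maxBuffers==1, two sequential PUTs succeed only if each handler returns its buffer before the next Get; otherwise the test's one-second timeout fires.
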
diff --git a/services/keepstore/collision.go b/services/keepstore/collision.go
new file mode 100644 (file)
index 0000000..210286a
--- /dev/null
@@ -0,0 +1,49 @@
+package main
+
+import (
+       "crypto/md5"
+       "fmt"
+       "io"
+)
+
+// Compute the MD5 digest of a data block (consisting of buf1 + buf2 +
+// all bytes readable from rdr). If all data is read successfully,
+// return CollisionError if the digest matches expectMD5, otherwise
+// DiskHashError. If an error occurs while reading, return that error.
+//
+// "Content has the expected MD5" counts as a collision because this
+// function is used in cases where we already have another block in
+// hand with the given MD5 but different content.
+func collisionOrCorrupt(expectMD5 string, buf1, buf2 []byte, rdr io.Reader) error {
+       outcome := make(chan error)
+       data := make(chan []byte, 1)
+       go func() {
+               h := md5.New()
+               for b := range data {
+                       h.Write(b)
+               }
+               if fmt.Sprintf("%x", h.Sum(nil)) == expectMD5 {
+                       outcome <- CollisionError
+               } else {
+                       outcome <- DiskHashError
+               }
+       }()
+       data <- buf1
+       if buf2 != nil {
+               data <- buf2
+       }
+       var err error
+       for rdr != nil && err == nil {
+               buf := make([]byte, 1 << 18)
+               var n int
+               n, err = rdr.Read(buf)
+               data <- buf[:n]
+       }
+       close(data)
+       if rdr != nil && err != io.EOF {
+               <-outcome
+               return err
+       }
+       return <-outcome
+}
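
For orientation before the unit tests that follow, here is a hypothetical caller's-eye view of the contract (the values mirror the test cases below; this fragment is not additional code from the commit, and it assumes the surrounding package's imports):

    fooMD5 := "acbd18db4cc2f85cedef654fccc4a4d8" // md5("foo")

    // The concatenated data hashes to fooMD5, i.e. the stored copy is
    // intact => the block we hold with the same hash must be a collision.
    err := collisionOrCorrupt(fooMD5, []byte("fo"), nil, bytes.NewBufferString("o"))
    // err == CollisionError

    // The data does not hash to fooMD5 => the stored copy is corrupt.
    err = collisionOrCorrupt(fooMD5, []byte("bar"), nil, nil)
    // err == DiskHashError
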
diff --git a/services/keepstore/collision_test.go b/services/keepstore/collision_test.go
new file mode 100644 (file)
index 0000000..379dadd
--- /dev/null
@@ -0,0 +1,53 @@
+package main
+
+import (
+       "bytes"
+       "testing"
+       "testing/iotest"
+
+       check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
+
+var _ = check.Suite(&CollisionSuite{})
+
+type CollisionSuite struct{}
+
+func (s *CollisionSuite) TestCollisionOrCorrupt(c *check.C) {
+       fooMD5 := "acbd18db4cc2f85cedef654fccc4a4d8"
+
+       c.Check(collisionOrCorrupt(fooMD5, []byte{'f'}, []byte{'o'}, bytes.NewBufferString("o")),
+               check.Equals, CollisionError)
+       c.Check(collisionOrCorrupt(fooMD5, []byte{'f'}, nil, bytes.NewBufferString("oo")),
+               check.Equals, CollisionError)
+       c.Check(collisionOrCorrupt(fooMD5, []byte{'f'}, []byte{'o', 'o'}, nil),
+               check.Equals, CollisionError)
+       c.Check(collisionOrCorrupt(fooMD5, nil, []byte{}, bytes.NewBufferString("foo")),
+               check.Equals, CollisionError)
+       c.Check(collisionOrCorrupt(fooMD5, []byte{'f', 'o', 'o'}, nil, bytes.NewBufferString("")),
+               check.Equals, CollisionError)
+       c.Check(collisionOrCorrupt(fooMD5, nil, nil, iotest.NewReadLogger("foo: ", iotest.DataErrReader(iotest.OneByteReader(bytes.NewBufferString("foo"))))),
+               check.Equals, CollisionError)
+
+       c.Check(collisionOrCorrupt(fooMD5, []byte{'f', 'o', 'o'}, nil, bytes.NewBufferString("bar")),
+               check.Equals, DiskHashError)
+       c.Check(collisionOrCorrupt(fooMD5, []byte{'f', 'o'}, nil, nil),
+               check.Equals, DiskHashError)
+       c.Check(collisionOrCorrupt(fooMD5, []byte{}, nil, bytes.NewBufferString("")),
+               check.Equals, DiskHashError)
+       c.Check(collisionOrCorrupt(fooMD5, []byte{'f', 'O'}, nil, bytes.NewBufferString("o")),
+               check.Equals, DiskHashError)
+       c.Check(collisionOrCorrupt(fooMD5, []byte{'f', 'O', 'o'}, nil, nil),
+               check.Equals, DiskHashError)
+       c.Check(collisionOrCorrupt(fooMD5, []byte{'f', 'o'}, []byte{'O'}, nil),
+               check.Equals, DiskHashError)
+       c.Check(collisionOrCorrupt(fooMD5, []byte{'f', 'o'}, nil, bytes.NewBufferString("O")),
+               check.Equals, DiskHashError)
+
+       c.Check(collisionOrCorrupt(fooMD5, []byte{}, nil, iotest.TimeoutReader(iotest.OneByteReader(bytes.NewBufferString("foo")))),
+               check.Equals, iotest.ErrTimeout)
+}
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 8be471025db5119fb25510d2305fdaaa5cbb7257..4ea4329bd32157329478a413516b39c8878e70ba 100644 (file)
@@ -25,10 +25,10 @@ import (
 // A RequestTester represents the parameters for an HTTP request to
 // be issued on behalf of a unit test.
 type RequestTester struct {
-       uri          string
-       api_token    string
-       method       string
-       request_body []byte
+       uri         string
+       apiToken    string
+       method      string
+       requestBody []byte
 }
 
 // Test GetBlockHandler on the following situations:
@@ -46,76 +46,76 @@ func TestGetHandler(t *testing.T) {
        defer KeepVM.Close()
 
        vols := KeepVM.AllWritable()
-       if err := vols[0].Put(TEST_HASH, TEST_BLOCK); err != nil {
+       if err := vols[0].Put(TestHash, TestBlock); err != nil {
                t.Error(err)
        }
 
        // Create locators for testing.
        // Turn on permission settings so we can generate signed locators.
-       enforce_permissions = true
-       PermissionSecret = []byte(known_key)
+       enforcePermissions = true
+       PermissionSecret = []byte(knownKey)
        blob_signature_ttl = 300 * time.Second
 
        var (
-               unsigned_locator  = "/" + TEST_HASH
-               valid_timestamp   = time.Now().Add(blob_signature_ttl)
-               expired_timestamp = time.Now().Add(-time.Hour)
-               signed_locator    = "/" + SignLocator(TEST_HASH, known_token, valid_timestamp)
-               expired_locator   = "/" + SignLocator(TEST_HASH, known_token, expired_timestamp)
+               unsignedLocator  = "/" + TestHash
+               validTimestamp   = time.Now().Add(blob_signature_ttl)
+               expiredTimestamp = time.Now().Add(-time.Hour)
+               signedLocator    = "/" + SignLocator(TestHash, knownToken, validTimestamp)
+               expiredLocator   = "/" + SignLocator(TestHash, knownToken, expiredTimestamp)
        )
 
        // -----------------
        // Test unauthenticated request with permissions off.
-       enforce_permissions = false
+       enforcePermissions = false
 
        // Unauthenticated request, unsigned locator
        // => OK
        response := IssueRequest(
                &RequestTester{
                        method: "GET",
-                       uri:    unsigned_locator,
+                       uri:    unsignedLocator,
                })
        ExpectStatusCode(t,
                "Unauthenticated request, unsigned locator", http.StatusOK, response)
        ExpectBody(t,
                "Unauthenticated request, unsigned locator",
-               string(TEST_BLOCK),
+               string(TestBlock),
                response)
 
-       received_cl := response.Header().Get("Content-Length")
-       expected_cl := fmt.Sprintf("%d", len(TEST_BLOCK))
-       if received_cl != expected_cl {
-               t.Errorf("expected Content-Length %s, got %s", expected_cl, received_cl)
+       receivedLen := response.Header().Get("Content-Length")
+       expectedLen := fmt.Sprintf("%d", len(TestBlock))
+       if receivedLen != expectedLen {
+               t.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen)
        }
 
        // ----------------
        // Permissions: on.
-       enforce_permissions = true
+       enforcePermissions = true
 
        // Authenticated request, signed locator
        // => OK
        response = IssueRequest(&RequestTester{
-               method:    "GET",
-               uri:       signed_locator,
-               api_token: known_token,
+               method:   "GET",
+               uri:      signedLocator,
+               apiToken: knownToken,
        })
        ExpectStatusCode(t,
                "Authenticated request, signed locator", http.StatusOK, response)
        ExpectBody(t,
-               "Authenticated request, signed locator", string(TEST_BLOCK), response)
+               "Authenticated request, signed locator", string(TestBlock), response)
 
-       received_cl = response.Header().Get("Content-Length")
-       expected_cl = fmt.Sprintf("%d", len(TEST_BLOCK))
-       if received_cl != expected_cl {
-               t.Errorf("expected Content-Length %s, got %s", expected_cl, received_cl)
+       receivedLen = response.Header().Get("Content-Length")
+       expectedLen = fmt.Sprintf("%d", len(TestBlock))
+       if receivedLen != expectedLen {
+               t.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen)
        }
 
        // Authenticated request, unsigned locator
        // => PermissionError
        response = IssueRequest(&RequestTester{
-               method:    "GET",
-               uri:       unsigned_locator,
-               api_token: known_token,
+               method:   "GET",
+               uri:      unsignedLocator,
+               apiToken: knownToken,
        })
        ExpectStatusCode(t, "unsigned locator", PermissionError.HTTPCode, response)
 
@@ -123,7 +123,7 @@ func TestGetHandler(t *testing.T) {
        // => PermissionError
        response = IssueRequest(&RequestTester{
                method: "GET",
-               uri:    signed_locator,
+               uri:    signedLocator,
        })
        ExpectStatusCode(t,
                "Unauthenticated request, signed locator",
@@ -132,9 +132,9 @@ func TestGetHandler(t *testing.T) {
        // Authenticated request, expired locator
        // => ExpiredError
        response = IssueRequest(&RequestTester{
-               method:    "GET",
-               uri:       expired_locator,
-               api_token: known_token,
+               method:   "GET",
+               uri:      expiredLocator,
+               apiToken: knownToken,
        })
        ExpectStatusCode(t,
                "Authenticated request, expired locator",
@@ -158,24 +158,24 @@ func TestPutHandler(t *testing.T) {
 
        // Unauthenticated request, no server key
        // => OK (unsigned response)
-       unsigned_locator := "/" + TEST_HASH
+       unsignedLocator := "/" + TestHash
        response := IssueRequest(
                &RequestTester{
-                       method:       "PUT",
-                       uri:          unsigned_locator,
-                       request_body: TEST_BLOCK,
+                       method:      "PUT",
+                       uri:         unsignedLocator,
+                       requestBody: TestBlock,
                })
 
        ExpectStatusCode(t,
                "Unauthenticated request, no server key", http.StatusOK, response)
        ExpectBody(t,
                "Unauthenticated request, no server key",
-               TEST_HASH_PUT_RESPONSE, response)
+               TestHashPutResp, response)
 
        // ------------------
        // With a server key.
 
-       PermissionSecret = []byte(known_key)
+       PermissionSecret = []byte(knownKey)
        blob_signature_ttl = 300 * time.Second
 
        // When a permission key is available, the locator returned
@@ -185,29 +185,29 @@ func TestPutHandler(t *testing.T) {
        // => OK (signed response)
        response = IssueRequest(
                &RequestTester{
-                       method:       "PUT",
-                       uri:          unsigned_locator,
-                       request_body: TEST_BLOCK,
-                       api_token:    known_token,
+                       method:      "PUT",
+                       uri:         unsignedLocator,
+                       requestBody: TestBlock,
+                       apiToken:    knownToken,
                })
 
        ExpectStatusCode(t,
                "Authenticated PUT, signed locator, with server key",
                http.StatusOK, response)
-       response_locator := strings.TrimSpace(response.Body.String())
-       if VerifySignature(response_locator, known_token) != nil {
+       responseLocator := strings.TrimSpace(response.Body.String())
+       if VerifySignature(responseLocator, knownToken) != nil {
                t.Errorf("Authenticated PUT, signed locator, with server key:\n"+
                        "response '%s' does not contain a valid signature",
-                       response_locator)
+                       responseLocator)
        }
 
        // Unauthenticated PUT, unsigned locator
        // => OK
        response = IssueRequest(
                &RequestTester{
-                       method:       "PUT",
-                       uri:          unsigned_locator,
-                       request_body: TEST_BLOCK,
+                       method:      "PUT",
+                       uri:         unsignedLocator,
+                       requestBody: TestBlock,
                })
 
        ExpectStatusCode(t,
@@ -215,7 +215,7 @@ func TestPutHandler(t *testing.T) {
                http.StatusOK, response)
        ExpectBody(t,
                "Unauthenticated PUT, unsigned locator, with server key",
-               TEST_HASH_PUT_RESPONSE, response)
+               TestHashPutResp, response)
 }
 
 func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
@@ -227,17 +227,20 @@ func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
        defer KeepVM.Close()
        IssueRequest(
                &RequestTester{
-                       method:       "PUT",
-                       uri:          "/" + TEST_HASH,
-                       request_body: TEST_BLOCK,
+                       method:      "PUT",
+                       uri:         "/" + TestHash,
+                       requestBody: TestBlock,
                })
+       defer func(orig bool) {
+               never_delete = orig
+       }(never_delete)
        never_delete = false
        IssueRequest(
                &RequestTester{
-                       method:       "DELETE",
-                       uri:          "/" + TEST_HASH,
-                       request_body: TEST_BLOCK,
-                       api_token:    data_manager_token,
+                       method:      "DELETE",
+                       uri:         "/" + TestHash,
+                       requestBody: TestBlock,
+                       apiToken:    data_manager_token,
                })
        type expect struct {
                volnum    int
@@ -246,10 +249,13 @@ func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
        }
        for _, e := range []expect{
                {0, "Get", 0},
+               {0, "Compare", 0},
                {0, "Touch", 0},
                {0, "Put", 0},
                {0, "Delete", 0},
-               {1, "Get", 1},
+               {1, "Get", 0},
+               {1, "Compare", 1},
+               {1, "Touch", 1},
                {1, "Put", 1},
                {1, "Delete", 1},
        } {
@@ -268,7 +274,7 @@ func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
 //   - authenticated   /index/prefix request | superuser
 //
 // The only /index requests that should succeed are those issued by the
-// superuser. They should pass regardless of the value of enforce_permissions.
+// superuser. They should pass regardless of the value of enforcePermissions.
 //
 func TestIndexHandler(t *testing.T) {
        defer teardown()
@@ -280,61 +286,61 @@ func TestIndexHandler(t *testing.T) {
        defer KeepVM.Close()
 
        vols := KeepVM.AllWritable()
-       vols[0].Put(TEST_HASH, TEST_BLOCK)
-       vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
-       vols[0].Put(TEST_HASH+".meta", []byte("metadata"))
-       vols[1].Put(TEST_HASH_2+".meta", []byte("metadata"))
+       vols[0].Put(TestHash, TestBlock)
+       vols[1].Put(TestHash2, TestBlock2)
+       vols[0].Put(TestHash+".meta", []byte("metadata"))
+       vols[1].Put(TestHash2+".meta", []byte("metadata"))
 
        data_manager_token = "DATA MANAGER TOKEN"
 
-       unauthenticated_req := &RequestTester{
+       unauthenticatedReq := &RequestTester{
                method: "GET",
                uri:    "/index",
        }
-       authenticated_req := &RequestTester{
-               method:    "GET",
-               uri:       "/index",
-               api_token: known_token,
+       authenticatedReq := &RequestTester{
+               method:   "GET",
+               uri:      "/index",
+               apiToken: knownToken,
        }
-       superuser_req := &RequestTester{
-               method:    "GET",
-               uri:       "/index",
-               api_token: data_manager_token,
+       superuserReq := &RequestTester{
+               method:   "GET",
+               uri:      "/index",
+               apiToken: data_manager_token,
        }
-       unauth_prefix_req := &RequestTester{
+       unauthPrefixReq := &RequestTester{
                method: "GET",
-               uri:    "/index/" + TEST_HASH[0:3],
+               uri:    "/index/" + TestHash[0:3],
        }
-       auth_prefix_req := &RequestTester{
-               method:    "GET",
-               uri:       "/index/" + TEST_HASH[0:3],
-               api_token: known_token,
+       authPrefixReq := &RequestTester{
+               method:   "GET",
+               uri:      "/index/" + TestHash[0:3],
+               apiToken: knownToken,
        }
-       superuser_prefix_req := &RequestTester{
-               method:    "GET",
-               uri:       "/index/" + TEST_HASH[0:3],
-               api_token: data_manager_token,
+       superuserPrefixReq := &RequestTester{
+               method:   "GET",
+               uri:      "/index/" + TestHash[0:3],
+               apiToken: data_manager_token,
        }
 
        // -------------------------------------------------------------
        // Only the superuser should be allowed to issue /index requests.
 
        // ---------------------------
-       // enforce_permissions enabled
+       // enforcePermissions enabled
        // This setting should not affect tests passing.
-       enforce_permissions = true
+       enforcePermissions = true
 
        // unauthenticated /index request
        // => UnauthorizedError
-       response := IssueRequest(unauthenticated_req)
+       response := IssueRequest(unauthenticatedReq)
        ExpectStatusCode(t,
-               "enforce_permissions on, unauthenticated request",
+               "enforcePermissions on, unauthenticated request",
                UnauthorizedError.HTTPCode,
                response)
 
        // unauthenticated /index/prefix request
        // => UnauthorizedError
-       response = IssueRequest(unauth_prefix_req)
+       response = IssueRequest(unauthPrefixReq)
        ExpectStatusCode(t,
                "permissions on, unauthenticated /index/prefix request",
                UnauthorizedError.HTTPCode,
@@ -342,7 +348,7 @@ func TestIndexHandler(t *testing.T) {
 
        // authenticated /index request, non-superuser
        // => UnauthorizedError
-       response = IssueRequest(authenticated_req)
+       response = IssueRequest(authenticatedReq)
        ExpectStatusCode(t,
                "permissions on, authenticated request, non-superuser",
                UnauthorizedError.HTTPCode,
@@ -350,7 +356,7 @@ func TestIndexHandler(t *testing.T) {
 
        // authenticated /index/prefix request, non-superuser
        // => UnauthorizedError
-       response = IssueRequest(auth_prefix_req)
+       response = IssueRequest(authPrefixReq)
        ExpectStatusCode(t,
                "permissions on, authenticated /index/prefix request, non-superuser",
                UnauthorizedError.HTTPCode,
@@ -358,27 +364,27 @@ func TestIndexHandler(t *testing.T) {
 
        // superuser /index request
        // => OK
-       response = IssueRequest(superuser_req)
+       response = IssueRequest(superuserReq)
        ExpectStatusCode(t,
                "permissions on, superuser request",
                http.StatusOK,
                response)
 
        // ----------------------------
-       // enforce_permissions disabled
+       // enforcePermissions disabled
        // Valid Request should still pass.
-       enforce_permissions = false
+       enforcePermissions = false
 
        // superuser /index request
        // => OK
-       response = IssueRequest(superuser_req)
+       response = IssueRequest(superuserReq)
        ExpectStatusCode(t,
                "permissions on, superuser request",
                http.StatusOK,
                response)
 
-       expected := `^` + TEST_HASH + `\+\d+ \d+\n` +
-               TEST_HASH_2 + `\+\d+ \d+\n\n$`
+       expected := `^` + TestHash + `\+\d+ \d+\n` +
+               TestHash2 + `\+\d+ \d+\n\n$`
        match, _ := regexp.MatchString(expected, response.Body.String())
        if !match {
                t.Errorf(
@@ -388,13 +394,13 @@ func TestIndexHandler(t *testing.T) {
 
        // superuser /index/prefix request
        // => OK
-       response = IssueRequest(superuser_prefix_req)
+       response = IssueRequest(superuserPrefixReq)
        ExpectStatusCode(t,
                "permissions on, superuser request",
                http.StatusOK,
                response)
 
-       expected = `^` + TEST_HASH + `\+\d+ \d+\n\n$`
+       expected = `^` + TestHash + `\+\d+ \d+\n\n$`
        match, _ = regexp.MatchString(expected, response.Body.String())
        if !match {
                t.Errorf(
@@ -439,51 +445,51 @@ func TestDeleteHandler(t *testing.T) {
        defer KeepVM.Close()
 
        vols := KeepVM.AllWritable()
-       vols[0].Put(TEST_HASH, TEST_BLOCK)
+       vols[0].Put(TestHash, TestBlock)
 
        // Explicitly set the blob_signature_ttl to 0 for these
        // tests, to ensure the MockVolume deletes the blocks
        // even though they have just been created.
        blob_signature_ttl = time.Duration(0)
 
-       var user_token = "NOT DATA MANAGER TOKEN"
+       var userToken = "NOT DATA MANAGER TOKEN"
        data_manager_token = "DATA MANAGER TOKEN"
 
        never_delete = false
 
-       unauth_req := &RequestTester{
+       unauthReq := &RequestTester{
                method: "DELETE",
-               uri:    "/" + TEST_HASH,
+               uri:    "/" + TestHash,
        }
 
-       user_req := &RequestTester{
-               method:    "DELETE",
-               uri:       "/" + TEST_HASH,
-               api_token: user_token,
+       userReq := &RequestTester{
+               method:   "DELETE",
+               uri:      "/" + TestHash,
+               apiToken: userToken,
        }
 
-       superuser_existing_block_req := &RequestTester{
-               method:    "DELETE",
-               uri:       "/" + TEST_HASH,
-               api_token: data_manager_token,
+       superuserExistingBlockReq := &RequestTester{
+               method:   "DELETE",
+               uri:      "/" + TestHash,
+               apiToken: data_manager_token,
        }
 
-       superuser_nonexistent_block_req := &RequestTester{
-               method:    "DELETE",
-               uri:       "/" + TEST_HASH_2,
-               api_token: data_manager_token,
+       superuserNonexistentBlockReq := &RequestTester{
+               method:   "DELETE",
+               uri:      "/" + TestHash2,
+               apiToken: data_manager_token,
        }
 
        // Unauthenticated request returns PermissionError.
        var response *httptest.ResponseRecorder
-       response = IssueRequest(unauth_req)
+       response = IssueRequest(unauthReq)
        ExpectStatusCode(t,
                "unauthenticated request",
                PermissionError.HTTPCode,
                response)
 
        // Authenticated non-admin request returns PermissionError.
-       response = IssueRequest(user_req)
+       response = IssueRequest(userReq)
        ExpectStatusCode(t,
                "authenticated non-admin request",
                PermissionError.HTTPCode,
@@ -494,9 +500,9 @@ func TestDeleteHandler(t *testing.T) {
                Deleted int `json:"copies_deleted"`
                Failed  int `json:"copies_failed"`
        }
-       var response_dc, expected_dc deletecounter
+       var responseDc, expectedDc deletecounter
 
-       response = IssueRequest(superuser_nonexistent_block_req)
+       response = IssueRequest(superuserNonexistentBlockReq)
        ExpectStatusCode(t,
                "data manager request, nonexistent block",
                http.StatusNotFound,
@@ -504,7 +510,7 @@ func TestDeleteHandler(t *testing.T) {
 
        // Authenticated admin request for existing block while never_delete is set.
        never_delete = true
-       response = IssueRequest(superuser_existing_block_req)
+       response = IssueRequest(superuserExistingBlockReq)
        ExpectStatusCode(t,
                "authenticated request, existing block, method disabled",
                MethodDisabledError.HTTPCode,
@@ -512,44 +518,44 @@ func TestDeleteHandler(t *testing.T) {
        never_delete = false
 
        // Authenticated admin request for existing block.
-       response = IssueRequest(superuser_existing_block_req)
+       response = IssueRequest(superuserExistingBlockReq)
        ExpectStatusCode(t,
                "data manager request, existing block",
                http.StatusOK,
                response)
        // Expect response {"copies_deleted":1,"copies_failed":0}
-       expected_dc = deletecounter{1, 0}
-       json.NewDecoder(response.Body).Decode(&response_dc)
-       if response_dc != expected_dc {
-               t.Errorf("superuser_existing_block_req\nexpected: %+v\nreceived: %+v",
-                       expected_dc, response_dc)
+       expectedDc = deletecounter{1, 0}
+       json.NewDecoder(response.Body).Decode(&responseDc)
+       if responseDc != expectedDc {
+               t.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v",
+                       expectedDc, responseDc)
        }
        // Confirm the block has been deleted
-       _, err := vols[0].Get(TEST_HASH)
-       var block_deleted = os.IsNotExist(err)
-       if !block_deleted {
-               t.Error("superuser_existing_block_req: block not deleted")
+       _, err := vols[0].Get(TestHash)
+       var blockDeleted = os.IsNotExist(err)
+       if !blockDeleted {
+               t.Error("superuserExistingBlockReq: block not deleted")
        }
 
        // A DELETE request on a block newer than blob_signature_ttl
        // should return success but leave the block on the volume.
-       vols[0].Put(TEST_HASH, TEST_BLOCK)
+       vols[0].Put(TestHash, TestBlock)
        blob_signature_ttl = time.Hour
 
-       response = IssueRequest(superuser_existing_block_req)
+       response = IssueRequest(superuserExistingBlockReq)
        ExpectStatusCode(t,
                "data manager request, existing block",
                http.StatusOK,
                response)
        // Expect response {"copies_deleted":1,"copies_failed":0}
-       expected_dc = deletecounter{1, 0}
-       json.NewDecoder(response.Body).Decode(&response_dc)
-       if response_dc != expected_dc {
-               t.Errorf("superuser_existing_block_req\nexpected: %+v\nreceived: %+v",
-                       expected_dc, response_dc)
+       expectedDc = deletecounter{1, 0}
+       json.NewDecoder(response.Body).Decode(&responseDc)
+       if responseDc != expectedDc {
+               t.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v",
+                       expectedDc, responseDc)
        }
        // Confirm the block has NOT been deleted.
-       _, err = vols[0].Get(TEST_HASH)
+       _, err = vols[0].Get(TestHash)
        if err != nil {
                t.Errorf("testing delete on new block: %s\n", err)
        }
@@ -585,12 +591,12 @@ func TestDeleteHandler(t *testing.T) {
 func TestPullHandler(t *testing.T) {
        defer teardown()
 
-       var user_token = "USER TOKEN"
+       var userToken = "USER TOKEN"
        data_manager_token = "DATA MANAGER TOKEN"
 
        pullq = NewWorkQueue()
 
-       good_json := []byte(`[
+       goodJSON := []byte(`[
                {
                        "locator":"locator_with_two_servers",
                        "servers":[
@@ -608,36 +614,36 @@ func TestPullHandler(t *testing.T) {
                }
        ]`)
 
-       bad_json := []byte(`{ "key":"I'm a little teapot" }`)
+       badJSON := []byte(`{ "key":"I'm a little teapot" }`)
 
        type pullTest struct {
-               name          string
-               req           RequestTester
-               response_code int
-               response_body string
+               name         string
+               req          RequestTester
+               responseCode int
+               responseBody string
        }
        var testcases = []pullTest{
                {
                        "Valid pull list from an ordinary user",
-                       RequestTester{"/pull", user_token, "PUT", good_json},
+                       RequestTester{"/pull", userToken, "PUT", goodJSON},
                        http.StatusUnauthorized,
                        "Unauthorized\n",
                },
                {
                        "Invalid pull request from an ordinary user",
-                       RequestTester{"/pull", user_token, "PUT", bad_json},
+                       RequestTester{"/pull", userToken, "PUT", badJSON},
                        http.StatusUnauthorized,
                        "Unauthorized\n",
                },
                {
                        "Valid pull request from the data manager",
-                       RequestTester{"/pull", data_manager_token, "PUT", good_json},
+                       RequestTester{"/pull", data_manager_token, "PUT", goodJSON},
                        http.StatusOK,
                        "Received 3 pull requests\n",
                },
                {
                        "Invalid pull request from the data manager",
-                       RequestTester{"/pull", data_manager_token, "PUT", bad_json},
+                       RequestTester{"/pull", data_manager_token, "PUT", badJSON},
                        http.StatusBadRequest,
                        "",
                },
@@ -645,8 +651,8 @@ func TestPullHandler(t *testing.T) {
 
        for _, tst := range testcases {
                response := IssueRequest(&tst.req)
-               ExpectStatusCode(t, tst.name, tst.response_code, response)
-               ExpectBody(t, tst.name, tst.response_body, response)
+               ExpectStatusCode(t, tst.name, tst.responseCode, response)
+               ExpectBody(t, tst.name, tst.responseBody, response)
        }
 
        // The Keep pull manager should have received one good list with 3
@@ -691,12 +697,12 @@ func TestPullHandler(t *testing.T) {
 func TestTrashHandler(t *testing.T) {
        defer teardown()
 
-       var user_token = "USER TOKEN"
+       var userToken = "USER TOKEN"
        data_manager_token = "DATA MANAGER TOKEN"
 
        trashq = NewWorkQueue()
 
-       good_json := []byte(`[
+       goodJSON := []byte(`[
                {
                        "locator":"block1",
                        "block_mtime":1409082153
@@ -711,37 +717,37 @@ func TestTrashHandler(t *testing.T) {
                }
        ]`)
 
-       bad_json := []byte(`I am not a valid JSON string`)
+       badJSON := []byte(`I am not a valid JSON string`)
 
        type trashTest struct {
-               name          string
-               req           RequestTester
-               response_code int
-               response_body string
+               name         string
+               req          RequestTester
+               responseCode int
+               responseBody string
        }
 
        var testcases = []trashTest{
                {
                        "Valid trash list from an ordinary user",
-                       RequestTester{"/trash", user_token, "PUT", good_json},
+                       RequestTester{"/trash", userToken, "PUT", goodJSON},
                        http.StatusUnauthorized,
                        "Unauthorized\n",
                },
                {
                        "Invalid trash list from an ordinary user",
-                       RequestTester{"/trash", user_token, "PUT", bad_json},
+                       RequestTester{"/trash", userToken, "PUT", badJSON},
                        http.StatusUnauthorized,
                        "Unauthorized\n",
                },
                {
                        "Valid trash list from the data manager",
-                       RequestTester{"/trash", data_manager_token, "PUT", good_json},
+                       RequestTester{"/trash", data_manager_token, "PUT", goodJSON},
                        http.StatusOK,
                        "Received 3 trash requests\n",
                },
                {
                        "Invalid trash list from the data manager",
-                       RequestTester{"/trash", data_manager_token, "PUT", bad_json},
+                       RequestTester{"/trash", data_manager_token, "PUT", badJSON},
                        http.StatusBadRequest,
                        "",
                },
@@ -749,8 +755,8 @@ func TestTrashHandler(t *testing.T) {
 
        for _, tst := range testcases {
                response := IssueRequest(&tst.req)
-               ExpectStatusCode(t, tst.name, tst.response_code, response)
-               ExpectBody(t, tst.name, tst.response_body, response)
+               ExpectStatusCode(t, tst.name, tst.responseCode, response)
+               ExpectBody(t, tst.name, tst.responseBody, response)
        }
 
        // The trash collector should have received one good list with 3
@@ -773,10 +779,10 @@ func TestTrashHandler(t *testing.T) {
 // REST router.  It returns the HTTP response to the request.
 func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
        response := httptest.NewRecorder()
-       body := bytes.NewReader(rt.request_body)
+       body := bytes.NewReader(rt.requestBody)
        req, _ := http.NewRequest(rt.method, rt.uri, body)
-       if rt.api_token != "" {
-               req.Header.Set("Authorization", "OAuth2 "+rt.api_token)
+       if rt.apiToken != "" {
+               req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
        }
        loggingRouter := MakeLoggingRESTRouter()
        loggingRouter.ServeHTTP(response, req)
@@ -788,22 +794,55 @@ func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
 func ExpectStatusCode(
        t *testing.T,
        testname string,
-       expected_status int,
+       expectedStatus int,
        response *httptest.ResponseRecorder) {
-       if response.Code != expected_status {
+       if response.Code != expectedStatus {
                t.Errorf("%s: expected status %d, got %+v",
-                       testname, expected_status, response)
+                       testname, expectedStatus, response)
        }
 }
 
 func ExpectBody(
        t *testing.T,
        testname string,
-       expected_body string,
+       expectedBody string,
        response *httptest.ResponseRecorder) {
-       if expected_body != "" && response.Body.String() != expected_body {
+       if expectedBody != "" && response.Body.String() != expectedBody {
                t.Errorf("%s: expected response body '%s', got %+v",
-                       testname, expected_body, response)
+                       testname, expectedBody, response)
+       }
+}
+
+// See #7121
+func TestPutNeedsOnlyOneBuffer(t *testing.T) {
+       defer teardown()
+       KeepVM = MakeTestVolumeManager(1)
+       defer KeepVM.Close()
+
+       defer func(orig *bufferPool) {
+               bufs = orig
+       }(bufs)
+       bufs = newBufferPool(1, BlockSize)
+
+       ok := make(chan struct{})
+       go func() {
+               for i := 0; i < 2; i++ {
+                       response := IssueRequest(
+                               &RequestTester{
+                                       method:      "PUT",
+                                       uri:         "/" + TestHash,
+                                       requestBody: TestBlock,
+                               })
+                       ExpectStatusCode(t,
+                               "TestPutNeedsOnlyOneBuffer", http.StatusOK, response)
+               }
+               ok <- struct{}{}
+       }()
+
+       select {
+       case <-ok:
+       case <-time.After(time.Second):
+               t.Fatal("PUT deadlocks with maxBuffers==1")
        }
 }
 
@@ -818,21 +857,21 @@ func TestPutHandlerNoBufferleak(t *testing.T) {
 
        ok := make(chan bool)
        go func() {
-               for i := 0; i < maxBuffers+1; i += 1 {
+               for i := 0; i < maxBuffers+1; i++ {
                        // Unauthenticated request, no server key
                        // => OK (unsigned response)
-                       unsigned_locator := "/" + TEST_HASH
+                       unsignedLocator := "/" + TestHash
                        response := IssueRequest(
                                &RequestTester{
-                                       method:       "PUT",
-                                       uri:          unsigned_locator,
-                                       request_body: TEST_BLOCK,
+                                       method:      "PUT",
+                                       uri:         unsignedLocator,
+                                       requestBody: TestBlock,
                                })
                        ExpectStatusCode(t,
                                "TestPutHandlerBufferleak", http.StatusOK, response)
                        ExpectBody(t,
                                "TestPutHandlerBufferleak",
-                               TEST_HASH_PUT_RESPONSE, response)
+                               TestHashPutResp, response)
                }
                ok <- true
        }()
@@ -854,26 +893,26 @@ func TestGetHandlerNoBufferleak(t *testing.T) {
        defer KeepVM.Close()
 
        vols := KeepVM.AllWritable()
-       if err := vols[0].Put(TEST_HASH, TEST_BLOCK); err != nil {
+       if err := vols[0].Put(TestHash, TestBlock); err != nil {
                t.Error(err)
        }
 
        ok := make(chan bool)
        go func() {
-               for i := 0; i < maxBuffers+1; i += 1 {
+               for i := 0; i < maxBuffers+1; i++ {
                        // Unauthenticated request, unsigned locator
                        // => OK
-                       unsigned_locator := "/" + TEST_HASH
+                       unsignedLocator := "/" + TestHash
                        response := IssueRequest(
                                &RequestTester{
                                        method: "GET",
-                                       uri:    unsigned_locator,
+                                       uri:    unsignedLocator,
                                })
                        ExpectStatusCode(t,
                                "Unauthenticated request, unsigned locator", http.StatusOK, response)
                        ExpectBody(t,
                                "Unauthenticated request, unsigned locator",
-                               string(TEST_BLOCK),
+                               string(TestBlock),
                                response)
                }
                ok <- true
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index a86bb6a5b552887836e24cb858191bcbe920e479..8c9b8499c609bdad23ca8ac26e29286929323294 100644 (file)
@@ -8,7 +8,6 @@ package main
 // StatusHandler   (GET /status.json)
 
 import (
-       "bytes"
        "container/list"
        "crypto/md5"
        "encoding/json"
@@ -61,12 +60,14 @@ func MakeRESTRouter() *mux.Router {
        return rest
 }
 
+// BadRequestHandler is a HandleFunc to address bad requests.
 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
        http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
 }
 
+// GetBlockHandler is a HandleFunc to address Get block requests.
 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
-       if enforce_permissions {
+       if enforcePermissions {
                locator := req.URL.Path[1:] // strip leading slash
                if err := VerifySignature(locator, GetApiToken(req)); err != nil {
                        http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
@@ -74,7 +75,7 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
                }
        }
 
-       block, err := GetBlock(mux.Vars(req)["hash"], false)
+       block, err := GetBlock(mux.Vars(req)["hash"])
        if err != nil {
                // This type assertion is safe because the only errors
                // GetBlock can return are DiskHashError or NotFoundError.
@@ -88,6 +89,7 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
        resp.Write(block)
 }
 
+// PutBlockHandler is a HandleFunc to address Put block requests.
 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
        hash := mux.Vars(req)["hash"]
 
@@ -100,7 +102,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       if req.ContentLength > BLOCKSIZE {
+       if req.ContentLength > BlockSize {
                http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
                return
        }
@@ -129,18 +131,16 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
 
        // Success; add a size hint, sign the locator if possible, and
        // return it to the client.
-       return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
-       api_token := GetApiToken(req)
-       if PermissionSecret != nil && api_token != "" {
+       returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
+       apiToken := GetApiToken(req)
+       if PermissionSecret != nil && apiToken != "" {
                expiry := time.Now().Add(blob_signature_ttl)
-               return_hash = SignLocator(return_hash, api_token, expiry)
+               returnHash = SignLocator(returnHash, apiToken, expiry)
        }
-       resp.Write([]byte(return_hash + "\n"))
+       resp.Write([]byte(returnHash + "\n"))
 }
 
-// IndexHandler
-//     A HandleFunc to address /index and /index/{prefix} requests.
-//
+// IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
        // Reject unauthorized requests.
        if !IsDataManagerToken(GetApiToken(req)) {
@@ -178,20 +178,15 @@ func IndexHandler(resp http.ResponseWriter, req *http.Request) {
 //            * device_num (an integer identifying the underlying filesystem)
 //            * bytes_free
 //            * bytes_used
-//
-type VolumeStatus struct {
-       MountPoint string `json:"mount_point"`
-       DeviceNum  uint64 `json:"device_num"`
-       BytesFree  uint64 `json:"bytes_free"`
-       BytesUsed  uint64 `json:"bytes_used"`
-}
 
+// PoolStatus reports the buffer pool's bytes allocated, maximum buffers, and buffers in use.
 type PoolStatus struct {
        Alloc uint64 `json:"BytesAllocated"`
        Cap   int    `json:"BuffersMax"`
        Len   int    `json:"BuffersInUse"`
 }
 
+// NodeStatus reports the volume and buffer pool status of this keepstore node.
 type NodeStatus struct {
        Volumes    []*VolumeStatus `json:"volumes"`
        BufferPool PoolStatus
@@ -203,6 +198,7 @@ type NodeStatus struct {
 var st NodeStatus
 var stLock sync.Mutex
 
+// StatusHandler addresses /status.json requests.
 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
        stLock.Lock()
        readNodeStatus(&st)
@@ -211,8 +207,8 @@ func StatusHandler(resp http.ResponseWriter, req *http.Request) {
        if err == nil {
                resp.Write(jstat)
        } else {
-               log.Printf("json.Marshal: %s\n", err)
-               log.Printf("NodeStatus = %v\n", &st)
+               log.Printf("json.Marshal: %s", err)
+               log.Printf("NodeStatus = %v", &st)
                http.Error(resp, err.Error(), 500)
        }
 }
@@ -322,7 +318,7 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
                if body, err := json.Marshal(result); err == nil {
                        resp.Write(body)
                } else {
-                       log.Printf("json.Marshal: %s (result = %v)\n", err, result)
+                       log.Printf("json.Marshal: %s (result = %v)", err, result)
                        http.Error(resp, err.Error(), 500)
                }
        }
@@ -361,11 +357,13 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
    If the JSON unmarshalling fails, return 400 Bad Request.
 */
 
+// PullRequest consists of a block locator and an ordered list of servers
 type PullRequest struct {
        Locator string   `json:"locator"`
        Servers []string `json:"servers"`
 }
 
+// PullHandler processes "PUT /pull" requests for the data manager.
 func PullHandler(resp http.ResponseWriter, req *http.Request) {
        // Reject unauthorized requests.
        if !IsDataManagerToken(GetApiToken(req)) {
@@ -395,11 +393,13 @@ func PullHandler(resp http.ResponseWriter, req *http.Request) {
        pullq.ReplaceQueue(plist)
 }
 
+// TrashRequest consists of a block locator and its Mtime
 type TrashRequest struct {
        Locator    string `json:"locator"`
        BlockMtime int64  `json:"block_mtime"`
 }
 
+// TrashHandler processes /trash requests.
 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
        // Reject unauthorized requests.
        if !IsDataManagerToken(GetApiToken(req)) {
@@ -440,12 +440,9 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
 // should be the only part of the code that cares about which volume a
 // block is stored on, so it should be responsible for figuring out
 // which volume to check for fetching blocks, storing blocks, etc.
-
 // ==============================
-// GetBlock fetches and returns the block identified by "hash".  If
-// the update_timestamp argument is true, GetBlock also updates the
-// block's file modification time (for the sake of PutBlock, which
-// must update the file's timestamp when the block already exists).
+
+// GetBlock fetches and returns the block identified by "hash".
 //
 // On success, GetBlock returns a byte slice with the block data, and
 // a nil error.
@@ -455,23 +452,11 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
 // If the block found does not have the correct MD5 hash, returns
 // DiskHashError.
 //
-
-func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
+func GetBlock(hash string) ([]byte, error) {
        // Attempt to read the requested hash from a keep volume.
-       error_to_caller := NotFoundError
-
-       var vols []Volume
-       if update_timestamp {
-               // Pointless to find the block on an unwritable volume
-               // because Touch() will fail -- this is as good as
-               // "not found" for purposes of callers who need to
-               // update_timestamp.
-               vols = KeepVM.AllWritable()
-       } else {
-               vols = KeepVM.AllReadable()
-       }
+       errorToCaller := NotFoundError
 
-       for _, vol := range vols {
+       for _, vol := range KeepVM.AllReadable() {
                buf, err := vol.Get(hash)
                if err != nil {
                        // IsNotExist is an expected error and may be
@@ -480,7 +465,7 @@ func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
                        // volumes. If all volumes report IsNotExist,
                        // we return a NotFoundError.
                        if !os.IsNotExist(err) {
-                               log.Printf("GetBlock: reading %s: %s\n", hash, err)
+                               log.Printf("%s: Get(%s): %s", vol, hash, err)
                        }
                        continue
                }
@@ -490,56 +475,48 @@ func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
                if filehash != hash {
                        // TODO: Try harder to tell a sysadmin about
                        // this.
-                       log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
+                       log.Printf("%s: checksum mismatch for request %s (actual %s)",
                                vol, hash, filehash)
-                       error_to_caller = DiskHashError
+                       errorToCaller = DiskHashError
                        bufs.Put(buf)
                        continue
                }
-               if error_to_caller == DiskHashError {
+               if errorToCaller == DiskHashError {
                        log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
                                vol, hash)
                }
-               if update_timestamp {
-                       if err := vol.Touch(hash); err != nil {
-                               error_to_caller = GenericError
-                               log.Printf("%s: Touch %s failed: %s",
-                                       vol, hash, error_to_caller)
-                               bufs.Put(buf)
-                               continue
-                       }
-               }
                return buf, nil
        }
-       return nil, error_to_caller
+       return nil, errorToCaller
 }
 
-/* PutBlock(block, hash)
-   Stores the BLOCK (identified by the content id HASH) in Keep.
-
-   The MD5 checksum of the block must be identical to the content id HASH.
-   If not, an error is returned.
-
-   PutBlock stores the BLOCK on the first Keep volume with free space.
-   A failure code is returned to the user only if all volumes fail.
-
-   On success, PutBlock returns nil.
-   On failure, it returns a KeepError with one of the following codes:
-
-   500 Collision
-          A different block with the same hash already exists on this
-          Keep server.
-   422 MD5Fail
-          The MD5 hash of the BLOCK does not match the argument HASH.
-   503 Full
-          There was not enough space left in any Keep volume to store
-          the object.
-   500 Fail
-          The object could not be stored for some other reason (e.g.
-          all writes failed). The text of the error message should
-          provide as much detail as possible.
-*/
-
+// PutBlock stores the BLOCK (identified by the content id HASH) in Keep.
+//
+// PutBlock(block, hash)
+//
+//   The MD5 checksum of the block must be identical to the content id HASH.
+//   If not, an error is returned.
+//
+//   PutBlock stores the BLOCK on the first Keep volume with free space.
+//   A failure code is returned to the user only if all volumes fail.
+//
+//   On success, PutBlock returns nil.
+//   On failure, it returns a KeepError with one of the following codes:
+//
+//   500 Collision
+//          A different block with the same hash already exists on this
+//          Keep server.
+//   422 MD5Fail
+//          The MD5 hash of the BLOCK does not match the argument HASH.
+//   503 Full
+//          There was not enough space left in any Keep volume to store
+//          the object.
+//   500 Fail
+//          The object could not be stored for some other reason (e.g.
+//          all writes failed). The text of the error message should
+//          provide as much detail as possible.
+//
 func PutBlock(block []byte, hash string) error {
        // Check that BLOCK's checksum matches HASH.
        blockhash := fmt.Sprintf("%x", md5.Sum(block))
@@ -548,21 +525,11 @@ func PutBlock(block []byte, hash string) error {
                return RequestHashError
        }
 
-       // If we already have a block on disk under this identifier, return
-       // success (but check for MD5 collisions).  While fetching the block,
-       // update its timestamp.
-       // The only errors that GetBlock can return are DiskHashError and NotFoundError.
-       // In either case, we want to write our new (good) block to disk,
-       // so there is nothing special to do if err != nil.
-       //
-       if oldblock, err := GetBlock(hash, true); err == nil {
-               defer bufs.Put(oldblock)
-               if bytes.Compare(block, oldblock) == 0 {
-                       // The block already exists; return success.
-                       return nil
-               } else {
-                       return CollisionError
-               }
+       // If we already have this data, it's intact on disk, and we
+       // can update its timestamp, return success. If we have
+       // different data with the same hash, return failure.
+       if err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
+               return err
        }
 
        // Choose a Keep volume to write to.
@@ -590,25 +557,60 @@ func PutBlock(block []byte, hash string) error {
                        // write did not succeed.  Report the
                        // error and continue trying.
                        allFull = false
-                       log.Printf("%s: Write(%s): %s\n", vol, hash, err)
+                       log.Printf("%s: Write(%s): %s", vol, hash, err)
                }
        }
 
        if allFull {
                log.Print("All volumes are full.")
                return FullError
-       } else {
-               // Already logged the non-full errors.
-               return GenericError
        }
+       // Already logged the non-full errors.
+       return GenericError
+}
+
+// CompareAndTouch returns nil if one of the volumes already has the
+// given content and it successfully updates the relevant block's
+// modification time in order to protect it from premature garbage
+// collection.
+func CompareAndTouch(hash string, buf []byte) error {
+       var bestErr error = NotFoundError
+       for _, vol := range KeepVM.AllWritable() {
+               if err := vol.Compare(hash, buf); err == CollisionError {
+                       // Stop if we have a block with same hash but
+                       // different content. (It will be impossible
+                       // to tell which one is wanted if we have
+                       // both, so there's no point writing it even
+                       // on a different volume.)
+                       log.Printf("%s: Compare(%s): %s", vol, hash, err)
+                       return err
+               } else if os.IsNotExist(err) {
+                       // Block does not exist. This is the only
+                       // "normal" error: we don't log anything.
+                       continue
+               } else if err != nil {
+                       // Couldn't open file, data is corrupt on
+                       // disk, etc.: log this abnormal condition,
+                       // and try the next volume.
+                       log.Printf("%s: Compare(%s): %s", vol, hash, err)
+                       continue
+               }
+               if err := vol.Touch(hash); err != nil {
+                       log.Printf("%s: Touch %s failed: %s", vol, hash, err)
+                       bestErr = err
+                       continue
+               }
+               // Compare and Touch both worked --> done.
+               return nil
+       }
+       return bestErr
 }
 
 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
 
-// IsValidLocator
-//     Return true if the specified string is a valid Keep locator.
-//     When Keep is extended to support hash types other than MD5,
-//     this should be updated to cover those as well.
+// IsValidLocator returns true if the specified string is a valid Keep locator.
+//   When Keep is extended to support hash types other than MD5,
+//   this should be updated to cover those as well.
 //
 func IsValidLocator(loc string) bool {
        return validLocatorRe.MatchString(loc)
@@ -629,36 +631,36 @@ func GetApiToken(req *http.Request) string {
 }
 
 // IsExpired returns true if the given Unix timestamp (expressed as a
-// hexadecimal string) is in the past, or if timestamp_hex cannot be
+// hexadecimal string) is in the past, or if timestampHex cannot be
 // parsed as a hexadecimal string.
-func IsExpired(timestamp_hex string) bool {
-       ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
+func IsExpired(timestampHex string) bool {
+       ts, err := strconv.ParseInt(timestampHex, 16, 0)
        if err != nil {
-               log.Printf("IsExpired: %s\n", err)
+               log.Printf("IsExpired: %s", err)
                return true
        }
        return time.Unix(ts, 0).Before(time.Now())
 }
 
-// CanDelete returns true if the user identified by api_token is
+// CanDelete returns true if the user identified by apiToken is
 // allowed to delete blocks.
-func CanDelete(api_token string) bool {
-       if api_token == "" {
+func CanDelete(apiToken string) bool {
+       if apiToken == "" {
                return false
        }
        // Blocks may be deleted only when Keep has been configured with a
        // data manager.
-       if IsDataManagerToken(api_token) {
+       if IsDataManagerToken(apiToken) {
                return true
        }
-       // TODO(twp): look up api_token with the API server
+       // TODO(twp): look up apiToken with the API server
        // return true if is_admin is true and if the token
        // has unlimited scope
        return false
 }
 
-// IsDataManagerToken returns true if api_token represents the data
+// IsDataManagerToken returns true if apiToken represents the data
 // manager's token.
-func IsDataManagerToken(api_token string) bool {
-       return data_manager_token != "" && api_token == data_manager_token
+func IsDataManagerToken(apiToken string) bool {
+       return data_manager_token != "" && apiToken == data_manager_token
 }
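
The PutBlock contract above starts with the request-hash check: the MD5 of the uploaded bytes must equal the locator hash the client supplied, otherwise a 422 MD5Fail is returned. A minimal, self-contained sketch of that first step (checkRequestHash and errRequestHash are illustrative names, not keepstore's):

    package main

    import (
        "crypto/md5"
        "errors"
        "fmt"
    )

    // errRequestHash stands in for keepstore's RequestHashError (422 MD5Fail).
    var errRequestHash = errors.New("hash of request body does not match request hash")

    // checkRequestHash mirrors the first step of PutBlock: the MD5 of the
    // uploaded block must equal the hash supplied by the client.
    func checkRequestHash(block []byte, hash string) error {
        if fmt.Sprintf("%x", md5.Sum(block)) != hash {
            return errRequestHash
        }
        return nil
    }

    func main() {
        block := []byte("foo") // MD5("foo") = acbd18db4cc2f85cedef654fccc4a4d8
        fmt.Println(checkRequestHash(block, "acbd18db4cc2f85cedef654fccc4a4d8")) // <nil>
        fmt.Println(checkRequestHash(block, "ffffffffffffffffffffffffffffffff")) // mismatch error
    }
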
index 593035717bf65f041327a7d14209277d6338d231..a79a401e71d82f40bcee38cd5a9eb6ebb3fa09df 100644 (file)
@@ -14,6 +14,7 @@ import (
        "os"
        "os/signal"
        "strings"
+       "sync"
        "syscall"
        "time"
 )
@@ -26,25 +27,26 @@ import (
 
 // Default TCP address on which to listen for requests.
 // Initialized by the --listen flag.
-const DEFAULT_ADDR = ":25107"
+const DefaultAddr = ":25107"
 
 // A Keep "block" is 64MB.
-const BLOCKSIZE = 64 * 1024 * 1024
+const BlockSize = 64 * 1024 * 1024
 
-// A Keep volume must have at least MIN_FREE_KILOBYTES available
+// A Keep volume must have at least MinFreeKilobytes available
 // in order to permit writes.
-const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
+const MinFreeKilobytes = BlockSize / 1024
 
 // Until #6221 is resolved, never_delete must be true.
-// However, allow it to be false in testing.
+// However, allow it to be false in testing with TEST_DATA_MANAGER_TOKEN
 const TEST_DATA_MANAGER_TOKEN = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
 
-var PROC_MOUNTS = "/proc/mounts"
+// ProcMounts is the path of the system mount table; tests may override it.
+var ProcMounts = "/proc/mounts"
 
-// enforce_permissions controls whether permission signatures
+// enforcePermissions controls whether permission signatures
 // should be enforced (affecting GET and DELETE requests).
 // Initialized by the -enforce-permissions flag.
-var enforce_permissions bool
+var enforcePermissions bool
 
 // blob_signature_ttl is the time duration for which new permission
 // signatures (returned by PUT requests) will be valid.
@@ -63,8 +65,7 @@ var never_delete = true
 var maxBuffers = 128
 var bufs *bufferPool
 
-// ==========
-// Error types.
+// KeepError is an error with an associated HTTP status code.
 //
 type KeepError struct {
        HTTPCode int
@@ -136,10 +137,14 @@ func (vs *volumeSet) Set(value string) error {
        if _, err := os.Stat(value); err != nil {
                return err
        }
+       var locker sync.Locker
+       if flagSerializeIO {
+               locker = &sync.Mutex{}
+       }
        *vs = append(*vs, &UnixVolume{
-               root:      value,
-               serialize: flagSerializeIO,
-               readonly:  flagReadonly,
+               root:     value,
+               locker:   locker,
+               readonly: flagReadonly,
        })
        return nil
 }
@@ -160,15 +165,15 @@ func (vs *volumeSet) String() string {
 // other than "/". It returns the number of volumes added.
 func (vs *volumeSet) Discover() int {
        added := 0
-       f, err := os.Open(PROC_MOUNTS)
+       f, err := os.Open(ProcMounts)
        if err != nil {
-               log.Fatalf("opening %s: %s", PROC_MOUNTS, err)
+               log.Fatalf("opening %s: %s", ProcMounts, err)
        }
        scanner := bufio.NewScanner(f)
        for scanner.Scan() {
                args := strings.Fields(scanner.Text())
                if err := scanner.Err(); err != nil {
-                       log.Fatalf("reading %s: %s", PROC_MOUNTS, err)
+                       log.Fatalf("reading %s: %s", ProcMounts, err)
                }
                dev, mount := args[0], args[1]
                if mount == "/" {
@@ -224,14 +229,14 @@ func main() {
                "File with the API token used by the Data Manager. All DELETE "+
                        "requests or GET /index requests must carry this token.")
        flag.BoolVar(
-               &enforce_permissions,
+               &enforcePermissions,
                "enforce-permissions",
                false,
                "Enforce permission signatures on requests.")
        flag.StringVar(
                &listen,
                "listen",
-               DEFAULT_ADDR,
+               DefaultAddr,
                "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
        flag.BoolVar(
                &never_delete,
@@ -288,14 +293,14 @@ func main() {
                &maxBuffers,
                "max-buffers",
                maxBuffers,
-               fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BLOCKSIZE>>20))
+               fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
 
        flag.Parse()
 
        if maxBuffers < 0 {
                log.Fatal("-max-buffers must be greater than zero.")
        }
-       bufs = newBufferPool(maxBuffers, BLOCKSIZE)
+       bufs = newBufferPool(maxBuffers, BlockSize)
 
        if pidfile != "" {
                f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
@@ -358,7 +363,7 @@ func main() {
        blob_signature_ttl = time.Duration(permission_ttl_sec) * time.Second
 
        if PermissionSecret == nil {
-               if enforce_permissions {
+               if enforcePermissions {
                        log.Fatal("-enforce-permissions requires a permission key")
                } else {
                        log.Println("Running without a PermissionSecret. Block locators " +
index e01b01363d4e2de2f77b854ea5789fb996276234..dc641abe0cc39a08ed2cc76871c030897767d277 100644 (file)
@@ -12,19 +12,19 @@ import (
        "testing"
 )
 
-var TEST_BLOCK = []byte("The quick brown fox jumps over the lazy dog.")
-var TEST_HASH = "e4d909c290d0fb1ca068ffaddf22cbd0"
-var TEST_HASH_PUT_RESPONSE = "e4d909c290d0fb1ca068ffaddf22cbd0+44\n"
+var TestBlock = []byte("The quick brown fox jumps over the lazy dog.")
+var TestHash = "e4d909c290d0fb1ca068ffaddf22cbd0"
+var TestHashPutResp = "e4d909c290d0fb1ca068ffaddf22cbd0+44\n"
 
-var TEST_BLOCK_2 = []byte("Pack my box with five dozen liquor jugs.")
-var TEST_HASH_2 = "f15ac516f788aec4f30932ffb6395c39"
+var TestBlock2 = []byte("Pack my box with five dozen liquor jugs.")
+var TestHash2 = "f15ac516f788aec4f30932ffb6395c39"
 
-var TEST_BLOCK_3 = []byte("Now is the time for all good men to come to the aid of their country.")
-var TEST_HASH_3 = "eed29bbffbc2dbe5e5ee0bb71888e61f"
+var TestBlock3 = []byte("Now is the time for all good men to come to the aid of their country.")
+var TestHash3 = "eed29bbffbc2dbe5e5ee0bb71888e61f"
 
-// BAD_BLOCK is used to test collisions and corruption.
+// BadBlock is used to test collisions and corruption.
 // It must not match any test hashes.
-var BAD_BLOCK = []byte("The magic words are squeamish ossifrage.")
+var BadBlock = []byte("The magic words are squeamish ossifrage.")
 
 // TODO(twp): Tests still to be written
 //
@@ -55,17 +55,17 @@ func TestGetBlock(t *testing.T) {
        defer KeepVM.Close()
 
        vols := KeepVM.AllReadable()
-       if err := vols[1].Put(TEST_HASH, TEST_BLOCK); err != nil {
+       if err := vols[1].Put(TestHash, TestBlock); err != nil {
                t.Error(err)
        }
 
        // Check that GetBlock returns success.
-       result, err := GetBlock(TEST_HASH, false)
+       result, err := GetBlock(TestHash)
        if err != nil {
                t.Errorf("GetBlock error: %s", err)
        }
-       if fmt.Sprint(result) != fmt.Sprint(TEST_BLOCK) {
-               t.Errorf("expected %s, got %s", TEST_BLOCK, result)
+       if fmt.Sprint(result) != fmt.Sprint(TestBlock) {
+               t.Errorf("expected %s, got %s", TestBlock, result)
        }
 }
 
@@ -80,7 +80,7 @@ func TestGetBlockMissing(t *testing.T) {
        defer KeepVM.Close()
 
        // Check that GetBlock returns failure.
-       result, err := GetBlock(TEST_HASH, false)
+       result, err := GetBlock(TestHash)
        if err != NotFoundError {
                t.Errorf("Expected NotFoundError, got %v", result)
        }
@@ -98,10 +98,10 @@ func TestGetBlockCorrupt(t *testing.T) {
        defer KeepVM.Close()
 
        vols := KeepVM.AllReadable()
-       vols[0].Put(TEST_HASH, BAD_BLOCK)
+       vols[0].Put(TestHash, BadBlock)
 
        // Check that GetBlock returns failure.
-       result, err := GetBlock(TEST_HASH, false)
+       result, err := GetBlock(TestHash)
        if err != DiskHashError {
                t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, result)
        }
@@ -122,18 +122,18 @@ func TestPutBlockOK(t *testing.T) {
        defer KeepVM.Close()
 
        // Check that PutBlock stores the data as expected.
-       if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
+       if err := PutBlock(TestBlock, TestHash); err != nil {
                t.Fatalf("PutBlock: %v", err)
        }
 
        vols := KeepVM.AllReadable()
-       result, err := vols[1].Get(TEST_HASH)
+       result, err := vols[1].Get(TestHash)
        if err != nil {
                t.Fatalf("Volume #0 Get returned error: %v", err)
        }
-       if string(result) != string(TEST_BLOCK) {
+       if string(result) != string(TestBlock) {
                t.Fatalf("PutBlock stored '%s', Get retrieved '%s'",
-                       string(TEST_BLOCK), string(result))
+                       string(TestBlock), string(result))
        }
 }
 
@@ -152,18 +152,18 @@ func TestPutBlockOneVol(t *testing.T) {
        vols[0].(*MockVolume).Bad = true
 
        // Check that PutBlock stores the data as expected.
-       if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
+       if err := PutBlock(TestBlock, TestHash); err != nil {
                t.Fatalf("PutBlock: %v", err)
        }
 
-       result, err := GetBlock(TEST_HASH, false)
+       result, err := GetBlock(TestHash)
        if err != nil {
                t.Fatalf("GetBlock: %v", err)
        }
-       if string(result) != string(TEST_BLOCK) {
+       if string(result) != string(TestBlock) {
                t.Error("PutBlock/GetBlock mismatch")
                t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
-                       string(TEST_BLOCK), string(result))
+                       string(TestBlock), string(result))
        }
 }
 
@@ -180,12 +180,12 @@ func TestPutBlockMD5Fail(t *testing.T) {
 
        // Check that PutBlock returns the expected error when the hash does
        // not match the block.
-       if err := PutBlock(BAD_BLOCK, TEST_HASH); err != RequestHashError {
+       if err := PutBlock(BadBlock, TestHash); err != RequestHashError {
                t.Error("Expected RequestHashError, got %v", err)
        }
 
        // Confirm that GetBlock fails to return anything.
-       if result, err := GetBlock(TEST_HASH, false); err != NotFoundError {
+       if result, err := GetBlock(TestHash); err != NotFoundError {
                t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
                        string(result), err)
        }
@@ -202,17 +202,17 @@ func TestPutBlockCorrupt(t *testing.T) {
        KeepVM = MakeTestVolumeManager(2)
        defer KeepVM.Close()
 
-       // Store a corrupted block under TEST_HASH.
+       // Store a corrupted block under TestHash.
        vols := KeepVM.AllWritable()
-       vols[0].Put(TEST_HASH, BAD_BLOCK)
-       if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
+       vols[0].Put(TestHash, BadBlock)
+       if err := PutBlock(TestBlock, TestHash); err != nil {
                t.Errorf("PutBlock: %v", err)
        }
 
-       // The block on disk should now match TEST_BLOCK.
-       if block, err := GetBlock(TEST_HASH, false); err != nil {
+       // The block on disk should now match TestBlock.
+       if block, err := GetBlock(TestHash); err != nil {
                t.Errorf("GetBlock: %v", err)
-       } else if bytes.Compare(block, TEST_BLOCK) != 0 {
+       } else if bytes.Compare(block, TestBlock) != 0 {
                t.Errorf("GetBlock returned: '%s'", string(block))
        }
 }
@@ -260,35 +260,35 @@ func TestPutBlockTouchFails(t *testing.T) {
        // Store a block and then make the underlying volume bad,
        // so a subsequent attempt to update the file timestamp
        // will fail.
-       vols[0].Put(TEST_HASH, BAD_BLOCK)
-       old_mtime, err := vols[0].Mtime(TEST_HASH)
+       vols[0].Put(TestHash, BadBlock)
+       oldMtime, err := vols[0].Mtime(TestHash)
        if err != nil {
-               t.Fatalf("vols[0].Mtime(%s): %s\n", TEST_HASH, err)
+               t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
        }
 
        // vols[0].Touch will fail on the next call, so the volume
        // manager will store a copy on vols[1] instead.
        vols[0].(*MockVolume).Touchable = false
-       if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
+       if err := PutBlock(TestBlock, TestHash); err != nil {
                t.Fatalf("PutBlock: %v", err)
        }
        vols[0].(*MockVolume).Touchable = true
 
        // Now the mtime on the block on vols[0] should be unchanged, and
        // there should be a copy of the block on vols[1].
-       new_mtime, err := vols[0].Mtime(TEST_HASH)
+       newMtime, err := vols[0].Mtime(TestHash)
        if err != nil {
-               t.Fatalf("vols[0].Mtime(%s): %s\n", TEST_HASH, err)
+               t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
        }
-       if !new_mtime.Equal(old_mtime) {
-               t.Errorf("mtime was changed on vols[0]:\nold_mtime = %v\nnew_mtime = %v\n",
-                       old_mtime, new_mtime)
+       if !newMtime.Equal(oldMtime) {
+               t.Errorf("mtime was changed on vols[0]:\noldMtime = %v\nnewMtime = %v\n",
+                       oldMtime, newMtime)
        }
-       result, err := vols[1].Get(TEST_HASH)
+       result, err := vols[1].Get(TestHash)
        if err != nil {
                t.Fatalf("vols[1]: %v", err)
        }
-       if bytes.Compare(result, TEST_BLOCK) != 0 {
+       if bytes.Compare(result, TestBlock) != 0 {
                t.Errorf("new block does not match test block\nnew block = %v\n", result)
        }
 }
@@ -309,7 +309,7 @@ func TestDiscoverTmpfs(t *testing.T) {
                }
        }
 
-       // Set up a bogus PROC_MOUNTS file.
+       // Set up a bogus ProcMounts file.
        f, err := ioutil.TempFile("", "keeptest")
        if err != nil {
                t.Fatal(err)
@@ -327,7 +327,7 @@ func TestDiscoverTmpfs(t *testing.T) {
                fmt.Fprintf(f, "tmpfs %s tmpfs %s 0 0\n", path.Dir(vol), opts)
        }
        f.Close()
-       PROC_MOUNTS = f.Name()
+       ProcMounts = f.Name()
 
        var resultVols volumeSet
        added := resultVols.Discover()
@@ -355,7 +355,7 @@ func TestDiscoverTmpfs(t *testing.T) {
 func TestDiscoverNone(t *testing.T) {
        defer teardown()
 
-       // Set up a bogus PROC_MOUNTS file with no Keep vols.
+       // Set up a bogus ProcMounts file with no Keep vols.
        f, err := ioutil.TempFile("", "keeptest")
        if err != nil {
                t.Fatal(err)
@@ -367,7 +367,7 @@ func TestDiscoverNone(t *testing.T) {
        fmt.Fprintln(f, "udev /dev devtmpfs opts 0 0")
        fmt.Fprintln(f, "devpts /dev/pts devpts opts 0 0")
        f.Close()
-       PROC_MOUNTS = f.Name()
+       ProcMounts = f.Name()
 
        var resultVols volumeSet
        added := resultVols.Discover()
@@ -388,23 +388,23 @@ func TestIndex(t *testing.T) {
        defer KeepVM.Close()
 
        vols := KeepVM.AllReadable()
-       vols[0].Put(TEST_HASH, TEST_BLOCK)
-       vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
-       vols[0].Put(TEST_HASH_3, TEST_BLOCK_3)
-       vols[0].Put(TEST_HASH+".meta", []byte("metadata"))
-       vols[1].Put(TEST_HASH_2+".meta", []byte("metadata"))
+       vols[0].Put(TestHash, TestBlock)
+       vols[1].Put(TestHash2, TestBlock2)
+       vols[0].Put(TestHash3, TestBlock3)
+       vols[0].Put(TestHash+".meta", []byte("metadata"))
+       vols[1].Put(TestHash2+".meta", []byte("metadata"))
 
        buf := new(bytes.Buffer)
        vols[0].IndexTo("", buf)
        vols[1].IndexTo("", buf)
-       index_rows := strings.Split(string(buf.Bytes()), "\n")
-       sort.Strings(index_rows)
-       sorted_index := strings.Join(index_rows, "\n")
-       expected := `^\n` + TEST_HASH + `\+\d+ \d+\n` +
-               TEST_HASH_3 + `\+\d+ \d+\n` +
-               TEST_HASH_2 + `\+\d+ \d+$`
-
-       match, err := regexp.MatchString(expected, sorted_index)
+       indexRows := strings.Split(string(buf.Bytes()), "\n")
+       sort.Strings(indexRows)
+       sortedIndex := strings.Join(indexRows, "\n")
+       expected := `^\n` + TestHash + `\+\d+ \d+\n` +
+               TestHash3 + `\+\d+ \d+\n` +
+               TestHash2 + `\+\d+ \d+$`
+
+       match, err := regexp.MatchString(expected, sortedIndex)
        if err == nil {
                if !match {
                        t.Errorf("IndexLocators returned:\n%s", string(buf.Bytes()))
@@ -420,8 +420,8 @@ func TestIndex(t *testing.T) {
 
 // MakeTestVolumeManager returns a RRVolumeManager with the specified
 // number of MockVolumes.
-func MakeTestVolumeManager(num_volumes int) VolumeManager {
-       vols := make([]Volume, num_volumes)
+func MakeTestVolumeManager(numVolumes int) VolumeManager {
+       vols := make([]Volume, numVolumes)
        for i := range vols {
                vols[i] = CreateMockVolume()
        }
@@ -431,7 +431,7 @@ func MakeTestVolumeManager(num_volumes int) VolumeManager {
 // teardown cleans up after each test.
 func teardown() {
        data_manager_token = ""
-       enforce_permissions = false
+       enforcePermissions = false
        PermissionSecret = nil
        KeepVM = nil
 }
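
The renamed fixtures keep their original contents, so the hashes are unchanged; a quick standalone check that TestHash and the "+44" size hint in TestHashPutResp really correspond to TestBlock:

    package main

    import (
        "crypto/md5"
        "fmt"
    )

    func main() {
        // TestBlock from the fixtures above.
        block := []byte("The quick brown fox jumps over the lazy dog.")
        fmt.Printf("%x+%d\n", md5.Sum(block), len(block))
        // Prints: e4d909c290d0fb1ca068ffaddf22cbd0+44
    }
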
index b622d1d3eefd2eb3bedfb5e4260976cb2075f85a..47bb6d77179f226dc20d84a525fa5a0de3a5f7af 100644 (file)
@@ -11,6 +11,7 @@ import (
        "time"
 )
 
+// LoggingResponseWriter wraps http.ResponseWriter, recording the response status, length, and body so they can be logged.
 type LoggingResponseWriter struct {
        Status int
        Length int
@@ -18,6 +19,7 @@ type LoggingResponseWriter struct {
        ResponseBody string
 }
 
+// WriteHeader records the status code and passes it through to the wrapped ResponseWriter.
 func (loggingWriter *LoggingResponseWriter) WriteHeader(code int) {
        loggingWriter.Status = code
        loggingWriter.ResponseWriter.WriteHeader(code)
@@ -31,10 +33,12 @@ func (loggingWriter *LoggingResponseWriter) Write(data []byte) (int, error) {
        return loggingWriter.ResponseWriter.Write(data)
 }
 
+// LoggingRESTRouter is used to add logging capabilities to mux.Router
 type LoggingRESTRouter struct {
        router *mux.Router
 }
 
+// MakeLoggingRESTRouter initializes LoggingRESTRouter
 func MakeLoggingRESTRouter() *LoggingRESTRouter {
        router := MakeRESTRouter()
        return (&LoggingRESTRouter{router})
diff --git a/services/keepstore/mock_mutex_for_test.go b/services/keepstore/mock_mutex_for_test.go
new file mode 100644 (file)
index 0000000..e75d910
--- /dev/null
@@ -0,0 +1,23 @@
+package main
+
+type MockMutex struct {
+       AllowLock   chan struct{}
+       AllowUnlock chan struct{}
+}
+
+func NewMockMutex() *MockMutex {
+       return &MockMutex{
+               AllowLock:   make(chan struct{}),
+               AllowUnlock: make(chan struct{}),
+       }
+}
+
+// Lock waits for someone to send to AllowLock.
+func (m *MockMutex) Lock() {
+       <- m.AllowLock
+}
+
+// Unlock waits for someone to send to AllowUnlock.
+func (m *MockMutex) Unlock() {
+       <- m.AllowUnlock
+}
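
MockMutex lets a test decide exactly when Lock and Unlock are allowed to return, so serialized-I/O code paths can be exercised deterministically. A hypothetical usage sketch (the type is repeated from the new file above so the snippet stands alone):

    package main

    import "fmt"

    // MockMutex is repeated from mock_mutex_for_test.go so this runs on its own.
    type MockMutex struct {
        AllowLock   chan struct{}
        AllowUnlock chan struct{}
    }

    func NewMockMutex() *MockMutex {
        return &MockMutex{
            AllowLock:   make(chan struct{}),
            AllowUnlock: make(chan struct{}),
        }
    }

    func (m *MockMutex) Lock()   { <-m.AllowLock }
    func (m *MockMutex) Unlock() { <-m.AllowUnlock }

    func main() {
        m := NewMockMutex()
        done := make(chan struct{})
        go func() {
            m.Lock()                        // blocks until the test allows it
            fmt.Println("critical section") // runs between Lock and Unlock
            m.Unlock()                      // blocks until the test allows it
            close(done)
        }()
        m.AllowLock <- struct{}{}   // let the goroutine acquire the "lock"
        m.AllowUnlock <- struct{}{} // let it release the "lock"
        <-done
    }
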
index 65160b1868913638e8315a266e0b3736ecfbe14c..5579238112b65ed0747ec33e82e582e580e74e6d 100644 (file)
@@ -51,65 +51,66 @@ import (
 var PermissionSecret []byte
 
 // MakePermSignature returns a string representing the signed permission
-// hint for the blob identified by blob_hash, api_token and expiration timestamp.
-func MakePermSignature(blob_hash string, api_token string, expiry string) string {
+// hint for the blob identified by blobHash, apiToken and expiration timestamp.
+func MakePermSignature(blobHash string, apiToken string, expiry string) string {
        hmac := hmac.New(sha1.New, PermissionSecret)
-       hmac.Write([]byte(blob_hash))
+       hmac.Write([]byte(blobHash))
        hmac.Write([]byte("@"))
-       hmac.Write([]byte(api_token))
+       hmac.Write([]byte(apiToken))
        hmac.Write([]byte("@"))
        hmac.Write([]byte(expiry))
        digest := hmac.Sum(nil)
        return fmt.Sprintf("%x", digest)
 }
 
-// SignLocator takes a blob_locator, an api_token and an expiry time, and
+// SignLocator takes a blobLocator, an apiToken and an expiry time, and
 // returns a signed locator string.
-func SignLocator(blob_locator string, api_token string, expiry time.Time) string {
+func SignLocator(blobLocator string, apiToken string, expiry time.Time) string {
        // If no permission secret or API token is available,
        // return an unsigned locator.
-       if PermissionSecret == nil || api_token == "" {
-               return blob_locator
+       if PermissionSecret == nil || apiToken == "" {
+               return blobLocator
        }
        // Extract the hash from the blob locator, omitting any size hint that may be present.
-       blob_hash := strings.Split(blob_locator, "+")[0]
+       blobHash := strings.Split(blobLocator, "+")[0]
        // Return the signed locator string.
-       timestamp_hex := fmt.Sprintf("%08x", expiry.Unix())
-       return blob_locator +
-               "+A" + MakePermSignature(blob_hash, api_token, timestamp_hex) +
-               "@" + timestamp_hex
+       timestampHex := fmt.Sprintf("%08x", expiry.Unix())
+       return blobLocator +
+               "+A" + MakePermSignature(blobHash, apiToken, timestampHex) +
+               "@" + timestampHex
 }
 
 var signedLocatorRe = regexp.MustCompile(`^([[:xdigit:]]{32}).*\+A([[:xdigit:]]{40})@([[:xdigit:]]{8})`)
 
-// VerifySignature returns nil if the signature on the signed_locator
-// can be verified using the given api_token. Otherwise it returns
+// VerifySignature returns nil if the signature on the signedLocator
+// can be verified using the given apiToken. Otherwise it returns
 // either ExpiredError (if the timestamp has expired, which is
 // something the client could have figured out independently) or
 // PermissionError.
-func VerifySignature(signed_locator string, api_token string) error {
-       matches := signedLocatorRe.FindStringSubmatch(signed_locator)
+func VerifySignature(signedLocator string, apiToken string) error {
+       matches := signedLocatorRe.FindStringSubmatch(signedLocator)
        if matches == nil {
                // Could not find a permission signature at all
                return PermissionError
        }
-       blob_hash := matches[1]
-       sig_hex := matches[2]
-       exp_hex := matches[3]
-       if exp_time, err := ParseHexTimestamp(exp_hex); err != nil {
+       blobHash := matches[1]
+       sigHex := matches[2]
+       expHex := matches[3]
+       if expTime, err := ParseHexTimestamp(expHex); err != nil {
                return PermissionError
-       } else if exp_time.Before(time.Now()) {
+       } else if expTime.Before(time.Now()) {
                return ExpiredError
        }
-       if sig_hex != MakePermSignature(blob_hash, api_token, exp_hex) {
+       if sigHex != MakePermSignature(blobHash, apiToken, expHex) {
                return PermissionError
        }
        return nil
 }
 
-func ParseHexTimestamp(timestamp_hex string) (ts time.Time, err error) {
-       if ts_int, e := strconv.ParseInt(timestamp_hex, 16, 0); e == nil {
-               ts = time.Unix(ts_int, 0)
+// ParseHexTimestamp parses a hexadecimal Unix timestamp and returns it as a time.Time.
+func ParseHexTimestamp(timestampHex string) (ts time.Time, err error) {
+       if tsInt, e := strconv.ParseInt(timestampHex, 16, 0); e == nil {
+               ts = time.Unix(tsInt, 0)
        } else {
                err = e
        }
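
As MakePermSignature shows, the permission hint is an HMAC-SHA1 over "hash@token@expiry" keyed with the permission secret, and SignLocator appends it as "+A<signature>@<expiryHex>". A self-contained sketch; the secret and token values here are made up:

    package main

    import (
        "crypto/hmac"
        "crypto/sha1"
        "fmt"
    )

    // makePermSignature mirrors MakePermSignature above: HMAC-SHA1 over
    // "blobHash@apiToken@expiryHex", keyed with the permission secret.
    func makePermSignature(secret []byte, blobHash, apiToken, expiryHex string) string {
        h := hmac.New(sha1.New, secret)
        h.Write([]byte(blobHash + "@" + apiToken + "@" + expiryHex))
        return fmt.Sprintf("%x", h.Sum(nil))
    }

    func main() {
        // The secret and token below are made-up values for illustration.
        sig := makePermSignature([]byte("example-secret"),
            "acbd18db4cc2f85cedef654fccc4a4d8", "some-token", "7fffffff")
        // SignLocator appends the hint as "+A<signature>@<expiryHex>".
        fmt.Printf("acbd18db4cc2f85cedef654fccc4a4d8+3+A%s@7fffffff\n", sig)
    }
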
index e43cb8dcd99bf39d4318153525b4f46c660239ce..59516af85f898efd223389f42199901c6ae65862 100644 (file)
@@ -6,91 +6,91 @@ import (
 )
 
 const (
-       known_hash    = "acbd18db4cc2f85cedef654fccc4a4d8"
-       known_locator = known_hash + "+3"
-       known_token   = "hocfupkn2pjhrpgp2vxv8rsku7tvtx49arbc9s4bvu7p7wxqvk"
-       known_key     = "13u9fkuccnboeewr0ne3mvapk28epf68a3bhj9q8sb4l6e4e5mkk" +
+       knownHash    = "acbd18db4cc2f85cedef654fccc4a4d8"
+       knownLocator = knownHash + "+3"
+       knownToken   = "hocfupkn2pjhrpgp2vxv8rsku7tvtx49arbc9s4bvu7p7wxqvk"
+       knownKey     = "13u9fkuccnboeewr0ne3mvapk28epf68a3bhj9q8sb4l6e4e5mkk" +
                "p6nhj2mmpscgu1zze5h5enydxfe3j215024u16ij4hjaiqs5u4pzsl3nczmaoxnc" +
                "ljkm4875xqn4xv058koz3vkptmzhyheiy6wzevzjmdvxhvcqsvr5abhl15c2d4o4" +
                "jhl0s91lojy1mtrzqqvprqcverls0xvy9vai9t1l1lvvazpuadafm71jl4mrwq2y" +
                "gokee3eamvjy8qq1fvy238838enjmy5wzy2md7yvsitp5vztft6j4q866efym7e6" +
                "vu5wm9fpnwjyxfldw3vbo01mgjs75rgo7qioh8z8ij7jpyp8508okhgbbex3ceei" +
                "786u5rw2a9gx743dj3fgq2irk"
-       known_signature      = "257f3f5f5f0a4e4626a18fc74bd42ec34dcb228a"
-       known_timestamp      = "7fffffff"
-       known_sig_hint       = "+A" + known_signature + "@" + known_timestamp
-       known_signed_locator = known_locator + known_sig_hint
+       knownSignature     = "257f3f5f5f0a4e4626a18fc74bd42ec34dcb228a"
+       knownTimestamp     = "7fffffff"
+       knownSigHint       = "+A" + knownSignature + "@" + knownTimestamp
+       knownSignedLocator = knownLocator + knownSigHint
 )
 
 func TestSignLocator(t *testing.T) {
-       PermissionSecret = []byte(known_key)
+       PermissionSecret = []byte(knownKey)
        defer func() { PermissionSecret = nil }()
 
-       if ts, err := ParseHexTimestamp(known_timestamp); err != nil {
-               t.Errorf("bad known_timestamp %s", known_timestamp)
+       if ts, err := ParseHexTimestamp(knownTimestamp); err != nil {
+               t.Errorf("bad knownTimestamp %s", knownTimestamp)
        } else {
-               if known_signed_locator != SignLocator(known_locator, known_token, ts) {
+               if knownSignedLocator != SignLocator(knownLocator, knownToken, ts) {
                        t.Fail()
                }
        }
 }
 
 func TestVerifySignature(t *testing.T) {
-       PermissionSecret = []byte(known_key)
+       PermissionSecret = []byte(knownKey)
        defer func() { PermissionSecret = nil }()
 
-       if VerifySignature(known_signed_locator, known_token) != nil {
+       if VerifySignature(knownSignedLocator, knownToken) != nil {
                t.Fail()
        }
 }
 
 func TestVerifySignatureExtraHints(t *testing.T) {
-       PermissionSecret = []byte(known_key)
+       PermissionSecret = []byte(knownKey)
        defer func() { PermissionSecret = nil }()
 
-       if VerifySignature(known_locator+"+K@xyzzy"+known_sig_hint, known_token) != nil {
+       if VerifySignature(knownLocator+"+K@xyzzy"+knownSigHint, knownToken) != nil {
                t.Fatal("Verify cannot handle hint before permission signature")
        }
 
-       if VerifySignature(known_locator+known_sig_hint+"+Zfoo", known_token) != nil {
+       if VerifySignature(knownLocator+knownSigHint+"+Zfoo", knownToken) != nil {
                t.Fatal("Verify cannot handle hint after permission signature")
        }
 
-       if VerifySignature(known_locator+"+K@xyzzy"+known_sig_hint+"+Zfoo", known_token) != nil {
+       if VerifySignature(knownLocator+"+K@xyzzy"+knownSigHint+"+Zfoo", knownToken) != nil {
                t.Fatal("Verify cannot handle hints around permission signature")
        }
 }
 
 // The size hint on the locator string should not affect signature validation.
 func TestVerifySignatureWrongSize(t *testing.T) {
-       PermissionSecret = []byte(known_key)
+       PermissionSecret = []byte(knownKey)
        defer func() { PermissionSecret = nil }()
 
-       if VerifySignature(known_hash+"+999999"+known_sig_hint, known_token) != nil {
+       if VerifySignature(knownHash+"+999999"+knownSigHint, knownToken) != nil {
                t.Fatal("Verify cannot handle incorrect size hint")
        }
 
-       if VerifySignature(known_hash+known_sig_hint, known_token) != nil {
+       if VerifySignature(knownHash+knownSigHint, knownToken) != nil {
                t.Fatal("Verify cannot handle missing size hint")
        }
 }
 
 func TestVerifySignatureBadSig(t *testing.T) {
-       PermissionSecret = []byte(known_key)
+       PermissionSecret = []byte(knownKey)
        defer func() { PermissionSecret = nil }()
 
-       bad_locator := known_locator + "+Aaaaaaaaaaaaaaaa@" + known_timestamp
-       if VerifySignature(bad_locator, known_token) != PermissionError {
+       badLocator := knownLocator + "+Aaaaaaaaaaaaaaaa@" + knownTimestamp
+       if VerifySignature(badLocator, knownToken) != PermissionError {
                t.Fail()
        }
 }
 
 func TestVerifySignatureBadTimestamp(t *testing.T) {
-       PermissionSecret = []byte(known_key)
+       PermissionSecret = []byte(knownKey)
        defer func() { PermissionSecret = nil }()
 
-       bad_locator := known_locator + "+A" + known_signature + "@OOOOOOOl"
-       if VerifySignature(bad_locator, known_token) != PermissionError {
+       badLocator := knownLocator + "+A" + knownSignature + "@OOOOOOOl"
+       if VerifySignature(badLocator, knownToken) != PermissionError {
                t.Fail()
        }
 }
@@ -99,27 +99,27 @@ func TestVerifySignatureBadSecret(t *testing.T) {
        PermissionSecret = []byte("00000000000000000000")
        defer func() { PermissionSecret = nil }()
 
-       if VerifySignature(known_signed_locator, known_token) != PermissionError {
+       if VerifySignature(knownSignedLocator, knownToken) != PermissionError {
                t.Fail()
        }
 }
 
 func TestVerifySignatureBadToken(t *testing.T) {
-       PermissionSecret = []byte(known_key)
+       PermissionSecret = []byte(knownKey)
        defer func() { PermissionSecret = nil }()
 
-       if VerifySignature(known_signed_locator, "00000000") != PermissionError {
+       if VerifySignature(knownSignedLocator, "00000000") != PermissionError {
                t.Fail()
        }
 }
 
 func TestVerifySignatureExpired(t *testing.T) {
-       PermissionSecret = []byte(known_key)
+       PermissionSecret = []byte(knownKey)
        defer func() { PermissionSecret = nil }()
 
        yesterday := time.Now().AddDate(0, 0, -1)
-       expired_locator := SignLocator(known_hash, known_token, yesterday)
-       if VerifySignature(expired_locator, known_token) != ExpiredError {
+       expiredLocator := SignLocator(knownHash, knownToken, yesterday)
+       if VerifySignature(expiredLocator, knownToken) != ExpiredError {
                t.Fail()
        }
 }
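
VerifySignature first splits a signed locator with the regular expression shown in perms.go above. A sketch of that parsing step, run against knownSignedLocator assembled from these fixtures:

    package main

    import (
        "fmt"
        "regexp"
    )

    // The same pattern used by VerifySignature: 32-hex-digit hash, then a
    // 40-hex-digit "+A" signature and an 8-hex-digit expiry timestamp.
    var signedLocatorRe = regexp.MustCompile(`^([[:xdigit:]]{32}).*\+A([[:xdigit:]]{40})@([[:xdigit:]]{8})`)

    func main() {
        loc := "acbd18db4cc2f85cedef654fccc4a4d8+3" +
            "+A257f3f5f5f0a4e4626a18fc74bd42ec34dcb228a@7fffffff"
        m := signedLocatorRe.FindStringSubmatch(loc)
        fmt.Println(m[1]) // blob hash
        fmt.Println(m[2]) // permission signature
        fmt.Println(m[3]) // expiry, as a hex Unix timestamp
    }
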
index acf861119f47fd1b765bcad461d826c369151968..9f0b96fa35b1c32af4b06be92ea02632547ef249 100644 (file)
@@ -11,19 +11,18 @@ import (
        "time"
 )
 
-/*
-       Keepstore initiates pull worker channel goroutine.
-       The channel will process pull list.
-               For each (next) pull request:
-                       For each locator listed, execute Pull on the server(s) listed
-                       Skip the rest of the servers if no errors
-               Repeat
-*/
+// RunPullWorker is used by Keepstore to initiate the pull worker channel goroutine.
+//     The channel processes the pull list.
+//             For each (next) pull request:
+//                     For each locator listed, execute Pull on the server(s) listed
+//                     Skip the rest of the servers if no errors
+//             Repeat
+//
 func RunPullWorker(pullq *WorkQueue, keepClient *keepclient.KeepClient) {
        nextItem := pullq.NextItem
        for item := range nextItem {
                pullRequest := item.(PullRequest)
-               err := PullItemAndProcess(item.(PullRequest), GenerateRandomApiToken(), keepClient)
+               err := PullItemAndProcess(item.(PullRequest), GenerateRandomAPIToken(), keepClient)
                pullq.DoneItem <- struct{}{}
                if err == nil {
                        log.Printf("Pull %s success", pullRequest)
@@ -33,25 +32,25 @@ func RunPullWorker(pullq *WorkQueue, keepClient *keepclient.KeepClient) {
        }
 }
 
-/*
-       For each Pull request:
-               Generate a random API token.
-               Generate a permission signature using this token, timestamp ~60 seconds in the future, and desired block hash.
-               Using this token & signature, retrieve the given block.
-               Write to storage
-*/
+// PullItemAndProcess fetches the block for a single pull request and writes it to local storage.
+//     For each Pull request:
+//             Generate a random API token.
+//             Generate a permission signature using this token, timestamp ~60 seconds in the future, and desired block hash.
+//             Using this token & signature, retrieve the given block.
+//             Write to storage
+//
 func PullItemAndProcess(pullRequest PullRequest, token string, keepClient *keepclient.KeepClient) (err error) {
        keepClient.Arvados.ApiToken = token
 
-       service_roots := make(map[string]string)
+       serviceRoots := make(map[string]string)
        for _, addr := range pullRequest.Servers {
-               service_roots[addr] = addr
+               serviceRoots[addr] = addr
        }
-       keepClient.SetServiceRoots(service_roots, nil, nil)
+       keepClient.SetServiceRoots(serviceRoots, nil, nil)
 
        // Generate signature with a random token
-       expires_at := time.Now().Add(60 * time.Second)
-       signedLocator := SignLocator(pullRequest.Locator, token, expires_at)
+       expiresAt := time.Now().Add(60 * time.Second)
+       signedLocator := SignLocator(pullRequest.Locator, token, expiresAt)
 
        reader, contentLen, _, err := GetContent(signedLocator, keepClient)
        if err != nil {
@@ -62,16 +61,16 @@ func PullItemAndProcess(pullRequest PullRequest, token string, keepClient *keepc
        }
        defer reader.Close()
 
-       read_content, err := ioutil.ReadAll(reader)
+       readContent, err := ioutil.ReadAll(reader)
        if err != nil {
                return err
        }
 
-       if (read_content == nil) || (int64(len(read_content)) != contentLen) {
+       if (readContent == nil) || (int64(len(readContent)) != contentLen) {
                return errors.New(fmt.Sprintf("Content not found for: %s", signedLocator))
        }
 
-       err = PutContent(read_content, pullRequest.Locator)
+       err = PutContent(readContent, pullRequest.Locator)
        return
 }
 
@@ -82,13 +81,14 @@ var GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
        return reader, blocklen, url, err
 }
 
-const ALPHA_NUMERIC = "0123456789abcdefghijklmnopqrstuvwxyz"
+const alphaNumeric = "0123456789abcdefghijklmnopqrstuvwxyz"
 
-func GenerateRandomApiToken() string {
+// GenerateRandomAPIToken generates a random API token.
+func GenerateRandomAPIToken() string {
        var bytes = make([]byte, 36)
        rand.Read(bytes)
        for i, b := range bytes {
-               bytes[i] = ALPHA_NUMERIC[b%byte(len(ALPHA_NUMERIC))]
+               bytes[i] = alphaNumeric[b%byte(len(alphaNumeric))]
        }
        return (string(bytes))
 }
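
GenerateRandomAPIToken builds a throwaway token by mapping random bytes onto the 36-character alphabet. A standalone sketch of the same idea; which rand package keepstore imports is not visible in this hunk, so crypto/rand is assumed here:

    package main

    import (
        "crypto/rand"
        "fmt"
    )

    const alphaNumeric = "0123456789abcdefghijklmnopqrstuvwxyz"

    // randomToken maps random bytes onto the 36-character alphabet, as
    // GenerateRandomAPIToken does above (crypto/rand assumed).
    func randomToken(n int) string {
        b := make([]byte, n)
        if _, err := rand.Read(b); err != nil {
            panic(err)
        }
        for i, c := range b {
            b[i] = alphaNumeric[c%byte(len(alphaNumeric))]
        }
        return string(b)
    }

    func main() {
        fmt.Println(randomToken(36)) // 36 random lowercase alphanumerics
    }
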
index 3e57407369c0dccf8216e3e0835820c0bc419b55..e0bad0045af34d8067591fa723e9229be093790c 100644 (file)
@@ -128,7 +128,7 @@ func performPullWorkerIntegrationTest(testData PullWorkIntegrationTestData, pull
                return rdr, int64(len(testData.Content)), "", nil
        }
 
-       keepClient.Arvados.ApiToken = GenerateRandomApiToken()
+       keepClient.Arvados.ApiToken = GenerateRandomAPIToken()
        err := PullItemAndProcess(pullRequest, keepClient.Arvados.ApiToken, keepClient)
 
        if len(testData.GetError) > 0 {
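
The integration test works by swapping the package-level GetContent variable for a stub, which is possible because pull_worker.go declares it as a var rather than a func. A small sketch of that save/stub/restore pattern, with fetch and process as illustrative names:

    package main

    import "fmt"

    // fetch is a package-level function variable, like GetContent and
    // PutContent in pull_worker.go, so a test can swap in a stub and
    // restore the original with defer.
    var fetch = func(locator string) (string, error) {
        return "real content for " + locator, nil
    }

    func process(locator string) (string, error) {
        return fetch(locator)
    }

    func main() {
        // What the tests do: save the original, install a stub, restore.
        defer func(orig func(string) (string, error)) { fetch = orig }(fetch)
        fetch = func(locator string) (string, error) { return "stubbed", nil }

        out, _ := process("acbd18db4cc2f85cedef654fccc4a4d8+3")
        fmt.Println(out) // stubbed
    }
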
index 37d83b32802af1432bf7ed8f2af5826a3d757914..ed0d24ae8cb476459bb05f17aa48667d2951a8c4 100644 (file)
@@ -37,7 +37,7 @@ func (s *PullWorkerTestSuite) SetUpTest(c *C) {
 
        // When a new pull request arrives, the old one will be overwritten.
        // This behavior is verified using these two maps in the
-       // "TestPullWorker_pull_list_with_two_items_latest_replacing_old"
+       // "TestPullWorkerPullList_with_two_items_latest_replacing_old"
        testPullLists = make(map[string]string)
 }
 
@@ -53,7 +53,7 @@ func RunTestPullWorker(c *C) {
        go RunPullWorker(pullq, keepClient)
 }
 
-var first_pull_list = []byte(`[
+var firstPullList = []byte(`[
                {
                        "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
                        "servers":[
@@ -68,7 +68,7 @@ var first_pull_list = []byte(`[
                }
        ]`)
 
-var second_pull_list = []byte(`[
+var secondPullList = []byte(`[
                {
                        "locator":"73feffa4b7f6bb68e44cf984c85f6e88+3",
                        "servers":[
@@ -79,44 +79,44 @@ var second_pull_list = []byte(`[
        ]`)
 
 type PullWorkerTestData struct {
-       name          string
-       req           RequestTester
-       response_code int
-       response_body string
-       read_content  string
-       read_error    bool
-       put_error     bool
+       name         string
+       req          RequestTester
+       responseCode int
+       responseBody string
+       readContent  string
+       readError    bool
+       putError     bool
 }
 
-func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_locators(c *C) {
+func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
        defer teardown()
 
        data_manager_token = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
-               name:          "TestPullWorker_pull_list_with_two_locators",
-               req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
-               response_code: http.StatusOK,
-               response_body: "Received 2 pull requests\n",
-               read_content:  "hello",
-               read_error:    false,
-               put_error:     false,
+               name:         "TestPullWorkerPullList_with_two_locators",
+               req:          RequestTester{"/pull", data_manager_token, "PUT", firstPullList},
+               responseCode: http.StatusOK,
+               responseBody: "Received 2 pull requests\n",
+               readContent:  "hello",
+               readError:    false,
+               putError:     false,
        }
 
        performTest(testData, c)
 }
 
-func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator(c *C) {
+func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
        defer teardown()
 
        data_manager_token = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
-               name:          "TestPullWorker_pull_list_with_one_locator",
-               req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
-               response_code: http.StatusOK,
-               response_body: "Received 1 pull requests\n",
-               read_content:  "hola",
-               read_error:    false,
-               put_error:     false,
+               name:         "TestPullWorkerPullList_with_one_locator",
+               req:          RequestTester{"/pull", data_manager_token, "PUT", secondPullList},
+               responseCode: http.StatusOK,
+               responseBody: "Received 1 pull requests\n",
+               readContent:  "hola",
+               readError:    false,
+               putError:     false,
        }
 
        performTest(testData, c)
@@ -127,13 +127,13 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
 
        data_manager_token = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
-               name:          "TestPullWorker_error_on_get_one_locator",
-               req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
-               response_code: http.StatusOK,
-               response_body: "Received 1 pull requests\n",
-               read_content:  "unused",
-               read_error:    true,
-               put_error:     false,
+               name:         "TestPullWorker_error_on_get_one_locator",
+               req:          RequestTester{"/pull", data_manager_token, "PUT", secondPullList},
+               responseCode: http.StatusOK,
+               responseBody: "Received 1 pull requests\n",
+               readContent:  "unused",
+               readError:    true,
+               putError:     false,
        }
 
        performTest(testData, c)
@@ -144,13 +144,13 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
 
        data_manager_token = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
-               name:          "TestPullWorker_error_on_get_two_locators",
-               req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
-               response_code: http.StatusOK,
-               response_body: "Received 2 pull requests\n",
-               read_content:  "unused",
-               read_error:    true,
-               put_error:     false,
+               name:         "TestPullWorker_error_on_get_two_locators",
+               req:          RequestTester{"/pull", data_manager_token, "PUT", firstPullList},
+               responseCode: http.StatusOK,
+               responseBody: "Received 2 pull requests\n",
+               readContent:  "unused",
+               readError:    true,
+               putError:     false,
        }
 
        performTest(testData, c)
@@ -161,13 +161,13 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
 
        data_manager_token = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
-               name:          "TestPullWorker_error_on_put_one_locator",
-               req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
-               response_code: http.StatusOK,
-               response_body: "Received 1 pull requests\n",
-               read_content:  "hello hello",
-               read_error:    false,
-               put_error:     true,
+               name:         "TestPullWorker_error_on_put_one_locator",
+               req:          RequestTester{"/pull", data_manager_token, "PUT", secondPullList},
+               responseCode: http.StatusOK,
+               responseBody: "Received 1 pull requests\n",
+               readContent:  "hello hello",
+               readError:    false,
+               putError:     true,
        }
 
        performTest(testData, c)
@@ -178,13 +178,13 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
 
        data_manager_token = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
-               name:          "TestPullWorker_error_on_put_two_locators",
-               req:           RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
-               response_code: http.StatusOK,
-               response_body: "Received 2 pull requests\n",
-               read_content:  "hello again",
-               read_error:    false,
-               put_error:     true,
+               name:         "TestPullWorker_error_on_put_two_locators",
+               req:          RequestTester{"/pull", data_manager_token, "PUT", firstPullList},
+               responseCode: http.StatusOK,
+               responseBody: "Received 2 pull requests\n",
+               readContent:  "hello again",
+               readError:    false,
+               putError:     true,
        }
 
        performTest(testData, c)
@@ -194,7 +194,7 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
 // is used to check that behavior by first putting an item on the queue,
 // and then performing the test. Thus the "testPullLists" has two entries;
 // however, processedPullLists will see only the newest item in the list.
-func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_items_latest_replacing_old(c *C) {
+func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_items_latest_replacing_old(c *C) {
        defer teardown()
 
        var firstInput = []int{1}
@@ -204,13 +204,13 @@ func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_items_latest_rep
 
        data_manager_token = "DATA MANAGER TOKEN"
        testData := PullWorkerTestData{
-               name:          "TestPullWorker_pull_list_with_two_items_latest_replacing_old",
-               req:           RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
-               response_code: http.StatusOK,
-               response_body: "Received 1 pull requests\n",
-               read_content:  "hola de nuevo",
-               read_error:    false,
-               put_error:     false,
+               name:         "TestPullWorkerPullList_with_two_items_latest_replacing_old",
+               req:          RequestTester{"/pull", data_manager_token, "PUT", secondPullList},
+               responseCode: http.StatusOK,
+               responseBody: "Received 1 pull requests\n",
+               readContent:  "hola de nuevo",
+               readError:    false,
+               putError:     false,
        }
 
        performTest(testData, c)
@@ -223,13 +223,13 @@ func (s *PullWorkerTestSuite) TestPullWorker_invalid_data_manager_token(c *C) {
        data_manager_token = "DATA MANAGER TOKEN"
 
        testData := PullWorkerTestData{
-               name:          "TestPullWorker_pull_list_with_two_locators",
-               req:           RequestTester{"/pull", "invalid_data_manager_token", "PUT", first_pull_list},
-               response_code: http.StatusUnauthorized,
-               response_body: "Unauthorized\n",
-               read_content:  "hello",
-               read_error:    false,
-               put_error:     false,
+               name:         "TestPullWorkerPullList_with_two_locators",
+               req:          RequestTester{"/pull", "invalid_data_manager_token", "PUT", firstPullList},
+               responseCode: http.StatusUnauthorized,
+               responseBody: "Unauthorized\n",
+               readContent:  "hello",
+               readError:    false,
+               putError:     false,
        }
 
        performTest(testData, c)
@@ -243,7 +243,7 @@ func performTest(testData PullWorkerTestData, c *C) {
        defer pullq.Close()
 
        currentTestData = testData
-       testPullLists[testData.name] = testData.response_body
+       testPullLists[testData.name] = testData.responseBody
 
        processedPullLists := make(map[string]string)
 
@@ -253,53 +253,51 @@ func performTest(testData PullWorkerTestData, c *C) {
        }(GetContent)
        GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
                c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
-               processedPullLists[testData.name] = testData.response_body
-               if testData.read_error {
+               processedPullLists[testData.name] = testData.responseBody
+               if testData.readError {
                        err = errors.New("Error getting data")
                        readError = err
                        return nil, 0, "", err
-               } else {
-                       readContent = testData.read_content
-                       cb := &ClosingBuffer{bytes.NewBufferString(testData.read_content)}
-                       var rc io.ReadCloser
-                       rc = cb
-                       return rc, int64(len(testData.read_content)), "", nil
                }
+               readContent = testData.readContent
+               cb := &ClosingBuffer{bytes.NewBufferString(testData.readContent)}
+               var rc io.ReadCloser
+               rc = cb
+               return rc, int64(len(testData.readContent)), "", nil
        }
 
        // Override PutContent to mock PutBlock functionality
        defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent)
        PutContent = func(content []byte, locator string) (err error) {
-               if testData.put_error {
+               if testData.putError {
                        err = errors.New("Error putting data")
                        putError = err
                        return err
-               } else {
-                       putContent = content
-                       return nil
                }
+               putContent = content
+               return nil
        }
 
        c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
        c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
 
        response := IssueRequest(&testData.req)
-       c.Assert(response.Code, Equals, testData.response_code)
-       c.Assert(response.Body.String(), Equals, testData.response_body)
+       c.Assert(response.Code, Equals, testData.responseCode)
+       c.Assert(response.Body.String(), Equals, testData.responseBody)
 
        expectEqualWithin(c, time.Second, 0, func() interface{} {
                st := pullq.Status()
                return st.InProgress + st.Queued
        })
 
-       if testData.name == "TestPullWorker_pull_list_with_two_items_latest_replacing_old" {
+       if testData.name == "TestPullWorkerPullList_with_two_items_latest_replacing_old" {
                c.Assert(len(testPullLists), Equals, 2)
                c.Assert(len(processedPullLists), Equals, 1)
                c.Assert(testPullLists["Added_before_actual_test_item"], NotNil)
-               c.Assert(testPullLists["TestPullWorker_pull_list_with_two_items_latest_replacing_old"], NotNil)
-               c.Assert(processedPullLists["TestPullWorker_pull_list_with_two_items_latest_replacing_old"], NotNil)
+               c.Assert(testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
+               c.Assert(processedPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
        } else {
-               if testData.response_code == http.StatusOK {
+               if testData.responseCode == http.StatusOK {
                        c.Assert(len(testPullLists), Equals, 1)
                        c.Assert(len(processedPullLists), Equals, 1)
                        c.Assert(testPullLists[testData.name], NotNil)
@@ -309,16 +307,16 @@ func performTest(testData PullWorkerTestData, c *C) {
                }
        }
 
-       if testData.read_error {
+       if testData.readError {
                c.Assert(readError, NotNil)
-       } else if testData.response_code == http.StatusOK {
+       } else if testData.responseCode == http.StatusOK {
                c.Assert(readError, IsNil)
-               c.Assert(readContent, Equals, testData.read_content)
-               if testData.put_error {
+               c.Assert(readContent, Equals, testData.readContent)
+               if testData.putError {
                        c.Assert(putError, NotNil)
                } else {
                        c.Assert(putError, IsNil)
-                       c.Assert(string(putContent), Equals, testData.read_content)
+                       c.Assert(string(putContent), Equals, testData.readContent)
                }
        }
 
index 8f78658c3a7496473c2d81a7f0d7b13213ef9d5f..7e19ba41c5d8250f0d678ea0c107972e3c077374 100644 (file)
@@ -6,14 +6,12 @@ import (
        "time"
 )
 
-/*
-       Keepstore initiates trash worker channel goroutine.
-       The channel will process trash list.
-               For each (next) trash request:
-      Delete the block indicated by the trash request Locator
-               Repeat
-*/
-
+// RunTrashWorker is used by Keepstore to initiate the trash worker channel goroutine.
+//     The channel processes the trash list.
+//             For each (next) trash request:
+//                     Delete the block indicated by the trash request Locator
+//             Repeat
+//
 func RunTrashWorker(trashq *WorkQueue) {
        for item := range trashq.NextItem {
                trashRequest := item.(TrashRequest)
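
RunTrashWorker is a plain channel-draining loop: range over the queue's NextItem channel and handle each trash request until the channel is closed. A runnable sketch of that loop; trashRequest's fields here are assumed for illustration, not keepstore's actual TrashRequest definition:

    package main

    import "fmt"

    // trashRequest stands in for keepstore's TrashRequest; its fields here
    // are assumed for illustration.
    type trashRequest struct {
        Locator    string
        BlockMtime int64
    }

    // runTrashWorker mirrors the RunTrashWorker loop above: handle each
    // request pulled off the queue channel until the channel is closed.
    func runTrashWorker(next <-chan trashRequest, handle func(trashRequest)) {
        for req := range next {
            handle(req)
        }
    }

    func main() {
        q := make(chan trashRequest, 2)
        q <- trashRequest{Locator: "acbd18db4cc2f85cedef654fccc4a4d8", BlockMtime: 1}
        q <- trashRequest{Locator: "f15ac516f788aec4f30932ffb6395c39", BlockMtime: 2}
        close(q)
        runTrashWorker(q, func(r trashRequest) {
            fmt.Println("would delete", r.Locator)
        })
    }
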
index 40b291e6f3a0d268eef374d43a6f489701d02ab9..016ad28cf31943afb40a34846bbe4f11e64f3130 100644 (file)
@@ -55,15 +55,15 @@ func TestTrashWorkerIntegration_GetNonExistingLocator(t *testing.T) {
 func TestTrashWorkerIntegration_LocatorInVolume1(t *testing.T) {
        never_delete = false
        testData := TrashWorkerTestData{
-               Locator1: TEST_HASH,
-               Block1:   TEST_BLOCK,
+               Locator1: TestHash,
+               Block1:   TestBlock,
 
-               Locator2: TEST_HASH_2,
-               Block2:   TEST_BLOCK_2,
+               Locator2: TestHash2,
+               Block2:   TestBlock2,
 
                CreateData: true,
 
-               DeleteLocator: TEST_HASH, // first locator
+               DeleteLocator: TestHash, // first locator
 
                ExpectLocator1: false,
                ExpectLocator2: true,
@@ -77,15 +77,15 @@ func TestTrashWorkerIntegration_LocatorInVolume1(t *testing.T) {
 func TestTrashWorkerIntegration_LocatorInVolume2(t *testing.T) {
        never_delete = false
        testData := TrashWorkerTestData{
-               Locator1: TEST_HASH,
-               Block1:   TEST_BLOCK,
+               Locator1: TestHash,
+               Block1:   TestBlock,
 
-               Locator2: TEST_HASH_2,
-               Block2:   TEST_BLOCK_2,
+               Locator2: TestHash2,
+               Block2:   TestBlock2,
 
                CreateData: true,
 
-               DeleteLocator: TEST_HASH_2, // locator 2
+               DeleteLocator: TestHash2, // locator 2
 
                ExpectLocator1: true,
                ExpectLocator2: false,
@@ -99,15 +99,15 @@ func TestTrashWorkerIntegration_LocatorInVolume2(t *testing.T) {
 func TestTrashWorkerIntegration_LocatorInBothVolumes(t *testing.T) {
        never_delete = false
        testData := TrashWorkerTestData{
-               Locator1: TEST_HASH,
-               Block1:   TEST_BLOCK,
+               Locator1: TestHash,
+               Block1:   TestBlock,
 
-               Locator2: TEST_HASH,
-               Block2:   TEST_BLOCK,
+               Locator2: TestHash,
+               Block2:   TestBlock,
 
                CreateData: true,
 
-               DeleteLocator: TEST_HASH,
+               DeleteLocator: TestHash,
 
                ExpectLocator1: false,
                ExpectLocator2: false,
@@ -121,16 +121,16 @@ func TestTrashWorkerIntegration_LocatorInBothVolumes(t *testing.T) {
 func TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(t *testing.T) {
        never_delete = false
        testData := TrashWorkerTestData{
-               Locator1: TEST_HASH,
-               Block1:   TEST_BLOCK,
+               Locator1: TestHash,
+               Block1:   TestBlock,
 
-               Locator2: TEST_HASH,
-               Block2:   TEST_BLOCK,
+               Locator2: TestHash,
+               Block2:   TestBlock,
 
                CreateData:      true,
                DifferentMtimes: true,
 
-               DeleteLocator: TEST_HASH,
+               DeleteLocator: TestHash,
 
                ExpectLocator1: true,
                ExpectLocator2: false,
@@ -145,16 +145,16 @@ func TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(t *test
 func TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(t *testing.T) {
        never_delete = false
        testData := TrashWorkerTestData{
-               Locator1: TEST_HASH,
-               Block1:   TEST_BLOCK,
+               Locator1: TestHash,
+               Block1:   TestBlock,
 
-               Locator2: TEST_HASH_2,
-               Block2:   TEST_BLOCK_2,
+               Locator2: TestHash2,
+               Block2:   TestBlock2,
 
                CreateData:      true,
                CreateInVolume1: true,
 
-               DeleteLocator: TEST_HASH, // locator 1
+               DeleteLocator: TestHash, // locator 1
 
                ExpectLocator1: false,
                ExpectLocator2: true,
@@ -168,18 +168,18 @@ func TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(t *testing.T) {
 func TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(t *testing.T) {
        never_delete = false
        testData := TrashWorkerTestData{
-               Locator1: TEST_HASH,
-               Block1:   TEST_BLOCK,
+               Locator1: TestHash,
+               Block1:   TestBlock,
 
-               Locator2: TEST_HASH_2,
-               Block2:   TEST_BLOCK_2,
+               Locator2: TestHash2,
+               Block2:   TestBlock2,
 
                CreateData:      true,
                CreateInVolume1: true,
 
                UseTrashLifeTime: true,
 
-               DeleteLocator: TEST_HASH, // locator 1
+               DeleteLocator: TestHash, // locator 1
 
                // Since trash life time is in effect, block won't be deleted.
                ExpectLocator1: true,
@@ -194,15 +194,15 @@ func TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(
 func TestTrashWorkerIntegration_NeverDelete(t *testing.T) {
        never_delete = true
        testData := TrashWorkerTestData{
-               Locator1: TEST_HASH,
-               Block1:   TEST_BLOCK,
+               Locator1: TestHash,
+               Block1:   TestBlock,
 
-               Locator2: TEST_HASH,
-               Block2:   TEST_BLOCK,
+               Locator2: TestHash,
+               Block2:   TestBlock,
 
                CreateData: true,
 
-               DeleteLocator: TEST_HASH,
+               DeleteLocator: TestHash,
 
                ExpectLocator1: true,
                ExpectLocator2: true,
@@ -290,7 +290,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
        expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
 
        // Verify Locator1 to be un/deleted as expected
-       data, _ := GetBlock(testData.Locator1, false)
+       data, _ := GetBlock(testData.Locator1)
        if testData.ExpectLocator1 {
                if len(data) == 0 {
                        t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
@@ -303,7 +303,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
 
        // Verify Locator2 to be un/deleted as expected
        if testData.Locator1 != testData.Locator2 {
-               data, _ = GetBlock(testData.Locator2, false)
+               data, _ = GetBlock(testData.Locator2)
                if testData.ExpectLocator2 {
                        if len(data) == 0 {
                                t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
index 64fea34bfe1c32ad9b6b6b33a74c82f8b9f0252f..e6a0f27f52df895d4b38a7f29ebd68ad97cbafdb 100644 (file)
@@ -1,7 +1,3 @@
-// A Volume is an interface representing a Keep back-end storage unit:
-// for example, a single mounted disk, a RAID array, an Amazon S3 volume,
-// etc.
-
 package main
 
 import (
@@ -10,18 +6,194 @@ import (
        "time"
 )
 
+// A Volume is an interface representing a Keep back-end storage unit:
+// for example, a single mounted disk, a RAID array, an Amazon S3 volume,
+// etc.
 type Volume interface {
        // Get a block. IFF the returned error is nil, the caller must
        // put the returned slice back into the buffer pool when it's
-       // finished with it.
+       // finished with it. (Otherwise, the buffer pool will be
+       // depleted and eventually -- when all available buffers are
+       // used and not returned -- operations will reach deadlock.)
+       //
+       // loc is guaranteed to consist of 32 or more lowercase hex
+       // digits.
+       //
+       // Get should not verify the integrity of the returned data:
+       // it should just return whatever was found in its backing
+       // store. (Integrity checking is the caller's responsibility.)
+       //
+       // If an error is encountered that prevents it from
+       // retrieving the data, that error should be returned so the
+       // caller can log (and send to the client) a more useful
+       // message.
+       //
+       // If the error is "not found", and there's no particular
+       // reason to expect the block to be found (other than that a
+       // caller is asking for it), the returned error should satisfy
+       // os.IsNotExist(err): this is a normal condition and will not
+       // be logged as an error (except that a 404 will appear in the
+       // access log if the block is not found on any other volumes
+       // either).
+       //
+       // If the data in the backing store is bigger than BlockSize,
+       // Get is permitted to return an error without reading any of
+       // the data.
        Get(loc string) ([]byte, error)
+
+       // Compare the given data with the stored data (i.e., what Get
+       // would return). If equal, return nil. If not, return
+       // CollisionError or DiskHashError (depending on whether the
+       // data on disk matches the expected hash), or whatever error
+       // was encountered opening/reading the stored data.
+       Compare(loc string, data []byte) error
+
+       // Put writes a block to an underlying storage device.
+       //
+       // loc is as described in Get.
+       //
+       // len(block) is guaranteed to be between 0 and BlockSize.
+       //
+       // If a block is already stored under the same name (loc) with
+       // different content, Put must either overwrite the existing
+       // data with the new data or return a non-nil error. When
+       // overwriting existing data, it must never leave the storage
+       // device in an inconsistent state: a subsequent call to Get
+       // must return either the entire old block, the entire new
+       // block, or an error. (An implementation that cannot perform
+       // atomic updates must leave the old data alone and return an
+       // error.)
+       //
+       // Put also sets the timestamp for the given locator to the
+       // current time.
+       //
+       // Put must return a non-nil error unless it can guarantee
+       // that the entire block has been written and flushed to
+       // persistent storage, and that its timestamp is current. Of
+       // course, this guarantee is only as good as the underlying
+       // storage device, but it is Put's responsibility to at least
+       // get whatever guarantee is offered by the storage device.
+       //
+       // Put should not verify that loc==hash(block): this is the
+       // caller's responsibility.
        Put(loc string, block []byte) error
+
+       // Touch sets the timestamp for the given locator to the
+       // current time.
+       //
+       // loc is as described in Get.
+       //
+       // If invoked at time t0, Touch must guarantee that a
+       // subsequent call to Mtime will return a timestamp no older
+       // than {t0 minus one second}. For example, if Touch is called
+       // at 2015-07-07T01:23:45.67890123Z, it is acceptable for a
+       // subsequent Mtime to return any of the following:
+       //
+       //   - 2015-07-07T01:23:45.00000000Z
+       //   - 2015-07-07T01:23:45.67890123Z
+       //   - 2015-07-07T01:23:46.67890123Z
+       //   - 2015-07-08T00:00:00.00000000Z
+       //
+       // It is not acceptable for a subsequent Mtime to return
+       // either of the following:
+       //
+       //   - 2015-07-07T00:00:00.00000000Z -- ERROR
+       //   - 2015-07-07T01:23:44.00000000Z -- ERROR
+       //
+       // Touch must return a non-nil error if the timestamp cannot
+       // be updated.
        Touch(loc string) error
+
+       // Mtime returns the stored timestamp for the given locator.
+       //
+       // loc is as described in Get.
+       //
+       // Mtime must return a non-nil error if the given block is not
+       // found or the timestamp could not be retrieved.
        Mtime(loc string) (time.Time, error)
+
+       // IndexTo writes a complete list of locators with the given
+       // prefix for which Get() can retrieve data.
+       //
+       // prefix consists of zero or more lowercase hexadecimal
+       // digits.
+       //
+       // Each locator must be written to the given writer using the
+       // following format:
+       //
+       //   loc "+" size " " timestamp "\n"
+       //
+       // where:
+       //
+       //   - size is the number of bytes of content, given as a
+       //     decimal number with one or more digits
+       //
+       //   - timestamp is the timestamp stored for the locator,
+       //     given as a decimal number of seconds after January 1,
+       //     1970 UTC.
+       //
+       // IndexTo must not write any other data to writer: for
+       // example, it must not write any blank lines.
+       //
+       // If an error makes it impossible to provide a complete
+       // index, IndexTo must return a non-nil error. It is
+       // acceptable to return a non-nil error after writing a
+       // partial index to writer.
+       //
+       // The resulting index is not expected to be sorted in any
+       // particular order.
        IndexTo(prefix string, writer io.Writer) error
+
+       // Delete deletes the block data from the underlying storage
+       // device.
+       //
+       // loc is as described in Get.
+       //
+       // If the timestamp for the given locator is newer than
+       // blob_signature_ttl, Delete must not delete the data.
+       //
+       // If a Delete operation overlaps with any Touch or Put
+       // operations on the same locator, the implementation must
+       // ensure one of the following outcomes:
+       //
+       //   - Touch and Put return a non-nil error, or
+       //   - Delete does not delete the block, or
+       //   - Both of the above.
+       //
+       // If it is possible for the storage device to be accessed by
+       // a different process or host, the synchronization mechanism
+       // should also guard against races with other processes and
+       // hosts. If such a mechanism is not available, there must be
+       // a mechanism for detecting unsafe configurations, alerting
+       // the operator, and aborting or falling back to a read-only
+       // state. In other words, running multiple keepstore processes
+       // with the same underlying storage device must either work
+       // reliably or fail outright.
+       //
+       // Corollary: A successful Touch or Put guarantees a block
+       // will not be deleted for at least blob_signature_ttl
+       // seconds.
        Delete(loc string) error
+
+       // Status returns a *VolumeStatus representing the current
+       // in-use and available storage capacity and an
+       // implementation-specific volume identifier (e.g., "mount
+       // point" for a UnixVolume).
        Status() *VolumeStatus
+
+       // String returns an identifying label for this volume,
+       // suitable for including in log messages. It should contain
+       // enough information to uniquely identify the underlying
+       // storage device, but should not contain any credentials or
+       // secrets.
        String() string
+
+       // Writable returns false if all future Put, Mtime, and Delete
+       // calls are expected to fail.
+       //
+       // If the volume is only temporarily unwritable -- or if Put
+       // will fail because it is full, but Mtime or Delete can
+       // succeed -- then Writable should return true.
        Writable() bool
 }
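
For reference, the index line format required by the IndexTo contract above can be produced with a single Fprintf. The helper below is a hypothetical sketch for illustration only (writeIndexEntry is not part of this commit, and it assumes the usual fmt, io, and time imports):

    // writeIndexEntry emits one index entry in the documented format:
    // locator "+" size " " timestamp "\n", with the timestamp expressed
    // as seconds since January 1, 1970 UTC.
    func writeIndexEntry(w io.Writer, loc string, size int64, mtime time.Time) error {
            _, err := fmt.Fprintf(w, "%s+%d %d\n", loc, size, mtime.Unix())
            return err
    }
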
 
@@ -30,25 +202,32 @@ type Volume interface {
 type VolumeManager interface {
        // AllReadable returns all volumes.
        AllReadable() []Volume
+
        // AllWritable returns all volumes that aren't known to be in
        // a read-only state. (There is no guarantee that a write to
        // one will succeed, though.)
        AllWritable() []Volume
+
        // NextWritable returns the volume where the next new block
        // should be written. A VolumeManager can select a volume in
        // order to distribute activity across spindles, fill up disks
        // with more free space, etc.
        NextWritable() Volume
+
        // Close shuts down the volume manager cleanly.
        Close()
 }
 
+// RRVolumeManager is a round-robin VolumeManager: the Nth call to
+// NextWritable returns the (N % len(writables))th writable Volume
+// (where writables are all Volumes v where v.Writable()==true).
 type RRVolumeManager struct {
        readables []Volume
        writables []Volume
        counter   uint32
 }
 
+// MakeRRVolumeManager initializes RRVolumeManager
 func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager {
        vm := &RRVolumeManager{}
        for _, v := range volumes {
@@ -60,14 +239,17 @@ func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager {
        return vm
 }
 
+// AllReadable returns an array of all readable volumes
 func (vm *RRVolumeManager) AllReadable() []Volume {
        return vm.readables
 }
 
+// AllWritable returns an array of all writable volumes
 func (vm *RRVolumeManager) AllWritable() []Volume {
        return vm.writables
 }
 
+// NextWritable returns the next writable volume in round-robin order (nil if there are no writable volumes)
 func (vm *RRVolumeManager) NextWritable() Volume {
        if len(vm.writables) == 0 {
                return nil
@@ -76,5 +258,18 @@ func (vm *RRVolumeManager) NextWritable() Volume {
        return vm.writables[i%uint32(len(vm.writables))]
 }
 
+// Close is a no-op for RRVolumeManager
 func (vm *RRVolumeManager) Close() {
 }
+
+// VolumeStatus describes the current status of a volume, consisting of:
+//   * mount_point
+//   * device_num (an integer identifying the underlying storage system)
+//   * bytes_free
+//   * bytes_used
+type VolumeStatus struct {
+       MountPoint string `json:"mount_point"`
+       DeviceNum  uint64 `json:"device_num"`
+       BytesFree  uint64 `json:"bytes_free"`
+       BytesUsed  uint64 `json:"bytes_used"`
+}
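
To illustrate the round-robin behavior documented for RRVolumeManager, here is a minimal sketch of a test that could sit alongside the existing ones. The test name, and the assumption that CreateMockVolume's default mock reports itself writable, are illustrative rather than part of this commit:

    // With two writable volumes, successive NextWritable calls alternate
    // between them, so the first and third results should be the same volume.
    func TestRRVolumeManagerRoundRobinSketch(t *testing.T) {
            v0, v1 := CreateMockVolume(), CreateMockVolume()
            vm := MakeRRVolumeManager([]Volume{v0, v1})
            defer vm.Close()

            first := vm.NextWritable()
            second := vm.NextWritable()
            third := vm.NextWritable()
            if first == second || first != third {
                    t.Errorf("expected alternating volumes, got %v, %v, %v", first, second, third)
            }
    }
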
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
new file mode 100644 (file)
index 0000000..0218d40
--- /dev/null
@@ -0,0 +1,614 @@
+package main
+
+import (
+       "bytes"
+       "os"
+       "regexp"
+       "sort"
+       "strings"
+       "testing"
+       "time"
+)
+
+// A TestableVolumeFactory returns a new TestableVolume. The factory
+// function, and the TestableVolume it returns, can use "t" to write
+// logs, fail the current test, etc.
+type TestableVolumeFactory func(t *testing.T) TestableVolume
+
+// DoGenericVolumeTests runs a set of tests that every TestableVolume
+// is expected to pass. It calls factory to create a new TestableVolume
+// for each test case, to avoid leaking state between tests.
+func DoGenericVolumeTests(t *testing.T, factory TestableVolumeFactory) {
+       testGet(t, factory)
+       testGetNoSuchBlock(t, factory)
+
+       testCompareSameContent(t, factory)
+       testCompareWithDifferentContent(t, factory)
+       testCompareWithBadData(t, factory)
+
+       testPutBlockWithSameContent(t, factory)
+       testPutBlockWithDifferentContent(t, factory)
+       testPutMultipleBlocks(t, factory)
+
+       testPutAndTouch(t, factory)
+       testTouchNoSuchBlock(t, factory)
+
+       testMtimeNoSuchBlock(t, factory)
+
+       testIndexTo(t, factory)
+
+       testDeleteNewBlock(t, factory)
+       testDeleteOldBlock(t, factory)
+       testDeleteNoSuchBlock(t, factory)
+
+       testStatus(t, factory)
+
+       testString(t, factory)
+
+       testUpdateReadOnly(t, factory)
+
+       testGetConcurrent(t, factory)
+       testPutConcurrent(t, factory)
+}
+
+// Put a test block, get it and verify content
+// Test should pass for both writable and read-only volumes
+func testGet(t *testing.T, factory TestableVolumeFactory) {
+       v := factory(t)
+       defer v.Teardown()
+
+       v.PutRaw(TestHash, TestBlock)
+
+       buf, err := v.Get(TestHash)
+       if err != nil {
+               t.Error(err)
+       }
+
+       if bytes.Compare(buf, TestBlock) != 0 {
+               t.Errorf("expected %s, got %s", string(TestBlock), string(buf))
+       }
+
+       bufs.Put(buf)
+}
+
+// Invoke get on a block that does not exist in volume; should result in error
+// Test should pass for both writable and read-only volumes
+func testGetNoSuchBlock(t *testing.T, factory TestableVolumeFactory) {
+       v := factory(t)
+       defer v.Teardown()
+
+       if _, err := v.Get(TestHash2); err == nil {
+               t.Errorf("Expected error while getting non-existing block %v", TestHash2)
+       }
+}
+
+// Put a test block and compare the locator with same content
+// Test should pass for both writable and read-only volumes
+func testCompareSameContent(t *testing.T, factory TestableVolumeFactory) {
+       v := factory(t)
+       defer v.Teardown()
+
+       v.PutRaw(TestHash, TestBlock)
+
+       // Compare the block locator with same content
+       err := v.Compare(TestHash, TestBlock)
+       if err != nil {
+               t.Errorf("Got err %q, expected nil", err)
+       }
+}
+
+// Put a test block and compare the locator with a different content
+// Expect error due to collision
+// Test should pass for both writable and read-only volumes
+func testCompareWithDifferentContent(t *testing.T, factory TestableVolumeFactory) {
+       v := factory(t)
+       defer v.Teardown()
+
+       v.PutRaw(TestHash, TestBlock)
+
+       // Compare the block locator with different content; collision
+       err := v.Compare(TestHash, []byte("baddata"))
+       if err == nil {
+               t.Errorf("Expected error due to collision")
+       }
+}
+
+// Put a test block with bad data (hash does not match, but Put does not verify)
+// Compare the locator with good data whose hash matches with locator
+// Expect error due to corruption.
+// Test should pass for both writable and read-only volumes
+func testCompareWithBadData(t *testing.T, factory TestableVolumeFactory) {
+       v := factory(t)
+       defer v.Teardown()
+
+       v.PutRaw(TestHash, []byte("baddata"))
+
+       err := v.Compare(TestHash, TestBlock)
+       if err == nil {
+               t.Errorf("Expected error due to corruption")
+       }
+}
+
+// Put a block and put again with same content
+// Test is intended for only writable volumes
+func testPutBlockWithSameContent(t *testing.T, factory TestableVolumeFactory) {
+       v := factory(t)
+       defer v.Teardown()
+
+       if v.Writable() == false {
+               return
+       }
+
+       err := v.Put(TestHash, TestBlock)
+       if err != nil {
+               t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
+       }
+
+       err = v.Put(TestHash, TestBlock)
+       if err != nil {
+               t.Errorf("Got err putting block second time %q: %q, expected nil", TestBlock, err)
+       }
+}
+
+// Put a block and put again with different content
+// Test is intended for only writable volumes
+func testPutBlockWithDifferentContent(t *testing.T, factory TestableVolumeFactory) {
+       v := factory(t)
+       defer v.Teardown()
+
+       if v.Writable() == false {
+               return
+       }
+
+       err := v.Put(TestHash, TestBlock)
+       if err != nil {
+               t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
+       }
+
+       putErr := v.Put(TestHash, TestBlock2)
+       buf, getErr := v.Get(TestHash)
+       if putErr == nil {
+               // Put must not return a nil error unless it has
+               // overwritten the existing data.
+               if bytes.Compare(buf, TestBlock2) != 0 {
+                       t.Errorf("Put succeeded but Get returned %+v, expected %+v", buf, TestBlock2)
+               }
+       } else {
+               // It is permissible for Put to fail, but it must
+               // leave us with either the original data, the new
+               // data, or nothing at all.
+               if getErr == nil && bytes.Compare(buf, TestBlock) != 0 && bytes.Compare(buf, TestBlock2) != 0 {
+                       t.Errorf("Put failed but Get returned %+v, which is neither %+v nor %+v", buf, TestBlock, TestBlock2)
+               }
+       }
+       if getErr == nil {
+               bufs.Put(buf)
+       }
+}
+
+// Put and get multiple blocks
+// Test is intended for only writable volumes
+func testPutMultipleBlocks(t *testing.T, factory TestableVolumeFactory) {
+       v := factory(t)
+       defer v.Teardown()
+
+       if v.Writable() == false {
+               return
+       }
+
+       err := v.Put(TestHash, TestBlock)
+       if err != nil {
+               t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
+       }
+
+       err = v.Put(TestHash2, TestBlock2)
+       if err != nil {
+               t.Errorf("Got err putting block %q: %q, expected nil", TestBlock2, err)
+       }
+
+       err = v.Put(TestHash3, TestBlock3)
+       if err != nil {
+               t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
+       }
+
+       data, err := v.Get(TestHash)
+       if err != nil {
+               t.Error(err)
+       } else if bytes.Compare(data, TestBlock) != 0 {
+               t.Errorf("Block present, but content is incorrect: Expected: %v  Found: %v", TestBlock, data)
+       }
+       bufs.Put(data)
+
+       data, err = v.Get(TestHash2)
+       if err != nil {
+               t.Error(err)
+       } else if bytes.Compare(data, TestBlock2) != 0 {
+               t.Errorf("Block present, but content is incorrect: Expected: %v  Found: %v", TestBlock2, data)
+       }
+       bufs.Put(data)
+
+       data, err = v.Get(TestHash3)
+       if err != nil {
+               t.Error(err)
+       } else if bytes.Compare(data, TestBlock3) != 0 {
+               t.Errorf("Block present, but content is incorrect: Expected: %v  Found: %v", TestBlock3, data)
+       }
+       bufs.Put(data)
+}
+
+// testPutAndTouch
+//   Test that when applying PUT to a block that already exists,
+//   the block's modification time is updated.
+// Test is intended for only writable volumes
+func testPutAndTouch(t *testing.T, factory TestableVolumeFactory) {
+       v := factory(t)
+       defer v.Teardown()
+
+       if v.Writable() == false {
+               return
+       }
+
+       if err := v.Put(TestHash, TestBlock); err != nil {
+               t.Error(err)
+       }
+
+       // We'll verify { t0 < threshold < t1 }, where t0 is the
+       // existing block's timestamp on disk before Put() and t1 is
+       // its timestamp after Put().
+       threshold := time.Now().Add(-time.Second)
+
+       // Set the stored block's mtime far enough in the past that we
+       // can see the difference between "timestamp didn't change"
+       // and "timestamp granularity is too low".
+       v.TouchWithDate(TestHash, time.Now().Add(-20*time.Second))
+
+       // Make sure v.Mtime() agrees that the above TouchWithDate really worked.
+       if t0, err := v.Mtime(TestHash); err != nil || t0.IsZero() || !t0.Before(threshold) {
+               t.Errorf("Setting mtime failed: %v, %v", t0, err)
+       }
+
+       // Write the same block again.
+       if err := v.Put(TestHash, TestBlock); err != nil {
+               t.Error(err)
+       }
+
+       // Verify threshold < t1
+       if t1, err := v.Mtime(TestHash); err != nil {
+               t.Error(err)
+       } else if t1.Before(threshold) {
+               t.Errorf("t1 %v should be >= threshold %v after v.Put", t1, threshold)
+       }
+}
+
+// Touching a non-existing block should result in error.
+// Test should pass for both writable and read-only volumes
+func testTouchNoSuchBlock(t *testing.T, factory TestableVolumeFactory) {
+       v := factory(t)
+       defer v.Teardown()
+
+       if err := v.Touch(TestHash); err == nil {
+               t.Error("Expected error when attempted to touch a non-existing block")
+       }
+}
+
+// Invoking Mtime on a non-existing block should result in error.
+// Test should pass for both writable and read-only volumes
+func testMtimeNoSuchBlock(t *testing.T, factory TestableVolumeFactory) {
+       v := factory(t)
+       defer v.Teardown()
+
+       if _, err := v.Mtime("12345678901234567890123456789012"); err == nil {
+               t.Error("Expected error when updating Mtime on a non-existing block")
+       }
+}
+
+// Put a few blocks and invoke IndexTo with:
+// * no prefix
+// * with a prefix
+// * with no such prefix
+// Test should pass for both writable and read-only volumes
+func testIndexTo(t *testing.T, factory TestableVolumeFactory) {
+       v := factory(t)
+       defer v.Teardown()
+
+       v.PutRaw(TestHash, TestBlock)
+       v.PutRaw(TestHash2, TestBlock2)
+       v.PutRaw(TestHash3, TestBlock3)
+
+       buf := new(bytes.Buffer)
+       v.IndexTo("", buf)
+       indexRows := strings.Split(string(buf.Bytes()), "\n")
+       sort.Strings(indexRows)
+       sortedIndex := strings.Join(indexRows, "\n")
+       m, err := regexp.MatchString(
+               `^\n`+TestHash+`\+\d+ \d+\n`+
+                       TestHash3+`\+\d+ \d+\n`+
+                       TestHash2+`\+\d+ \d+$`,
+               sortedIndex)
+       if err != nil {
+               t.Error(err)
+       } else if !m {
+               t.Errorf("Got index %q for empty prefix", sortedIndex)
+       }
+
+       for _, prefix := range []string{"f", "f15", "f15ac"} {
+               buf = new(bytes.Buffer)
+               v.IndexTo(prefix, buf)
+
+               m, err := regexp.MatchString(`^`+TestHash2+`\+\d+ \d+\n$`, string(buf.Bytes()))
+               if err != nil {
+                       t.Error(err)
+               } else if !m {
+                       t.Errorf("Got index %q for prefix %s", string(buf.Bytes()), prefix)
+               }
+       }
+
+       for _, prefix := range []string{"zero", "zip", "zilch"} {
+               buf = new(bytes.Buffer)
+               err := v.IndexTo(prefix, buf)
+               if err != nil {
+                       t.Errorf("Got error on IndexTo with no such prefix %v", err.Error())
+               } else if buf.Len() != 0 {
+                       t.Errorf("Expected empty list for IndexTo with no such prefix %s", prefix)
+               }
+       }
+}
+
+// Calling Delete() for a block immediately after writing it (not old enough)
+// should neither delete the data nor return an error.
+// Test is intended for only writable volumes
+func testDeleteNewBlock(t *testing.T, factory TestableVolumeFactory) {
+       v := factory(t)
+       defer v.Teardown()
+
+       if v.Writable() == false {
+               return
+       }
+
+       v.Put(TestHash, TestBlock)
+
+       if err := v.Delete(TestHash); err != nil {
+               t.Error(err)
+       }
+       data, err := v.Get(TestHash)
+       if err != nil {
+               t.Error(err)
+       } else if bytes.Compare(data, TestBlock) != 0 {
+               t.Errorf("Block still present, but content is incorrect: %+v != %+v", data, TestBlock)
+       }
+       bufs.Put(data)
+}
+
+// Calling Delete() for a block with a timestamp older than
+// blob_signature_ttl seconds in the past should delete the data.
+// Test is intended for only writable volumes
+func testDeleteOldBlock(t *testing.T, factory TestableVolumeFactory) {
+       v := factory(t)
+       defer v.Teardown()
+
+       if v.Writable() == false {
+               return
+       }
+
+       v.Put(TestHash, TestBlock)
+       v.TouchWithDate(TestHash, time.Now().Add(-2*blob_signature_ttl*time.Second))
+
+       if err := v.Delete(TestHash); err != nil {
+               t.Error(err)
+       }
+       if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) {
+               t.Errorf("os.IsNotExist(%v) should have been true", err)
+       }
+}
+
+// Calling Delete() for a block that does not exist should result in error.
+// Test should pass for both writable and read-only volumes
+func testDeleteNoSuchBlock(t *testing.T, factory TestableVolumeFactory) {
+       v := factory(t)
+       defer v.Teardown()
+
+       if err := v.Delete(TestHash2); err == nil {
+               t.Errorf("Expected error when attempting to delete a non-existing block")
+       }
+}
+
+// Invoke Status and verify that VolumeStatus is returned
+// Test should pass for both writable and read-only volumes
+func testStatus(t *testing.T, factory TestableVolumeFactory) {
+       v := factory(t)
+       defer v.Teardown()
+
+       // Get node status and make a basic sanity check.
+       status := v.Status()
+       if status.DeviceNum == 0 {
+               t.Errorf("uninitialized device_num in %v", status)
+       }
+
+       if status.BytesFree == 0 {
+               t.Errorf("uninitialized bytes_free in %v", status)
+       }
+
+       if status.BytesUsed == 0 {
+               t.Errorf("uninitialized bytes_used in %v", status)
+       }
+}
+
+// Invoke String for the volume; expect non-empty result
+// Test should pass for both writable and read-only volumes
+func testString(t *testing.T, factory TestableVolumeFactory) {
+       v := factory(t)
+       defer v.Teardown()
+
+       if id := v.String(); len(id) == 0 {
+               t.Error("Got empty string for v.String()")
+       }
+}
+
+// Putting, updating, touching, and deleting blocks from a read-only volume should each result in an error.
+// Test is intended for only read-only volumes
+func testUpdateReadOnly(t *testing.T, factory TestableVolumeFactory) {
+       v := factory(t)
+       defer v.Teardown()
+
+       if v.Writable() == true {
+               return
+       }
+
+       v.PutRaw(TestHash, TestBlock)
+
+       // Get from read-only volume should succeed
+       _, err := v.Get(TestHash)
+       if err != nil {
+               t.Errorf("got err %v, expected nil", err)
+       }
+
+       // Put a new block to read-only volume should result in error
+       err = v.Put(TestHash2, TestBlock2)
+       if err == nil {
+               t.Errorf("Expected error when putting block in a read-only volume")
+       }
+       _, err = v.Get(TestHash2)
+       if err == nil {
+               t.Errorf("Expected error when getting a block whose Put to a read-only volume failed")
+       }
+
+       // Touch a block in read-only volume should result in error
+       err = v.Touch(TestHash)
+       if err == nil {
+               t.Errorf("Expected error when touching block in a read-only volume")
+       }
+
+       // Delete a block from a read-only volume should result in error
+       err = v.Delete(TestHash)
+       if err == nil {
+               t.Errorf("Expected error when deleting block from a read-only volume")
+       }
+
+       // Overwriting an existing block in read-only volume should result in error
+       err = v.Put(TestHash, TestBlock)
+       if err == nil {
+               t.Errorf("Expected error when putting block in a read-only volume")
+       }
+}
+
+// Launch concurrent Gets
+// Test should pass for both writable and read-only volumes
+func testGetConcurrent(t *testing.T, factory TestableVolumeFactory) {
+       v := factory(t)
+       defer v.Teardown()
+
+       v.PutRaw(TestHash, TestBlock)
+       v.PutRaw(TestHash2, TestBlock2)
+       v.PutRaw(TestHash3, TestBlock3)
+
+       sem := make(chan int)
+       go func(sem chan int) {
+               buf, err := v.Get(TestHash)
+               if err != nil {
+                       t.Errorf("err1: %v", err)
+               }
+               if bytes.Compare(buf, TestBlock) != 0 {
+                       t.Errorf("buf should be %s, is %s", string(TestBlock), string(buf))
+               }
+               bufs.Put(buf)
+               sem <- 1
+       }(sem)
+
+       go func(sem chan int) {
+               buf, err := v.Get(TestHash2)
+               if err != nil {
+                       t.Errorf("err2: %v", err)
+               }
+               if bytes.Compare(buf, TestBlock2) != 0 {
+                       t.Errorf("buf should be %s, is %s", string(TestBlock2), string(buf))
+               }
+               bufs.Put(buf)
+               sem <- 1
+       }(sem)
+
+       go func(sem chan int) {
+               buf, err := v.Get(TestHash3)
+               if err != nil {
+                       t.Errorf("err3: %v", err)
+               }
+               if bytes.Compare(buf, TestBlock3) != 0 {
+                       t.Errorf("buf should be %s, is %s", string(TestBlock3), string(buf))
+               }
+               bufs.Put(buf)
+               sem <- 1
+       }(sem)
+
+       // Wait for all goroutines to finish
+       for done := 0; done < 3; {
+               done += <-sem
+       }
+}
+
+// Launch concurrent Puts
+// Test is intended for only writable volumes
+func testPutConcurrent(t *testing.T, factory TestableVolumeFactory) {
+       v := factory(t)
+       defer v.Teardown()
+
+       if v.Writable() == false {
+               return
+       }
+
+       sem := make(chan int)
+       go func(sem chan int) {
+               err := v.Put(TestHash, TestBlock)
+               if err != nil {
+                       t.Errorf("err1: %v", err)
+               }
+               sem <- 1
+       }(sem)
+
+       go func(sem chan int) {
+               err := v.Put(TestHash2, TestBlock2)
+               if err != nil {
+                       t.Errorf("err2: %v", err)
+               }
+               sem <- 1
+       }(sem)
+
+       go func(sem chan int) {
+               err := v.Put(TestHash3, TestBlock3)
+               if err != nil {
+                       t.Errorf("err3: %v", err)
+               }
+               sem <- 1
+       }(sem)
+
+       // Wait for all goroutines to finish
+       for done := 0; done < 3; {
+               done += <-sem
+       }
+
+       // Double check that we actually wrote the blocks we expected to write.
+       buf, err := v.Get(TestHash)
+       if err != nil {
+               t.Errorf("Get #1: %v", err)
+       }
+       bufs.Put(buf)
+       if bytes.Compare(buf, TestBlock) != 0 {
+               t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf))
+       }
+
+       buf, err = v.Get(TestHash2)
+       if err != nil {
+               t.Errorf("Get #2: %v", err)
+       }
+       bufs.Put(buf)
+       if bytes.Compare(buf, TestBlock2) != 0 {
+               t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf))
+       }
+
+       buf, err = v.Get(TestHash3)
+       if err != nil {
+               t.Errorf("Get #3: %v", err)
+       }
+       bufs.Put(buf)
+       if bytes.Compare(buf, TestBlock3) != 0 {
+               t.Errorf("Get #3: expected %s, got %s", string(TestBlock3), string(buf))
+       }
+}
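
Any volume implementation can reuse the generic suite above by supplying a factory; the UnixVolume tests later in this commit do exactly that. A hypothetical sketch (MyVolume and NewTestableMyVolume are invented names for illustration):

    func TestMyVolumeWithGenericTests(t *testing.T) {
            DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
                    // NewTestableMyVolume must return a type that implements
                    // Volume plus the PutRaw, TouchWithDate, and Teardown
                    // methods required by TestableVolume.
                    return NewTestableMyVolume(t)
            })
    }
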
index c5a7491b3d542951983ad67e7eec29baa972f966..5467ed541b3dc90974e3d70ea531971b48f70341 100644 (file)
@@ -1,6 +1,8 @@
 package main
 
 import (
+       "bytes"
+       "crypto/md5"
        "errors"
        "fmt"
        "io"
@@ -10,18 +12,39 @@ import (
        "time"
 )
 
+// A TestableVolume allows test suites to manipulate the state of an
+// underlying Volume, in order to test behavior in cases that are
+// impractical to achieve with a sequence of normal Volume operations.
+type TestableVolume interface {
+       Volume
+       // [Over]write content for a locator with the given data,
+       // bypassing all constraints like readonly and serialize.
+       PutRaw(locator string, data []byte)
+
+       // Specify the value Mtime() should return, until the next
+       // call to Touch, TouchWithDate, or Put.
+       TouchWithDate(locator string, lastPut time.Time)
+
+       // Clean up, delete temporary files.
+       Teardown()
+}
+
 // MockVolumes are test doubles for Volumes, used to test handlers.
 type MockVolume struct {
        Store      map[string][]byte
        Timestamps map[string]time.Time
+
        // Bad volumes return an error for every operation.
        Bad bool
+
        // Touchable volumes' Touch() method succeeds for a locator
        // that has been Put().
        Touchable bool
+
        // Readonly volumes return an error for Put, Delete, and
        // Touch.
        Readonly bool
+
        // Gate is a "starting gate", allowing test cases to pause
        // volume operations long enough to inspect state. Every
        // operation (except Status) starts by receiving from
@@ -29,7 +52,8 @@ type MockVolume struct {
        // channel unblocks all operations. By default, Gate is a
        // closed channel, so all operations proceed without
        // blocking. See trash_worker_test.go for an example.
-       Gate   chan struct{}
+       Gate chan struct{}
+
        called map[string]int
        mutex  sync.Mutex
 }
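
As the Gate comment above describes, a test can hold MockVolume operations at the starting gate and release them later. A minimal sketch of the pattern (the function name is illustrative; trash_worker_test.go uses the same technique):

    func exampleMockVolumeGate(t *testing.T) {
            v := CreateMockVolume()
            v.Gate = make(chan struct{}) // operations now block until Gate is closed

            done := make(chan error, 1)
            go func() { done <- v.Put(TestHash, TestBlock) }()

            // While the Put is parked on <-v.Gate, the test can inspect state,
            // e.g. assert that nothing has been stored yet.

            close(v.Gate) // unblock all pending and future operations
            if err := <-done; err != nil {
                    t.Error(err)
            }
    }
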
@@ -54,11 +78,11 @@ func CreateMockVolume() *MockVolume {
 func (v *MockVolume) CallCount(method string) int {
        v.mutex.Lock()
        defer v.mutex.Unlock()
-       if c, ok := v.called[method]; !ok {
+       c, ok := v.called[method]
+       if !ok {
                return 0
-       } else {
-               return c
        }
+       return c
 }
 
 func (v *MockVolume) gotCall(method string) {
@@ -71,6 +95,24 @@ func (v *MockVolume) gotCall(method string) {
        }
 }
 
+func (v *MockVolume) Compare(loc string, buf []byte) error {
+       v.gotCall("Compare")
+       <-v.Gate
+       if v.Bad {
+               return errors.New("Bad volume")
+       } else if block, ok := v.Store[loc]; ok {
+               if fmt.Sprintf("%x", md5.Sum(block)) != loc {
+                       return DiskHashError
+               }
+               if bytes.Compare(buf, block) != 0 {
+                       return CollisionError
+               }
+               return nil
+       } else {
+               return NotFoundError
+       }
+}
+
 func (v *MockVolume) Get(loc string) ([]byte, error) {
        v.gotCall("Get")
        <-v.Gate
index a7ad6f9e499c80439c27cb1beed33060674ed776..f4b49710e9192deabb2119d72ccbb389edcbe7bc 100644 (file)
@@ -1,8 +1,7 @@
-// A UnixVolume is a Volume backed by a locally mounted disk.
-//
 package main
 
 import (
+       "bytes"
        "fmt"
        "io"
        "io/ioutil"
@@ -19,12 +18,15 @@ import (
 
 // A UnixVolume stores and retrieves blocks in a local directory.
 type UnixVolume struct {
-       root      string // path to the volume's root directory
-       serialize bool
-       readonly  bool
-       mutex     sync.Mutex
+       // path to the volume's root directory
+       root string
+       // something to lock during IO, typically a sync.Mutex (or nil
+       // to skip locking)
+       locker   sync.Locker
+       readonly bool
 }
 
+// Touch sets the timestamp for the given locator to the current time
 func (v *UnixVolume) Touch(loc string) error {
        if v.readonly {
                return MethodDisabledError
@@ -35,9 +37,9 @@ func (v *UnixVolume) Touch(loc string) error {
                return err
        }
        defer f.Close()
-       if v.serialize {
-               v.mutex.Lock()
-               defer v.mutex.Unlock()
+       if v.locker != nil {
+               v.locker.Lock()
+               defer v.locker.Unlock()
        }
        if e := lockfile(f); e != nil {
                return e
@@ -48,44 +50,59 @@ func (v *UnixVolume) Touch(loc string) error {
        return syscall.Utime(p, &utime)
 }
 
+// Mtime returns the stored timestamp for the given locator.
 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
        p := v.blockPath(loc)
-       if fi, err := os.Stat(p); err != nil {
+       fi, err := os.Stat(p)
+       if err != nil {
                return time.Time{}, err
-       } else {
-               return fi.ModTime(), nil
        }
+       return fi.ModTime(), nil
+}
+
+// getFunc locks the locker (if one is in use), opens the file at path for
+// reading, and calls fn with the open file once it is ready to read.
+func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
+       if v.locker != nil {
+               v.locker.Lock()
+               defer v.locker.Unlock()
+       }
+       f, err := os.Open(path)
+       if err != nil {
+               return err
+       }
+       defer f.Close()
+       return fn(f)
+}
+
+// stat is os.Stat() with some extra sanity checks.
+func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
+       stat, err := os.Stat(path)
+       if err == nil {
+               if stat.Size() < 0 {
+                       err = os.ErrInvalid
+               } else if stat.Size() > BlockSize {
+                       err = TooLongError
+               }
+       }
+       return stat, err
 }
 
 // Get retrieves a block identified by the locator string "loc", and
 // returns its contents as a byte slice.
 //
-// If the block could not be found, opened, or read, Get returns a nil
-// slice and whatever non-nil error was returned by Stat or ReadFile.
+// Get returns a nil buffer IFF it returns a non-nil error.
 func (v *UnixVolume) Get(loc string) ([]byte, error) {
        path := v.blockPath(loc)
-       stat, err := os.Stat(path)
-       if err != nil {
-               return nil, err
-       }
-       if stat.Size() < 0 {
-               return nil, os.ErrInvalid
-       } else if stat.Size() == 0 {
-               return bufs.Get(0), nil
-       } else if stat.Size() > BLOCKSIZE {
-               return nil, TooLongError
-       }
-       f, err := os.Open(path)
+       stat, err := v.stat(path)
        if err != nil {
                return nil, err
        }
-       defer f.Close()
        buf := bufs.Get(int(stat.Size()))
-       if v.serialize {
-               v.mutex.Lock()
-               defer v.mutex.Unlock()
-       }
-       _, err = io.ReadFull(f, buf)
+       err = v.getFunc(path, func(rdr io.Reader) error {
+               _, err = io.ReadFull(rdr, buf)
+               return err
+       })
        if err != nil {
                bufs.Put(buf)
                return nil, err
@@ -93,6 +110,44 @@ func (v *UnixVolume) Get(loc string) ([]byte, error) {
        return buf, nil
 }
 
+// Compare returns nil if Get(loc) would return the same content as
+// expect. It is functionally equivalent to Get() followed by
+// bytes.Compare(), but uses less memory.
+func (v *UnixVolume) Compare(loc string, expect []byte) error {
+       path := v.blockPath(loc)
+       stat, err := v.stat(path)
+       if err != nil {
+               return err
+       }
+       bufLen := 1 << 20
+       if int64(bufLen) > stat.Size() {
+               bufLen = int(stat.Size())
+       }
+       cmp := expect
+       buf := make([]byte, bufLen)
+       return v.getFunc(path, func(rdr io.Reader) error {
+               // Loop invariants: all data read so far matched what
+               // we expected, and the first N bytes of cmp are
+               // expected to equal the next N bytes read from
+               // reader.
+               for {
+                       n, err := rdr.Read(buf)
+                       if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
+                               return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], buf[:n], rdr)
+                       }
+                       cmp = cmp[n:]
+                       if err == io.EOF {
+                               if len(cmp) != 0 {
+                                       return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], nil, nil)
+                               }
+                               return nil
+                       } else if err != nil {
+                               return err
+                       }
+               }
+       })
+}
+
 // Put stores a block of data identified by the locator string
 // "loc".  It returns nil on success.  If the volume is full, it
 // returns a FullError.  If the write fails due to some other error,
@@ -118,9 +173,9 @@ func (v *UnixVolume) Put(loc string, block []byte) error {
        }
        bpath := v.blockPath(loc)
 
-       if v.serialize {
-               v.mutex.Lock()
-               defer v.mutex.Unlock()
+       if v.locker != nil {
+               v.locker.Lock()
+               defer v.locker.Unlock()
        }
        if _, err := tmpfile.Write(block); err != nil {
                log.Printf("%s: writing to %s: %s\n", v, bpath, err)
@@ -235,6 +290,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
        }
 }
 
+// Delete deletes the block data from the unix storage
 func (v *UnixVolume) Delete(loc string) error {
        // Touch() must be called before calling Write() on a block.  Touch()
        // also uses lockfile().  This avoids a race condition between Write()
@@ -247,9 +303,9 @@ func (v *UnixVolume) Delete(loc string) error {
        if v.readonly {
                return MethodDisabledError
        }
-       if v.serialize {
-               v.mutex.Lock()
-               defer v.mutex.Unlock()
+       if v.locker != nil {
+               v.locker.Lock()
+               defer v.locker.Unlock()
        }
        p := v.blockPath(loc)
        f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
@@ -290,7 +346,7 @@ func (v *UnixVolume) blockPath(loc string) string {
 }
 
 // IsFull returns true if the free space on the volume is less than
-// MIN_FREE_KILOBYTES.
+// MinFreeKilobytes.
 //
 func (v *UnixVolume) IsFull() (isFull bool) {
        fullSymlink := v.root + "/full"
@@ -306,7 +362,7 @@ func (v *UnixVolume) IsFull() (isFull bool) {
        }
 
        if avail, err := v.FreeDiskSpace(); err == nil {
-               isFull = avail < MIN_FREE_KILOBYTES
+               isFull = avail < MinFreeKilobytes
        } else {
                log.Printf("%s: FreeDiskSpace: %s\n", v, err)
                isFull = false
@@ -338,6 +394,7 @@ func (v *UnixVolume) String() string {
        return fmt.Sprintf("[UnixVolume %s]", v.root)
 }
 
+// Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
 func (v *UnixVolume) Writable() bool {
        return !v.readonly
 }
index ebb8421d9e1d3dbf6b8fce1f3fca0953568de931..4f1e84c0dd998d59f4ea2449c73d2d5d53a4074c 100644 (file)
@@ -2,71 +2,95 @@ package main
 
 import (
        "bytes"
+       "errors"
        "fmt"
+       "io"
        "io/ioutil"
        "os"
-       "regexp"
-       "sort"
        "strings"
+       "sync"
        "syscall"
        "testing"
        "time"
 )
 
-func TempUnixVolume(t *testing.T, serialize bool, readonly bool) *UnixVolume {
+type TestableUnixVolume struct {
+       UnixVolume
+       t *testing.T
+}
+
+func NewTestableUnixVolume(t *testing.T, serialize bool, readonly bool) *TestableUnixVolume {
        d, err := ioutil.TempDir("", "volume_test")
        if err != nil {
                t.Fatal(err)
        }
-       return &UnixVolume{
-               root:      d,
-               serialize: serialize,
-               readonly:  readonly,
+       var locker sync.Locker
+       if serialize {
+               locker = &sync.Mutex{}
+       }
+       return &TestableUnixVolume{
+               UnixVolume: UnixVolume{
+                       root:     d,
+                       locker:   locker,
+                       readonly: readonly,
+               },
+               t: t,
        }
 }
 
-func _teardown(v *UnixVolume) {
-       os.RemoveAll(v.root)
+// PutRaw writes a Keep block directly into a UnixVolume, even if
+// the volume is readonly.
+func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
+       defer func(orig bool) {
+               v.readonly = orig
+       }(v.readonly)
+       v.readonly = false
+       err := v.Put(locator, data)
+       if err != nil {
+               v.t.Fatal(err)
+       }
 }
 
-// _store writes a Keep block directly into a UnixVolume, bypassing
-// the overhead and safeguards of Put(). Useful for storing bogus data
-// and isolating unit tests from Put() behavior.
-func _store(t *testing.T, vol *UnixVolume, filename string, block []byte) {
-       blockdir := fmt.Sprintf("%s/%s", vol.root, filename[:3])
-       if err := os.MkdirAll(blockdir, 0755); err != nil {
-               t.Fatal(err)
+func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
+       err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()})
+       if err != nil {
+               v.t.Fatal(err)
        }
+}
 
-       blockpath := fmt.Sprintf("%s/%s", blockdir, filename)
-       if f, err := os.Create(blockpath); err == nil {
-               f.Write(block)
-               f.Close()
-       } else {
-               t.Fatal(err)
+func (v *TestableUnixVolume) Teardown() {
+       if err := os.RemoveAll(v.root); err != nil {
+               v.t.Fatal(err)
        }
 }
 
-func TestGet(t *testing.T) {
-       v := TempUnixVolume(t, false, false)
-       defer _teardown(v)
-       _store(t, v, TEST_HASH, TEST_BLOCK)
+// serialize = false; readonly = false
+func TestUnixVolumeWithGenericTests(t *testing.T) {
+       DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
+               return NewTestableUnixVolume(t, false, false)
+       })
+}
 
-       buf, err := v.Get(TEST_HASH)
-       if err != nil {
-               t.Error(err)
-       }
-       if bytes.Compare(buf, TEST_BLOCK) != 0 {
-               t.Errorf("expected %s, got %s", string(TEST_BLOCK), string(buf))
-       }
+// serialize = false; readonly = true
+func TestUnixVolumeWithGenericTestsReadOnly(t *testing.T) {
+       DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
+               return NewTestableUnixVolume(t, false, true)
+       })
+}
+
+// serialize = true; readonly = false
+func TestUnixVolumeWithGenericTestsSerialized(t *testing.T) {
+       DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
+               return NewTestableUnixVolume(t, true, false)
+       })
 }
 
 func TestGetNotFound(t *testing.T) {
-       v := TempUnixVolume(t, false, false)
-       defer _teardown(v)
-       _store(t, v, TEST_HASH, TEST_BLOCK)
+       v := NewTestableUnixVolume(t, false, false)
+       defer v.Teardown()
+       v.Put(TestHash, TestBlock)
 
-       buf, err := v.Get(TEST_HASH_2)
+       buf, err := v.Get(TestHash2)
        switch {
        case os.IsNotExist(err):
                break
@@ -77,298 +101,84 @@ func TestGetNotFound(t *testing.T) {
        }
 }
 
-func TestIndexTo(t *testing.T) {
-       v := TempUnixVolume(t, false, false)
-       defer _teardown(v)
-
-       _store(t, v, TEST_HASH, TEST_BLOCK)
-       _store(t, v, TEST_HASH_2, TEST_BLOCK_2)
-       _store(t, v, TEST_HASH_3, TEST_BLOCK_3)
-
-       buf := new(bytes.Buffer)
-       v.IndexTo("", buf)
-       index_rows := strings.Split(string(buf.Bytes()), "\n")
-       sort.Strings(index_rows)
-       sorted_index := strings.Join(index_rows, "\n")
-       m, err := regexp.MatchString(
-               `^\n`+TEST_HASH+`\+\d+ \d+\n`+
-                       TEST_HASH_3+`\+\d+ \d+\n`+
-                       TEST_HASH_2+`\+\d+ \d+$`,
-               sorted_index)
-       if err != nil {
-               t.Error(err)
-       } else if !m {
-               t.Errorf("Got index %q for empty prefix", sorted_index)
-       }
-
-       for _, prefix := range []string{"f", "f15", "f15ac"} {
-               buf = new(bytes.Buffer)
-               v.IndexTo(prefix, buf)
-               m, err := regexp.MatchString(`^`+TEST_HASH_2+`\+\d+ \d+\n$`, string(buf.Bytes()))
-               if err != nil {
-                       t.Error(err)
-               } else if !m {
-                       t.Errorf("Got index %q for prefix %q", string(buf.Bytes()), prefix)
-               }
-       }
-}
-
 func TestPut(t *testing.T) {
-       v := TempUnixVolume(t, false, false)
-       defer _teardown(v)
+       v := NewTestableUnixVolume(t, false, false)
+       defer v.Teardown()
 
-       err := v.Put(TEST_HASH, TEST_BLOCK)
+       err := v.Put(TestHash, TestBlock)
        if err != nil {
                t.Error(err)
        }
-       p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH)
+       p := fmt.Sprintf("%s/%s/%s", v.root, TestHash[:3], TestHash)
        if buf, err := ioutil.ReadFile(p); err != nil {
                t.Error(err)
-       } else if bytes.Compare(buf, TEST_BLOCK) != 0 {
+       } else if bytes.Compare(buf, TestBlock) != 0 {
                t.Errorf("Write should have stored %s, did store %s",
-                       string(TEST_BLOCK), string(buf))
+                       string(TestBlock), string(buf))
        }
 }
 
 func TestPutBadVolume(t *testing.T) {
-       v := TempUnixVolume(t, false, false)
-       defer _teardown(v)
+       v := NewTestableUnixVolume(t, false, false)
+       defer v.Teardown()
 
        os.Chmod(v.root, 000)
-       err := v.Put(TEST_HASH, TEST_BLOCK)
+       err := v.Put(TestHash, TestBlock)
        if err == nil {
                t.Error("Write should have failed")
        }
 }
 
 func TestUnixVolumeReadonly(t *testing.T) {
-       v := TempUnixVolume(t, false, false)
-       defer _teardown(v)
-
-       // First write something before marking readonly
-       err := v.Put(TEST_HASH, TEST_BLOCK)
-       if err != nil {
-               t.Error("got err %v, expected nil", err)
-       }
+       v := NewTestableUnixVolume(t, false, true)
+       defer v.Teardown()
 
-       v.readonly = true
+       v.PutRaw(TestHash, TestBlock)
 
-       _, err = v.Get(TEST_HASH)
+       _, err := v.Get(TestHash)
        if err != nil {
-               t.Error("got err %v, expected nil", err)
+               t.Errorf("got err %v, expected nil", err)
        }
 
-       err = v.Put(TEST_HASH, TEST_BLOCK)
+       err = v.Put(TestHash, TestBlock)
        if err != MethodDisabledError {
-               t.Error("got err %v, expected MethodDisabledError", err)
+               t.Errorf("got err %v, expected MethodDisabledError", err)
        }
 
-       err = v.Touch(TEST_HASH)
+       err = v.Touch(TestHash)
        if err != MethodDisabledError {
-               t.Error("got err %v, expected MethodDisabledError", err)
+               t.Errorf("got err %v, expected MethodDisabledError", err)
        }
 
-       err = v.Delete(TEST_HASH)
+       err = v.Delete(TestHash)
        if err != MethodDisabledError {
-               t.Error("got err %v, expected MethodDisabledError", err)
-       }
-}
-
-// TestPutTouch
-//     Test that when applying PUT to a block that already exists,
-//     the block's modification time is updated.
-func TestPutTouch(t *testing.T) {
-       v := TempUnixVolume(t, false, false)
-       defer _teardown(v)
-
-       if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
-               t.Error(err)
-       }
-
-       // We'll verify { t0 < threshold < t1 }, where t0 is the
-       // existing block's timestamp on disk before Put() and t1 is
-       // its timestamp after Put().
-       threshold := time.Now().Add(-time.Second)
-
-       // Set the stored block's mtime far enough in the past that we
-       // can see the difference between "timestamp didn't change"
-       // and "timestamp granularity is too low".
-       {
-               oldtime := time.Now().Add(-20 * time.Second).Unix()
-               if err := syscall.Utime(v.blockPath(TEST_HASH),
-                       &syscall.Utimbuf{oldtime, oldtime}); err != nil {
-                       t.Error(err)
-               }
-
-               // Make sure v.Mtime() agrees the above Utime really worked.
-               if t0, err := v.Mtime(TEST_HASH); err != nil || t0.IsZero() || !t0.Before(threshold) {
-                       t.Errorf("Setting mtime failed: %v, %v", t0, err)
-               }
-       }
-
-       // Write the same block again.
-       if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
-               t.Error(err)
-       }
-
-       // Verify threshold < t1
-       t1, err := v.Mtime(TEST_HASH)
-       if err != nil {
-               t.Error(err)
-       }
-       if t1.Before(threshold) {
-               t.Errorf("t1 %v must be >= threshold %v after v.Put ",
-                       t1, threshold)
-       }
-}
-
-// Serialization tests: launch a bunch of concurrent
-//
-// TODO(twp): show that the underlying Read/Write operations executed
-// serially and not concurrently. The easiest way to do this is
-// probably to activate verbose or debug logging, capture log output
-// and examine it to confirm that Reads and Writes did not overlap.
-//
-// TODO(twp): a proper test of I/O serialization requires that a
-// second request start while the first one is still underway.
-// Guaranteeing that the test behaves this way requires some tricky
-// synchronization and mocking.  For now we'll just launch a bunch of
-// requests simultaenously in goroutines and demonstrate that they
-// return accurate results.
-//
-func TestGetSerialized(t *testing.T) {
-       // Create a volume with I/O serialization enabled.
-       v := TempUnixVolume(t, true, false)
-       defer _teardown(v)
-
-       _store(t, v, TEST_HASH, TEST_BLOCK)
-       _store(t, v, TEST_HASH_2, TEST_BLOCK_2)
-       _store(t, v, TEST_HASH_3, TEST_BLOCK_3)
-
-       sem := make(chan int)
-       go func(sem chan int) {
-               buf, err := v.Get(TEST_HASH)
-               if err != nil {
-                       t.Errorf("err1: %v", err)
-               }
-               if bytes.Compare(buf, TEST_BLOCK) != 0 {
-                       t.Errorf("buf should be %s, is %s", string(TEST_BLOCK), string(buf))
-               }
-               sem <- 1
-       }(sem)
-
-       go func(sem chan int) {
-               buf, err := v.Get(TEST_HASH_2)
-               if err != nil {
-                       t.Errorf("err2: %v", err)
-               }
-               if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
-                       t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_2), string(buf))
-               }
-               sem <- 1
-       }(sem)
-
-       go func(sem chan int) {
-               buf, err := v.Get(TEST_HASH_3)
-               if err != nil {
-                       t.Errorf("err3: %v", err)
-               }
-               if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
-                       t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_3), string(buf))
-               }
-               sem <- 1
-       }(sem)
-
-       // Wait for all goroutines to finish
-       for done := 0; done < 3; {
-               done += <-sem
-       }
-}
-
-func TestPutSerialized(t *testing.T) {
-       // Create a volume with I/O serialization enabled.
-       v := TempUnixVolume(t, true, false)
-       defer _teardown(v)
-
-       sem := make(chan int)
-       go func(sem chan int) {
-               err := v.Put(TEST_HASH, TEST_BLOCK)
-               if err != nil {
-                       t.Errorf("err1: %v", err)
-               }
-               sem <- 1
-       }(sem)
-
-       go func(sem chan int) {
-               err := v.Put(TEST_HASH_2, TEST_BLOCK_2)
-               if err != nil {
-                       t.Errorf("err2: %v", err)
-               }
-               sem <- 1
-       }(sem)
-
-       go func(sem chan int) {
-               err := v.Put(TEST_HASH_3, TEST_BLOCK_3)
-               if err != nil {
-                       t.Errorf("err3: %v", err)
-               }
-               sem <- 1
-       }(sem)
-
-       // Wait for all goroutines to finish
-       for done := 0; done < 3; {
-               done += <-sem
-       }
-
-       // Double check that we actually wrote the blocks we expected to write.
-       buf, err := v.Get(TEST_HASH)
-       if err != nil {
-               t.Errorf("Get #1: %v", err)
-       }
-       if bytes.Compare(buf, TEST_BLOCK) != 0 {
-               t.Errorf("Get #1: expected %s, got %s", string(TEST_BLOCK), string(buf))
-       }
-
-       buf, err = v.Get(TEST_HASH_2)
-       if err != nil {
-               t.Errorf("Get #2: %v", err)
-       }
-       if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
-               t.Errorf("Get #2: expected %s, got %s", string(TEST_BLOCK_2), string(buf))
-       }
-
-       buf, err = v.Get(TEST_HASH_3)
-       if err != nil {
-               t.Errorf("Get #3: %v", err)
-       }
-       if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
-               t.Errorf("Get #3: expected %s, got %s", string(TEST_BLOCK_3), string(buf))
+               t.Errorf("got err %v, expected MethodDisabledError", err)
        }
 }
 
 func TestIsFull(t *testing.T) {
-       v := TempUnixVolume(t, false, false)
-       defer _teardown(v)
+       v := NewTestableUnixVolume(t, false, false)
+       defer v.Teardown()
 
-       full_path := v.root + "/full"
+       fullPath := v.root + "/full"
        now := fmt.Sprintf("%d", time.Now().Unix())
-       os.Symlink(now, full_path)
+       os.Symlink(now, fullPath)
        if !v.IsFull() {
                t.Errorf("%s: claims not to be full", v)
        }
-       os.Remove(full_path)
+       os.Remove(fullPath)
 
        // Test with an expired /full link.
        expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
-       os.Symlink(expired, full_path)
+       os.Symlink(expired, fullPath)
        if v.IsFull() {
                t.Errorf("%s: should no longer be full", v)
        }
 }
 
 func TestNodeStatus(t *testing.T) {
-       v := TempUnixVolume(t, false, false)
-       defer _teardown(v)
+       v := NewTestableUnixVolume(t, false, false)
+       defer v.Teardown()
 
        // Get node status and make a basic sanity check.
        volinfo := v.Status()
@@ -385,3 +195,110 @@ func TestNodeStatus(t *testing.T) {
                t.Errorf("uninitialized bytes_used in %v", volinfo)
        }
 }
+
+func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
+       v := NewTestableUnixVolume(t, false, false)
+       defer v.Teardown()
+
+       v.Put(TestHash, TestBlock)
+       mockErr := errors.New("Mock error")
+       err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
+               return mockErr
+       })
+       if err != mockErr {
+               t.Errorf("Got %v, expected %v", err, mockErr)
+       }
+}
+
+func TestUnixVolumeGetFuncFileError(t *testing.T) {
+       v := NewTestableUnixVolume(t, false, false)
+       defer v.Teardown()
+
+       funcCalled := false
+       err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
+               funcCalled = true
+               return nil
+       })
+       if err == nil {
+               t.Errorf("Expected error opening non-existent file")
+       }
+       if funcCalled {
+               t.Errorf("Worker func should not have been called")
+       }
+}
+
+func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
+       v := NewTestableUnixVolume(t, false, false)
+       defer v.Teardown()
+
+       v.Put(TestHash, TestBlock)
+
+       mtx := NewMockMutex()
+       v.locker = mtx
+
+       funcCalled := make(chan struct{})
+       go v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
+               funcCalled <- struct{}{}
+               return nil
+       })
+       select {
+       case mtx.AllowLock <- struct{}{}:
+       case <-funcCalled:
+               t.Fatal("Function was called before mutex was acquired")
+       case <-time.After(5 * time.Second):
+               t.Fatal("Timed out before mutex was acquired")
+       }
+       select {
+       case <-funcCalled:
+       case mtx.AllowUnlock <- struct{}{}:
+               t.Fatal("Mutex was released before function was called")
+       case <-time.After(5 * time.Second):
+               t.Fatal("Timed out waiting for funcCalled")
+       }
+       select {
+       case mtx.AllowUnlock <- struct{}{}:
+       case <-time.After(5 * time.Second):
+               t.Fatal("Timed out waiting for getFunc() to release mutex")
+       }
+}
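
The test above only works if the injected locker blocks until the test explicitly allows each transition. A minimal sketch of such a controllable locker, with field and method names inferred from the calls in TestUnixVolumeGetFuncWorkerWaitsOnMutex (the real type lives in mock_mutex_for_test.go and may differ in detail):

    type MockMutex struct {
            AllowLock   chan struct{}
            AllowUnlock chan struct{}
    }

    func NewMockMutex() *MockMutex {
            return &MockMutex{
                    AllowLock:   make(chan struct{}),
                    AllowUnlock: make(chan struct{}),
            }
    }

    // Lock blocks until the test sends to AllowLock.
    func (m *MockMutex) Lock() {
            <-m.AllowLock
    }

    // Unlock blocks until the test sends to AllowUnlock.
    func (m *MockMutex) Unlock() {
            <-m.AllowUnlock
    }

Because the test, not the caller, decides when Lock() and Unlock() return, the select statements above can observe exactly where getFunc is blocked at each step.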
+
+func TestUnixVolumeCompare(t *testing.T) {
+       v := NewTestableUnixVolume(t, false, false)
+       defer v.Teardown()
+
+       v.Put(TestHash, TestBlock)
+       err := v.Compare(TestHash, TestBlock)
+       if err != nil {
+               t.Errorf("Got err %q, expected nil", err)
+       }
+
+       err = v.Compare(TestHash, []byte("baddata"))
+       if err != CollisionError {
+               t.Errorf("Got err %q, expected %q", err, CollisionError)
+       }
+
+       v.Put(TestHash, []byte("baddata"))
+       err = v.Compare(TestHash, TestBlock)
+       if err != DiskHashError {
+               t.Errorf("Got err %q, expected %q", err, DiskHashError)
+       }
+
+       p := fmt.Sprintf("%s/%s/%s", v.root, TestHash[:3], TestHash)
+       os.Chmod(p, 000)
+       err = v.Compare(TestHash, TestBlock)
+       if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
+               t.Errorf("Got err %q, expected %q", err, "permission denied")
+       }
+}
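
For reference, the error mapping this test pins down can be summarized in a few lines. The sketch below is illustrative only, not this commit's implementation: it assumes Compare hashes the bytes found on disk (using crypto/md5), returns DiskHashError when the stored block does not match its own locator, and CollisionError when the stored block is valid for the hash but differs from the caller's data; read errors such as the permission-denied case are simply propagated.

    func compareOutcomeSketch(stored, expect []byte, hash string) error {
            if fmt.Sprintf("%x", md5.Sum(stored)) != hash {
                    // Data on disk does not match its own locator.
                    return DiskHashError
            }
            if !bytes.Equal(stored, expect) {
                    // Disk content is valid for this hash, so the caller's
                    // differing content is a (vanishingly unlikely) collision.
                    return CollisionError
            }
            return nil
    }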
+
+// TODO(twp): show that the underlying Read/Write operations executed
+// serially and not concurrently. The easiest way to do this is
+// probably to activate verbose or debug logging, capture log output
+// and examine it to confirm that Reads and Writes did not overlap.
+//
+// TODO(twp): a proper test of I/O serialization requires that a
+// second request start while the first one is still underway.
+// Guaranteeing that the test behaves this way requires some tricky
+// synchronization and mocking.  For now we'll just launch a bunch of
+// requests simultaneously in goroutines and demonstrate that they
+// return accurate results.
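
One way to act on this TODO, sketched here under the assumption that getFunc takes the volume's locker exactly once per call (as the mutex test above suggests): start two getFunc calls, admit only one through a MockMutex, and assert that the second worker never runs while the first caller still holds the lock. This is a hypothetical test, not part of this commit, and it omits the usual time.After timeouts for brevity.

    func TestGetFuncSerializedSketch(t *testing.T) {
            v := NewTestableUnixVolume(t, false, false)
            defer v.Teardown()
            v.PutRaw(TestHash, TestBlock)

            mtx := NewMockMutex()
            v.locker = mtx

            started := make(chan struct{}, 2)
            finished := make(chan struct{}, 2)
            worker := func(rdr io.Reader) error {
                    started <- struct{}{}
                    return nil
            }
            for i := 0; i < 2; i++ {
                    go func() {
                            v.getFunc(v.blockPath(TestHash), worker)
                            finished <- struct{}{}
                    }()
            }

            // Admit exactly one caller.
            mtx.AllowLock <- struct{}{}
            <-started
            select {
            case <-started:
                    t.Fatal("second worker ran while the first caller still held the lock")
            default:
            }

            // Let the first caller finish, then admit the second.
            mtx.AllowUnlock <- struct{}{}
            <-finished
            mtx.AllowLock <- struct{}{}
            <-started
            mtx.AllowUnlock <- struct{}{}
            <-finished
    }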
index f1878ffbbc550250ab88c5ea9a4a694d12d63132..27646ad3d8d98c9b58e8693c079e3b40a14d1e1e 100644 (file)
@@ -84,6 +84,7 @@ package main
 
 import "container/list"
 
+// WorkQueue is a channel-based queue of work items with status reporting.
 type WorkQueue struct {
        getStatus chan WorkQueueStatus
        newlist   chan *list.List
@@ -96,6 +97,7 @@ type WorkQueue struct {
        DoneItem chan<- struct{}
 }
 
+// WorkQueueStatus reports how many work items are queued and in progress.
 type WorkQueueStatus struct {
        InProgress int
        Queued     int
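
For orientation, the usual consumer pattern for this queue looks roughly like the sketch below. DoneItem is visible in the hunk above; NextItem, a receive-only channel of queued items, is assumed here and may be named differently in the full file.

    func workerLoopSketch(q *WorkQueue) {
            for item := range q.NextItem {
                    // Handle one queued request (e.g. a pull or trash item)...
                    _ = item
                    // ...then tell the queue this item is done.
                    q.DoneItem <- struct{}{}
            }
    }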
index 4ef4e10bc90486710d6258012d44dedc715e7613..b1494d02851f0f78b85ddc070a16211983e98b2f 100644 (file)
@@ -7,6 +7,7 @@ import time
 import libcloud.compute.base as cloud_base
 import libcloud.compute.providers as cloud_provider
 import libcloud.compute.types as cloud_types
+from libcloud.common.exceptions import BaseHTTPError
 
 from . import BaseComputeNodeDriver
 from .. import arvados_node_fqdn, arvados_timestamp, ARVADOS_TIMEFMT
@@ -49,8 +50,12 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
         }
 
     def sync_node(self, cloud_node, arvados_node):
-        self.real.ex_create_tags(cloud_node,
-                                 {'hostname': arvados_node_fqdn(arvados_node)})
+        try:
+            self.real.ex_create_tags(cloud_node,
+                                     {'hostname': arvados_node_fqdn(arvados_node)})
+            return True
+        except BaseHTTPError:
+            return False
 
     def _init_image(self, urn):
         return "image", self.get_image(urn)
index ca9a702e0c39d374c90dd8411710742a5284fd4c..6d85e86d3274d4e6f6f7dc1fcb6935e5e5597f2b 100644 (file)
@@ -31,11 +31,11 @@ setup(name='arvados-node-manager',
         'python-daemon',
         ],
       dependency_links = [
-          "https://github.com/curoverse/libcloud/archive/apache-libcloud-0.18.1.dev2.zip"
+          "https://github.com/curoverse/libcloud/archive/apache-libcloud-0.18.1.dev3.zip"
       ],
       scripts=['bin/arvados-node-manager'],
       test_suite='tests',
-      tests_require=['mock>=1.0', "apache-libcloud==0.18.1.dev2"],
+      tests_require=['mock>=1.0', "apache-libcloud==0.18.1.dev3"],
       zip_safe=False,
       cmdclass={'egg_info': tagger},
       )