func (s *GitSuite) TestExpiredToken(c *check.C) {
for _, repo := range []string{"active/foo.git", "active/foo/.git"} {
err := s.RunGit(c, expiredToken, "fetch", repo)
- c.Assert(err, check.ErrorMatches, `.* 500 while accessing.*`)
+ c.Assert(err, check.ErrorMatches, `.* (500 while accessing|requested URL returned error: 500).*`)
}
}
func (s *GitSuite) TestShortToken(c *check.C) {
for _, repo := range []string{"active/foo.git", "active/foo/.git"} {
err := s.RunGit(c, "s3cr3t", "fetch", repo)
- c.Assert(err, check.ErrorMatches, `.* 500 while accessing.*`)
+ c.Assert(err, check.ErrorMatches, `.* (500 while accessing|requested URL returned error: 500).*`)
}
}
// Initialize a default-sized buffer pool for the benefit of test
// suites that don't run main().
func init() {
- bufs = newBufferPool(maxBuffers, BLOCKSIZE)
+ bufs = newBufferPool(maxBuffers, BlockSize)
}
// Restore sane default after bufferpool's own tests
func (s *BufferPoolSuite) TearDownTest(c *C) {
- bufs = newBufferPool(maxBuffers, BLOCKSIZE)
+ bufs = newBufferPool(maxBuffers, BlockSize)
}
func (s *BufferPoolSuite) TestBufferPoolBufSize(c *C) {
--- /dev/null
+package main
+
+import (
+ "crypto/md5"
+ "fmt"
+ "io"
+)
+
+// Compute the MD5 digest of a data block (consisting of buf1 + buf2 +
+// all bytes readable from rdr). If all data is read successfully,
+// return DiskHashError or CollisionError depending on whether it
+// matches expectMD5. If an error occurs while reading, return that
+// error.
+//
+// "content has expected MD5" is called a collision because this
+// function is used in cases where we have another block in hand with
+// the given MD5 but different content.
+func collisionOrCorrupt(expectMD5 string, buf1, buf2 []byte, rdr io.Reader) error {
+ outcome := make(chan error)
+ data := make(chan []byte, 1)
+ go func() {
+ h := md5.New()
+ for b := range data {
+ h.Write(b)
+ }
+ if fmt.Sprintf("%x", h.Sum(nil)) == expectMD5 {
+ outcome <- CollisionError
+ } else {
+ outcome <- DiskHashError
+ }
+ }()
+ data <- buf1
+ if buf2 != nil {
+ data <- buf2
+ }
+ var err error
+ for rdr != nil && err == nil {
+ buf := make([]byte, 1 << 18)
+ var n int
+ n, err = rdr.Read(buf)
+ data <- buf[:n]
+ }
+ close(data)
+ if rdr != nil && err != io.EOF {
+ <-outcome
+ return err
+ }
+ return <-outcome
+}
--- /dev/null
+package main
+
+import (
+ "bytes"
+ "testing"
+ "testing/iotest"
+
+ check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
+
+var _ = check.Suite(&CollisionSuite{})
+
+type CollisionSuite struct{}
+
+func (s *CollisionSuite) TestCollisionOrCorrupt(c *check.C) {
+ fooMD5 := "acbd18db4cc2f85cedef654fccc4a4d8"
+
+ c.Check(collisionOrCorrupt(fooMD5, []byte{'f'}, []byte{'o'}, bytes.NewBufferString("o")),
+ check.Equals, CollisionError)
+ c.Check(collisionOrCorrupt(fooMD5, []byte{'f'}, nil, bytes.NewBufferString("oo")),
+ check.Equals, CollisionError)
+ c.Check(collisionOrCorrupt(fooMD5, []byte{'f'}, []byte{'o', 'o'}, nil),
+ check.Equals, CollisionError)
+ c.Check(collisionOrCorrupt(fooMD5, nil, []byte{}, bytes.NewBufferString("foo")),
+ check.Equals, CollisionError)
+ c.Check(collisionOrCorrupt(fooMD5, []byte{'f', 'o', 'o'}, nil, bytes.NewBufferString("")),
+ check.Equals, CollisionError)
+ c.Check(collisionOrCorrupt(fooMD5, nil, nil, iotest.NewReadLogger("foo: ", iotest.DataErrReader(iotest.OneByteReader(bytes.NewBufferString("foo"))))),
+ check.Equals, CollisionError)
+
+ c.Check(collisionOrCorrupt(fooMD5, []byte{'f', 'o', 'o'}, nil, bytes.NewBufferString("bar")),
+ check.Equals, DiskHashError)
+ c.Check(collisionOrCorrupt(fooMD5, []byte{'f', 'o'}, nil, nil),
+ check.Equals, DiskHashError)
+ c.Check(collisionOrCorrupt(fooMD5, []byte{}, nil, bytes.NewBufferString("")),
+ check.Equals, DiskHashError)
+ c.Check(collisionOrCorrupt(fooMD5, []byte{'f', 'O'}, nil, bytes.NewBufferString("o")),
+ check.Equals, DiskHashError)
+ c.Check(collisionOrCorrupt(fooMD5, []byte{'f', 'O', 'o'}, nil, nil),
+ check.Equals, DiskHashError)
+ c.Check(collisionOrCorrupt(fooMD5, []byte{'f', 'o'}, []byte{'O'}, nil),
+ check.Equals, DiskHashError)
+ c.Check(collisionOrCorrupt(fooMD5, []byte{'f', 'o'}, nil, bytes.NewBufferString("O")),
+ check.Equals, DiskHashError)
+
+ c.Check(collisionOrCorrupt(fooMD5, []byte{}, nil, iotest.TimeoutReader(iotest.OneByteReader(bytes.NewBufferString("foo")))),
+ check.Equals, iotest.ErrTimeout)
+}
// A RequestTester represents the parameters for an HTTP request to
// be issued on behalf of a unit test.
type RequestTester struct {
- uri string
- api_token string
- method string
- request_body []byte
+ uri string
+ apiToken string
+ method string
+ requestBody []byte
}
// Test GetBlockHandler on the following situations:
defer KeepVM.Close()
vols := KeepVM.AllWritable()
- if err := vols[0].Put(TEST_HASH, TEST_BLOCK); err != nil {
+ if err := vols[0].Put(TestHash, TestBlock); err != nil {
t.Error(err)
}
// Create locators for testing.
// Turn on permission settings so we can generate signed locators.
- enforce_permissions = true
- PermissionSecret = []byte(known_key)
+ enforcePermissions = true
+ PermissionSecret = []byte(knownKey)
blob_signature_ttl = 300 * time.Second
var (
- unsigned_locator = "/" + TEST_HASH
- valid_timestamp = time.Now().Add(blob_signature_ttl)
- expired_timestamp = time.Now().Add(-time.Hour)
- signed_locator = "/" + SignLocator(TEST_HASH, known_token, valid_timestamp)
- expired_locator = "/" + SignLocator(TEST_HASH, known_token, expired_timestamp)
+ unsignedLocator = "/" + TestHash
+ validTimestamp = time.Now().Add(blob_signature_ttl)
+ expiredTimestamp = time.Now().Add(-time.Hour)
+ signedLocator = "/" + SignLocator(TestHash, knownToken, validTimestamp)
+ expiredLocator = "/" + SignLocator(TestHash, knownToken, expiredTimestamp)
)
// -----------------
// Test unauthenticated request with permissions off.
- enforce_permissions = false
+ enforcePermissions = false
// Unauthenticated request, unsigned locator
// => OK
response := IssueRequest(
&RequestTester{
method: "GET",
- uri: unsigned_locator,
+ uri: unsignedLocator,
})
ExpectStatusCode(t,
"Unauthenticated request, unsigned locator", http.StatusOK, response)
ExpectBody(t,
"Unauthenticated request, unsigned locator",
- string(TEST_BLOCK),
+ string(TestBlock),
response)
- received_cl := response.Header().Get("Content-Length")
- expected_cl := fmt.Sprintf("%d", len(TEST_BLOCK))
- if received_cl != expected_cl {
- t.Errorf("expected Content-Length %s, got %s", expected_cl, received_cl)
+ receivedLen := response.Header().Get("Content-Length")
+ expectedLen := fmt.Sprintf("%d", len(TestBlock))
+ if receivedLen != expectedLen {
+ t.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen)
}
// ----------------
// Permissions: on.
- enforce_permissions = true
+ enforcePermissions = true
// Authenticated request, signed locator
// => OK
response = IssueRequest(&RequestTester{
- method: "GET",
- uri: signed_locator,
- api_token: known_token,
+ method: "GET",
+ uri: signedLocator,
+ apiToken: knownToken,
})
ExpectStatusCode(t,
"Authenticated request, signed locator", http.StatusOK, response)
ExpectBody(t,
- "Authenticated request, signed locator", string(TEST_BLOCK), response)
+ "Authenticated request, signed locator", string(TestBlock), response)
- received_cl = response.Header().Get("Content-Length")
- expected_cl = fmt.Sprintf("%d", len(TEST_BLOCK))
- if received_cl != expected_cl {
- t.Errorf("expected Content-Length %s, got %s", expected_cl, received_cl)
+ receivedLen = response.Header().Get("Content-Length")
+ expectedLen = fmt.Sprintf("%d", len(TestBlock))
+ if receivedLen != expectedLen {
+ t.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen)
}
// Authenticated request, unsigned locator
// => PermissionError
response = IssueRequest(&RequestTester{
- method: "GET",
- uri: unsigned_locator,
- api_token: known_token,
+ method: "GET",
+ uri: unsignedLocator,
+ apiToken: knownToken,
})
ExpectStatusCode(t, "unsigned locator", PermissionError.HTTPCode, response)
// => PermissionError
response = IssueRequest(&RequestTester{
method: "GET",
- uri: signed_locator,
+ uri: signedLocator,
})
ExpectStatusCode(t,
"Unauthenticated request, signed locator",
// Authenticated request, expired locator
// => ExpiredError
response = IssueRequest(&RequestTester{
- method: "GET",
- uri: expired_locator,
- api_token: known_token,
+ method: "GET",
+ uri: expiredLocator,
+ apiToken: knownToken,
})
ExpectStatusCode(t,
"Authenticated request, expired locator",
// Unauthenticated request, no server key
// => OK (unsigned response)
- unsigned_locator := "/" + TEST_HASH
+ unsignedLocator := "/" + TestHash
response := IssueRequest(
&RequestTester{
- method: "PUT",
- uri: unsigned_locator,
- request_body: TEST_BLOCK,
+ method: "PUT",
+ uri: unsignedLocator,
+ requestBody: TestBlock,
})
ExpectStatusCode(t,
"Unauthenticated request, no server key", http.StatusOK, response)
ExpectBody(t,
"Unauthenticated request, no server key",
- TEST_HASH_PUT_RESPONSE, response)
+ TestHashPutResp, response)
// ------------------
// With a server key.
- PermissionSecret = []byte(known_key)
+ PermissionSecret = []byte(knownKey)
blob_signature_ttl = 300 * time.Second
// When a permission key is available, the locator returned
// => OK (signed response)
response = IssueRequest(
&RequestTester{
- method: "PUT",
- uri: unsigned_locator,
- request_body: TEST_BLOCK,
- api_token: known_token,
+ method: "PUT",
+ uri: unsignedLocator,
+ requestBody: TestBlock,
+ apiToken: knownToken,
})
ExpectStatusCode(t,
"Authenticated PUT, signed locator, with server key",
http.StatusOK, response)
- response_locator := strings.TrimSpace(response.Body.String())
- if VerifySignature(response_locator, known_token) != nil {
+ responseLocator := strings.TrimSpace(response.Body.String())
+ if VerifySignature(responseLocator, knownToken) != nil {
t.Errorf("Authenticated PUT, signed locator, with server key:\n"+
"response '%s' does not contain a valid signature",
- response_locator)
+ responseLocator)
}
// Unauthenticated PUT, unsigned locator
// => OK
response = IssueRequest(
&RequestTester{
- method: "PUT",
- uri: unsigned_locator,
- request_body: TEST_BLOCK,
+ method: "PUT",
+ uri: unsignedLocator,
+ requestBody: TestBlock,
})
ExpectStatusCode(t,
http.StatusOK, response)
ExpectBody(t,
"Unauthenticated PUT, unsigned locator, with server key",
- TEST_HASH_PUT_RESPONSE, response)
+ TestHashPutResp, response)
}
func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
defer KeepVM.Close()
IssueRequest(
&RequestTester{
- method: "PUT",
- uri: "/" + TEST_HASH,
- request_body: TEST_BLOCK,
+ method: "PUT",
+ uri: "/" + TestHash,
+ requestBody: TestBlock,
})
+ defer func(orig bool) {
+ never_delete = orig
+ }(never_delete)
never_delete = false
IssueRequest(
&RequestTester{
- method: "DELETE",
- uri: "/" + TEST_HASH,
- request_body: TEST_BLOCK,
- api_token: data_manager_token,
+ method: "DELETE",
+ uri: "/" + TestHash,
+ requestBody: TestBlock,
+ apiToken: data_manager_token,
})
type expect struct {
volnum int
}
for _, e := range []expect{
{0, "Get", 0},
+ {0, "Compare", 0},
{0, "Touch", 0},
{0, "Put", 0},
{0, "Delete", 0},
- {1, "Get", 1},
+ {1, "Get", 0},
+ {1, "Compare", 1},
+ {1, "Touch", 1},
{1, "Put", 1},
{1, "Delete", 1},
} {
// - authenticated /index/prefix request | superuser
//
// The only /index requests that should succeed are those issued by the
-// superuser. They should pass regardless of the value of enforce_permissions.
+// superuser. They should pass regardless of the value of enforcePermissions.
//
func TestIndexHandler(t *testing.T) {
defer teardown()
defer KeepVM.Close()
vols := KeepVM.AllWritable()
- vols[0].Put(TEST_HASH, TEST_BLOCK)
- vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
- vols[0].Put(TEST_HASH+".meta", []byte("metadata"))
- vols[1].Put(TEST_HASH_2+".meta", []byte("metadata"))
+ vols[0].Put(TestHash, TestBlock)
+ vols[1].Put(TestHash2, TestBlock2)
+ vols[0].Put(TestHash+".meta", []byte("metadata"))
+ vols[1].Put(TestHash2+".meta", []byte("metadata"))
data_manager_token = "DATA MANAGER TOKEN"
- unauthenticated_req := &RequestTester{
+ unauthenticatedReq := &RequestTester{
method: "GET",
uri: "/index",
}
- authenticated_req := &RequestTester{
- method: "GET",
- uri: "/index",
- api_token: known_token,
+ authenticatedReq := &RequestTester{
+ method: "GET",
+ uri: "/index",
+ apiToken: knownToken,
}
- superuser_req := &RequestTester{
- method: "GET",
- uri: "/index",
- api_token: data_manager_token,
+ superuserReq := &RequestTester{
+ method: "GET",
+ uri: "/index",
+ apiToken: data_manager_token,
}
- unauth_prefix_req := &RequestTester{
+ unauthPrefixReq := &RequestTester{
method: "GET",
- uri: "/index/" + TEST_HASH[0:3],
+ uri: "/index/" + TestHash[0:3],
}
- auth_prefix_req := &RequestTester{
- method: "GET",
- uri: "/index/" + TEST_HASH[0:3],
- api_token: known_token,
+ authPrefixReq := &RequestTester{
+ method: "GET",
+ uri: "/index/" + TestHash[0:3],
+ apiToken: knownToken,
}
- superuser_prefix_req := &RequestTester{
- method: "GET",
- uri: "/index/" + TEST_HASH[0:3],
- api_token: data_manager_token,
+ superuserPrefixReq := &RequestTester{
+ method: "GET",
+ uri: "/index/" + TestHash[0:3],
+ apiToken: data_manager_token,
}
// -------------------------------------------------------------
// Only the superuser should be allowed to issue /index requests.
// ---------------------------
- // enforce_permissions enabled
+ // enforcePermissions enabled
// This setting should not affect tests passing.
- enforce_permissions = true
+ enforcePermissions = true
// unauthenticated /index request
// => UnauthorizedError
- response := IssueRequest(unauthenticated_req)
+ response := IssueRequest(unauthenticatedReq)
ExpectStatusCode(t,
- "enforce_permissions on, unauthenticated request",
+ "enforcePermissions on, unauthenticated request",
UnauthorizedError.HTTPCode,
response)
// unauthenticated /index/prefix request
// => UnauthorizedError
- response = IssueRequest(unauth_prefix_req)
+ response = IssueRequest(unauthPrefixReq)
ExpectStatusCode(t,
"permissions on, unauthenticated /index/prefix request",
UnauthorizedError.HTTPCode,
// authenticated /index request, non-superuser
// => UnauthorizedError
- response = IssueRequest(authenticated_req)
+ response = IssueRequest(authenticatedReq)
ExpectStatusCode(t,
"permissions on, authenticated request, non-superuser",
UnauthorizedError.HTTPCode,
// authenticated /index/prefix request, non-superuser
// => UnauthorizedError
- response = IssueRequest(auth_prefix_req)
+ response = IssueRequest(authPrefixReq)
ExpectStatusCode(t,
"permissions on, authenticated /index/prefix request, non-superuser",
UnauthorizedError.HTTPCode,
// superuser /index request
// => OK
- response = IssueRequest(superuser_req)
+ response = IssueRequest(superuserReq)
ExpectStatusCode(t,
"permissions on, superuser request",
http.StatusOK,
response)
// ----------------------------
- // enforce_permissions disabled
+ // enforcePermissions disabled
// Valid Request should still pass.
- enforce_permissions = false
+ enforcePermissions = false
// superuser /index request
// => OK
- response = IssueRequest(superuser_req)
+ response = IssueRequest(superuserReq)
ExpectStatusCode(t,
"permissions on, superuser request",
http.StatusOK,
response)
- expected := `^` + TEST_HASH + `\+\d+ \d+\n` +
- TEST_HASH_2 + `\+\d+ \d+\n\n$`
+ expected := `^` + TestHash + `\+\d+ \d+\n` +
+ TestHash2 + `\+\d+ \d+\n\n$`
match, _ := regexp.MatchString(expected, response.Body.String())
if !match {
t.Errorf(
// superuser /index/prefix request
// => OK
- response = IssueRequest(superuser_prefix_req)
+ response = IssueRequest(superuserPrefixReq)
ExpectStatusCode(t,
"permissions on, superuser request",
http.StatusOK,
response)
- expected = `^` + TEST_HASH + `\+\d+ \d+\n\n$`
+ expected = `^` + TestHash + `\+\d+ \d+\n\n$`
match, _ = regexp.MatchString(expected, response.Body.String())
if !match {
t.Errorf(
defer KeepVM.Close()
vols := KeepVM.AllWritable()
- vols[0].Put(TEST_HASH, TEST_BLOCK)
+ vols[0].Put(TestHash, TestBlock)
// Explicitly set the blob_signature_ttl to 0 for these
// tests, to ensure the MockVolume deletes the blocks
// even though they have just been created.
blob_signature_ttl = time.Duration(0)
- var user_token = "NOT DATA MANAGER TOKEN"
+ var userToken = "NOT DATA MANAGER TOKEN"
data_manager_token = "DATA MANAGER TOKEN"
never_delete = false
- unauth_req := &RequestTester{
+ unauthReq := &RequestTester{
method: "DELETE",
- uri: "/" + TEST_HASH,
+ uri: "/" + TestHash,
}
- user_req := &RequestTester{
- method: "DELETE",
- uri: "/" + TEST_HASH,
- api_token: user_token,
+ userReq := &RequestTester{
+ method: "DELETE",
+ uri: "/" + TestHash,
+ apiToken: userToken,
}
- superuser_existing_block_req := &RequestTester{
- method: "DELETE",
- uri: "/" + TEST_HASH,
- api_token: data_manager_token,
+ superuserExistingBlockReq := &RequestTester{
+ method: "DELETE",
+ uri: "/" + TestHash,
+ apiToken: data_manager_token,
}
- superuser_nonexistent_block_req := &RequestTester{
- method: "DELETE",
- uri: "/" + TEST_HASH_2,
- api_token: data_manager_token,
+ superuserNonexistentBlockReq := &RequestTester{
+ method: "DELETE",
+ uri: "/" + TestHash2,
+ apiToken: data_manager_token,
}
// Unauthenticated request returns PermissionError.
var response *httptest.ResponseRecorder
- response = IssueRequest(unauth_req)
+ response = IssueRequest(unauthReq)
ExpectStatusCode(t,
"unauthenticated request",
PermissionError.HTTPCode,
response)
// Authenticated non-admin request returns PermissionError.
- response = IssueRequest(user_req)
+ response = IssueRequest(userReq)
ExpectStatusCode(t,
"authenticated non-admin request",
PermissionError.HTTPCode,
Deleted int `json:"copies_deleted"`
Failed int `json:"copies_failed"`
}
- var response_dc, expected_dc deletecounter
+ var responseDc, expectedDc deletecounter
- response = IssueRequest(superuser_nonexistent_block_req)
+ response = IssueRequest(superuserNonexistentBlockReq)
ExpectStatusCode(t,
"data manager request, nonexistent block",
http.StatusNotFound,
// Authenticated admin request for existing block while never_delete is set.
never_delete = true
- response = IssueRequest(superuser_existing_block_req)
+ response = IssueRequest(superuserExistingBlockReq)
ExpectStatusCode(t,
"authenticated request, existing block, method disabled",
MethodDisabledError.HTTPCode,
never_delete = false
// Authenticated admin request for existing block.
- response = IssueRequest(superuser_existing_block_req)
+ response = IssueRequest(superuserExistingBlockReq)
ExpectStatusCode(t,
"data manager request, existing block",
http.StatusOK,
response)
// Expect response {"copies_deleted":1,"copies_failed":0}
- expected_dc = deletecounter{1, 0}
- json.NewDecoder(response.Body).Decode(&response_dc)
- if response_dc != expected_dc {
- t.Errorf("superuser_existing_block_req\nexpected: %+v\nreceived: %+v",
- expected_dc, response_dc)
+ expectedDc = deletecounter{1, 0}
+ json.NewDecoder(response.Body).Decode(&responseDc)
+ if responseDc != expectedDc {
+ t.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v",
+ expectedDc, responseDc)
}
// Confirm the block has been deleted
- _, err := vols[0].Get(TEST_HASH)
- var block_deleted = os.IsNotExist(err)
- if !block_deleted {
- t.Error("superuser_existing_block_req: block not deleted")
+ _, err := vols[0].Get(TestHash)
+ var blockDeleted = os.IsNotExist(err)
+ if !blockDeleted {
+ t.Error("superuserExistingBlockReq: block not deleted")
}
// A DELETE request on a block newer than blob_signature_ttl
// should return success but leave the block on the volume.
- vols[0].Put(TEST_HASH, TEST_BLOCK)
+ vols[0].Put(TestHash, TestBlock)
blob_signature_ttl = time.Hour
- response = IssueRequest(superuser_existing_block_req)
+ response = IssueRequest(superuserExistingBlockReq)
ExpectStatusCode(t,
"data manager request, existing block",
http.StatusOK,
response)
// Expect response {"copies_deleted":1,"copies_failed":0}
- expected_dc = deletecounter{1, 0}
- json.NewDecoder(response.Body).Decode(&response_dc)
- if response_dc != expected_dc {
- t.Errorf("superuser_existing_block_req\nexpected: %+v\nreceived: %+v",
- expected_dc, response_dc)
+ expectedDc = deletecounter{1, 0}
+ json.NewDecoder(response.Body).Decode(&responseDc)
+ if responseDc != expectedDc {
+ t.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v",
+ expectedDc, responseDc)
}
// Confirm the block has NOT been deleted.
- _, err = vols[0].Get(TEST_HASH)
+ _, err = vols[0].Get(TestHash)
if err != nil {
t.Errorf("testing delete on new block: %s\n", err)
}
func TestPullHandler(t *testing.T) {
defer teardown()
- var user_token = "USER TOKEN"
+ var userToken = "USER TOKEN"
data_manager_token = "DATA MANAGER TOKEN"
pullq = NewWorkQueue()
- good_json := []byte(`[
+ goodJSON := []byte(`[
{
"locator":"locator_with_two_servers",
"servers":[
}
]`)
- bad_json := []byte(`{ "key":"I'm a little teapot" }`)
+ badJSON := []byte(`{ "key":"I'm a little teapot" }`)
type pullTest struct {
- name string
- req RequestTester
- response_code int
- response_body string
+ name string
+ req RequestTester
+ responseCode int
+ responseBody string
}
var testcases = []pullTest{
{
"Valid pull list from an ordinary user",
- RequestTester{"/pull", user_token, "PUT", good_json},
+ RequestTester{"/pull", userToken, "PUT", goodJSON},
http.StatusUnauthorized,
"Unauthorized\n",
},
{
"Invalid pull request from an ordinary user",
- RequestTester{"/pull", user_token, "PUT", bad_json},
+ RequestTester{"/pull", userToken, "PUT", badJSON},
http.StatusUnauthorized,
"Unauthorized\n",
},
{
"Valid pull request from the data manager",
- RequestTester{"/pull", data_manager_token, "PUT", good_json},
+ RequestTester{"/pull", data_manager_token, "PUT", goodJSON},
http.StatusOK,
"Received 3 pull requests\n",
},
{
"Invalid pull request from the data manager",
- RequestTester{"/pull", data_manager_token, "PUT", bad_json},
+ RequestTester{"/pull", data_manager_token, "PUT", badJSON},
http.StatusBadRequest,
"",
},
for _, tst := range testcases {
response := IssueRequest(&tst.req)
- ExpectStatusCode(t, tst.name, tst.response_code, response)
- ExpectBody(t, tst.name, tst.response_body, response)
+ ExpectStatusCode(t, tst.name, tst.responseCode, response)
+ ExpectBody(t, tst.name, tst.responseBody, response)
}
// The Keep pull manager should have received one good list with 3
func TestTrashHandler(t *testing.T) {
defer teardown()
- var user_token = "USER TOKEN"
+ var userToken = "USER TOKEN"
data_manager_token = "DATA MANAGER TOKEN"
trashq = NewWorkQueue()
- good_json := []byte(`[
+ goodJSON := []byte(`[
{
"locator":"block1",
"block_mtime":1409082153
}
]`)
- bad_json := []byte(`I am not a valid JSON string`)
+ badJSON := []byte(`I am not a valid JSON string`)
type trashTest struct {
- name string
- req RequestTester
- response_code int
- response_body string
+ name string
+ req RequestTester
+ responseCode int
+ responseBody string
}
var testcases = []trashTest{
{
"Valid trash list from an ordinary user",
- RequestTester{"/trash", user_token, "PUT", good_json},
+ RequestTester{"/trash", userToken, "PUT", goodJSON},
http.StatusUnauthorized,
"Unauthorized\n",
},
{
"Invalid trash list from an ordinary user",
- RequestTester{"/trash", user_token, "PUT", bad_json},
+ RequestTester{"/trash", userToken, "PUT", badJSON},
http.StatusUnauthorized,
"Unauthorized\n",
},
{
"Valid trash list from the data manager",
- RequestTester{"/trash", data_manager_token, "PUT", good_json},
+ RequestTester{"/trash", data_manager_token, "PUT", goodJSON},
http.StatusOK,
"Received 3 trash requests\n",
},
{
"Invalid trash list from the data manager",
- RequestTester{"/trash", data_manager_token, "PUT", bad_json},
+ RequestTester{"/trash", data_manager_token, "PUT", badJSON},
http.StatusBadRequest,
"",
},
for _, tst := range testcases {
response := IssueRequest(&tst.req)
- ExpectStatusCode(t, tst.name, tst.response_code, response)
- ExpectBody(t, tst.name, tst.response_body, response)
+ ExpectStatusCode(t, tst.name, tst.responseCode, response)
+ ExpectBody(t, tst.name, tst.responseBody, response)
}
// The trash collector should have received one good list with 3
// REST router. It returns the HTTP response to the request.
func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
response := httptest.NewRecorder()
- body := bytes.NewReader(rt.request_body)
+ body := bytes.NewReader(rt.requestBody)
req, _ := http.NewRequest(rt.method, rt.uri, body)
- if rt.api_token != "" {
- req.Header.Set("Authorization", "OAuth2 "+rt.api_token)
+ if rt.apiToken != "" {
+ req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
}
loggingRouter := MakeLoggingRESTRouter()
loggingRouter.ServeHTTP(response, req)
func ExpectStatusCode(
t *testing.T,
testname string,
- expected_status int,
+ expectedStatus int,
response *httptest.ResponseRecorder) {
- if response.Code != expected_status {
+ if response.Code != expectedStatus {
t.Errorf("%s: expected status %d, got %+v",
- testname, expected_status, response)
+ testname, expectedStatus, response)
}
}
func ExpectBody(
t *testing.T,
testname string,
- expected_body string,
+ expectedBody string,
response *httptest.ResponseRecorder) {
- if expected_body != "" && response.Body.String() != expected_body {
+ if expectedBody != "" && response.Body.String() != expectedBody {
t.Errorf("%s: expected response body '%s', got %+v",
- testname, expected_body, response)
+ testname, expectedBody, response)
+ }
+}
+
+// See #7121
+func TestPutNeedsOnlyOneBuffer(t *testing.T) {
+ defer teardown()
+ KeepVM = MakeTestVolumeManager(1)
+ defer KeepVM.Close()
+
+ defer func(orig *bufferPool) {
+ bufs = orig
+ }(bufs)
+ bufs = newBufferPool(1, BlockSize)
+
+ ok := make(chan struct{})
+ go func() {
+ for i := 0; i < 2; i++ {
+ response := IssueRequest(
+ &RequestTester{
+ method: "PUT",
+ uri: "/" + TestHash,
+ requestBody: TestBlock,
+ })
+ ExpectStatusCode(t,
+ "TestPutNeedsOnlyOneBuffer", http.StatusOK, response)
+ }
+ ok <- struct{}{}
+ }()
+
+ select {
+ case <-ok:
+ case <-time.After(time.Second):
+ t.Fatal("PUT deadlocks with maxBuffers==1")
}
}
ok := make(chan bool)
go func() {
- for i := 0; i < maxBuffers+1; i += 1 {
+ for i := 0; i < maxBuffers+1; i++ {
// Unauthenticated request, no server key
// => OK (unsigned response)
- unsigned_locator := "/" + TEST_HASH
+ unsignedLocator := "/" + TestHash
response := IssueRequest(
&RequestTester{
- method: "PUT",
- uri: unsigned_locator,
- request_body: TEST_BLOCK,
+ method: "PUT",
+ uri: unsignedLocator,
+ requestBody: TestBlock,
})
ExpectStatusCode(t,
"TestPutHandlerBufferleak", http.StatusOK, response)
ExpectBody(t,
"TestPutHandlerBufferleak",
- TEST_HASH_PUT_RESPONSE, response)
+ TestHashPutResp, response)
}
ok <- true
}()
defer KeepVM.Close()
vols := KeepVM.AllWritable()
- if err := vols[0].Put(TEST_HASH, TEST_BLOCK); err != nil {
+ if err := vols[0].Put(TestHash, TestBlock); err != nil {
t.Error(err)
}
ok := make(chan bool)
go func() {
- for i := 0; i < maxBuffers+1; i += 1 {
+ for i := 0; i < maxBuffers+1; i++ {
// Unauthenticated request, unsigned locator
// => OK
- unsigned_locator := "/" + TEST_HASH
+ unsignedLocator := "/" + TestHash
response := IssueRequest(
&RequestTester{
method: "GET",
- uri: unsigned_locator,
+ uri: unsignedLocator,
})
ExpectStatusCode(t,
"Unauthenticated request, unsigned locator", http.StatusOK, response)
ExpectBody(t,
"Unauthenticated request, unsigned locator",
- string(TEST_BLOCK),
+ string(TestBlock),
response)
}
ok <- true
// StatusHandler (GET /status.json)
import (
- "bytes"
"container/list"
"crypto/md5"
"encoding/json"
return rest
}
+// BadRequestHandler is a HandleFunc to address bad requests.
func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
}
+// GetBlockHandler is a HandleFunc to address Get block requests.
func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
- if enforce_permissions {
+ if enforcePermissions {
locator := req.URL.Path[1:] // strip leading slash
if err := VerifySignature(locator, GetApiToken(req)); err != nil {
http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
}
}
- block, err := GetBlock(mux.Vars(req)["hash"], false)
+ block, err := GetBlock(mux.Vars(req)["hash"])
if err != nil {
// This type assertion is safe because the only errors
// GetBlock can return are DiskHashError or NotFoundError.
resp.Write(block)
}
+// PutBlockHandler is a HandleFunc to address Put block requests.
func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
hash := mux.Vars(req)["hash"]
return
}
- if req.ContentLength > BLOCKSIZE {
+ if req.ContentLength > BlockSize {
http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
return
}
// Success; add a size hint, sign the locator if possible, and
// return it to the client.
- return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
- api_token := GetApiToken(req)
- if PermissionSecret != nil && api_token != "" {
+ returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
+ apiToken := GetApiToken(req)
+ if PermissionSecret != nil && apiToken != "" {
expiry := time.Now().Add(blob_signature_ttl)
- return_hash = SignLocator(return_hash, api_token, expiry)
+ returnHash = SignLocator(returnHash, apiToken, expiry)
}
- resp.Write([]byte(return_hash + "\n"))
+ resp.Write([]byte(returnHash + "\n"))
}
-// IndexHandler
-// A HandleFunc to address /index and /index/{prefix} requests.
-//
+// IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
func IndexHandler(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
if !IsDataManagerToken(GetApiToken(req)) {
// * device_num (an integer identifying the underlying filesystem)
// * bytes_free
// * bytes_used
-//
-type VolumeStatus struct {
- MountPoint string `json:"mount_point"`
- DeviceNum uint64 `json:"device_num"`
- BytesFree uint64 `json:"bytes_free"`
- BytesUsed uint64 `json:"bytes_used"`
-}
+// PoolStatus struct
type PoolStatus struct {
Alloc uint64 `json:"BytesAllocated"`
Cap int `json:"BuffersMax"`
Len int `json:"BuffersInUse"`
}
+// NodeStatus struct
type NodeStatus struct {
Volumes []*VolumeStatus `json:"volumes"`
BufferPool PoolStatus
var st NodeStatus
var stLock sync.Mutex
+// StatusHandler addresses /status.json requests.
func StatusHandler(resp http.ResponseWriter, req *http.Request) {
stLock.Lock()
readNodeStatus(&st)
if err == nil {
resp.Write(jstat)
} else {
- log.Printf("json.Marshal: %s\n", err)
- log.Printf("NodeStatus = %v\n", &st)
+ log.Printf("json.Marshal: %s", err)
+ log.Printf("NodeStatus = %v", &st)
http.Error(resp, err.Error(), 500)
}
}
if body, err := json.Marshal(result); err == nil {
resp.Write(body)
} else {
- log.Printf("json.Marshal: %s (result = %v)\n", err, result)
+ log.Printf("json.Marshal: %s (result = %v)", err, result)
http.Error(resp, err.Error(), 500)
}
}
If the JSON unmarshalling fails, return 400 Bad Request.
*/
+// PullRequest consists of a block locator and an ordered list of servers
type PullRequest struct {
Locator string `json:"locator"`
Servers []string `json:"servers"`
}
+// PullHandler processes "PUT /pull" requests for the data manager.
func PullHandler(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
if !IsDataManagerToken(GetApiToken(req)) {
pullq.ReplaceQueue(plist)
}
+// TrashRequest consists of a block locator and it's Mtime
type TrashRequest struct {
Locator string `json:"locator"`
BlockMtime int64 `json:"block_mtime"`
}
+// TrashHandler processes /trash requests.
func TrashHandler(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
if !IsDataManagerToken(GetApiToken(req)) {
// should be the only part of the code that cares about which volume a
// block is stored on, so it should be responsible for figuring out
// which volume to check for fetching blocks, storing blocks, etc.
-
// ==============================
-// GetBlock fetches and returns the block identified by "hash". If
-// the update_timestamp argument is true, GetBlock also updates the
-// block's file modification time (for the sake of PutBlock, which
-// must update the file's timestamp when the block already exists).
+
+// GetBlock fetches and returns the block identified by "hash".
//
// On success, GetBlock returns a byte slice with the block data, and
// a nil error.
// If the block found does not have the correct MD5 hash, returns
// DiskHashError.
//
-
-func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
+func GetBlock(hash string) ([]byte, error) {
// Attempt to read the requested hash from a keep volume.
- error_to_caller := NotFoundError
-
- var vols []Volume
- if update_timestamp {
- // Pointless to find the block on an unwritable volume
- // because Touch() will fail -- this is as good as
- // "not found" for purposes of callers who need to
- // update_timestamp.
- vols = KeepVM.AllWritable()
- } else {
- vols = KeepVM.AllReadable()
- }
+ errorToCaller := NotFoundError
- for _, vol := range vols {
+ for _, vol := range KeepVM.AllReadable() {
buf, err := vol.Get(hash)
if err != nil {
// IsNotExist is an expected error and may be
// volumes. If all volumes report IsNotExist,
// we return a NotFoundError.
if !os.IsNotExist(err) {
- log.Printf("GetBlock: reading %s: %s\n", hash, err)
+ log.Printf("%s: Get(%s): %s", vol, hash, err)
}
continue
}
if filehash != hash {
// TODO: Try harder to tell a sysadmin about
// this.
- log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
+ log.Printf("%s: checksum mismatch for request %s (actual %s)",
vol, hash, filehash)
- error_to_caller = DiskHashError
+ errorToCaller = DiskHashError
bufs.Put(buf)
continue
}
- if error_to_caller == DiskHashError {
+ if errorToCaller == DiskHashError {
log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
vol, hash)
}
- if update_timestamp {
- if err := vol.Touch(hash); err != nil {
- error_to_caller = GenericError
- log.Printf("%s: Touch %s failed: %s",
- vol, hash, error_to_caller)
- bufs.Put(buf)
- continue
- }
- }
return buf, nil
}
- return nil, error_to_caller
+ return nil, errorToCaller
}
-/* PutBlock(block, hash)
- Stores the BLOCK (identified by the content id HASH) in Keep.
-
- The MD5 checksum of the block must be identical to the content id HASH.
- If not, an error is returned.
-
- PutBlock stores the BLOCK on the first Keep volume with free space.
- A failure code is returned to the user only if all volumes fail.
-
- On success, PutBlock returns nil.
- On failure, it returns a KeepError with one of the following codes:
-
- 500 Collision
- A different block with the same hash already exists on this
- Keep server.
- 422 MD5Fail
- The MD5 hash of the BLOCK does not match the argument HASH.
- 503 Full
- There was not enough space left in any Keep volume to store
- the object.
- 500 Fail
- The object could not be stored for some other reason (e.g.
- all writes failed). The text of the error message should
- provide as much detail as possible.
-*/
-
+// PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
+//
+// PutBlock(block, hash)
+// Stores the BLOCK (identified by the content id HASH) in Keep.
+//
+// The MD5 checksum of the block must be identical to the content id HASH.
+// If not, an error is returned.
+//
+// PutBlock stores the BLOCK on the first Keep volume with free space.
+// A failure code is returned to the user only if all volumes fail.
+//
+// On success, PutBlock returns nil.
+// On failure, it returns a KeepError with one of the following codes:
+//
+// 500 Collision
+// A different block with the same hash already exists on this
+// Keep server.
+// 422 MD5Fail
+// The MD5 hash of the BLOCK does not match the argument HASH.
+// 503 Full
+// There was not enough space left in any Keep volume to store
+// the object.
+// 500 Fail
+// The object could not be stored for some other reason (e.g.
+// all writes failed). The text of the error message should
+// provide as much detail as possible.
+//
func PutBlock(block []byte, hash string) error {
// Check that BLOCK's checksum matches HASH.
blockhash := fmt.Sprintf("%x", md5.Sum(block))
return RequestHashError
}
- // If we already have a block on disk under this identifier, return
- // success (but check for MD5 collisions). While fetching the block,
- // update its timestamp.
- // The only errors that GetBlock can return are DiskHashError and NotFoundError.
- // In either case, we want to write our new (good) block to disk,
- // so there is nothing special to do if err != nil.
- //
- if oldblock, err := GetBlock(hash, true); err == nil {
- defer bufs.Put(oldblock)
- if bytes.Compare(block, oldblock) == 0 {
- // The block already exists; return success.
- return nil
- } else {
- return CollisionError
- }
+ // If we already have this data, it's intact on disk, and we
+ // can update its timestamp, return success. If we have
+ // different data with the same hash, return failure.
+ if err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
+ return err
}
// Choose a Keep volume to write to.
// write did not succeed. Report the
// error and continue trying.
allFull = false
- log.Printf("%s: Write(%s): %s\n", vol, hash, err)
+ log.Printf("%s: Write(%s): %s", vol, hash, err)
}
}
if allFull {
log.Print("All volumes are full.")
return FullError
- } else {
- // Already logged the non-full errors.
- return GenericError
}
+ // Already logged the non-full errors.
+ return GenericError
+}
+
+// CompareAndTouch returns nil if one of the volumes already has the
+// given content and it successfully updates the relevant block's
+// modification time in order to protect it from premature garbage
+// collection.
+func CompareAndTouch(hash string, buf []byte) error {
+ var bestErr error = NotFoundError
+ for _, vol := range KeepVM.AllWritable() {
+ if err := vol.Compare(hash, buf); err == CollisionError {
+ // Stop if we have a block with same hash but
+ // different content. (It will be impossible
+ // to tell which one is wanted if we have
+ // both, so there's no point writing it even
+ // on a different volume.)
+ log.Printf("%s: Compare(%s): %s", vol, hash, err)
+ return err
+ } else if os.IsNotExist(err) {
+ // Block does not exist. This is the only
+ // "normal" error: we don't log anything.
+ continue
+ } else if err != nil {
+ // Couldn't open file, data is corrupt on
+ // disk, etc.: log this abnormal condition,
+ // and try the next volume.
+ log.Printf("%s: Compare(%s): %s", vol, hash, err)
+ continue
+ }
+ if err := vol.Touch(hash); err != nil {
+ log.Printf("%s: Touch %s failed: %s", vol, hash, err)
+ bestErr = err
+ continue
+ }
+ // Compare and Touch both worked --> done.
+ return nil
+ }
+ return bestErr
}
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
-// IsValidLocator
-// Return true if the specified string is a valid Keep locator.
-// When Keep is extended to support hash types other than MD5,
-// this should be updated to cover those as well.
+// IsValidLocator returns true if the specified string is a valid Keep locator.
+// When Keep is extended to support hash types other than MD5,
+// this should be updated to cover those as well.
//
func IsValidLocator(loc string) bool {
return validLocatorRe.MatchString(loc)
}
// IsExpired returns true if the given Unix timestamp (expressed as a
-// hexadecimal string) is in the past, or if timestamp_hex cannot be
+// hexadecimal string) is in the past, or if timestampHex cannot be
// parsed as a hexadecimal string.
-func IsExpired(timestamp_hex string) bool {
- ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
+func IsExpired(timestampHex string) bool {
+ ts, err := strconv.ParseInt(timestampHex, 16, 0)
if err != nil {
- log.Printf("IsExpired: %s\n", err)
+ log.Printf("IsExpired: %s", err)
return true
}
return time.Unix(ts, 0).Before(time.Now())
}
-// CanDelete returns true if the user identified by api_token is
+// CanDelete returns true if the user identified by apiToken is
// allowed to delete blocks.
-func CanDelete(api_token string) bool {
- if api_token == "" {
+func CanDelete(apiToken string) bool {
+ if apiToken == "" {
return false
}
// Blocks may be deleted only when Keep has been configured with a
// data manager.
- if IsDataManagerToken(api_token) {
+ if IsDataManagerToken(apiToken) {
return true
}
- // TODO(twp): look up api_token with the API server
+ // TODO(twp): look up apiToken with the API server
// return true if is_admin is true and if the token
// has unlimited scope
return false
}
-// IsDataManagerToken returns true if api_token represents the data
+// IsDataManagerToken returns true if apiToken represents the data
// manager's token.
-func IsDataManagerToken(api_token string) bool {
- return data_manager_token != "" && api_token == data_manager_token
+func IsDataManagerToken(apiToken string) bool {
+ return data_manager_token != "" && apiToken == data_manager_token
}
"os"
"os/signal"
"strings"
+ "sync"
"syscall"
"time"
)
// Default TCP address on which to listen for requests.
// Initialized by the --listen flag.
-const DEFAULT_ADDR = ":25107"
+const DefaultAddr = ":25107"
// A Keep "block" is 64MB.
-const BLOCKSIZE = 64 * 1024 * 1024
+const BlockSize = 64 * 1024 * 1024
-// A Keep volume must have at least MIN_FREE_KILOBYTES available
+// A Keep volume must have at least MinFreeKilobytes available
// in order to permit writes.
-const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
+const MinFreeKilobytes = BlockSize / 1024
// Until #6221 is resolved, never_delete must be true.
-// However, allow it to be false in testing.
+// However, allow it to be false in testing with TEST_DATA_MANAGER_TOKEN
const TEST_DATA_MANAGER_TOKEN = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
-var PROC_MOUNTS = "/proc/mounts"
+// ProcMounts /proc/mounts
+var ProcMounts = "/proc/mounts"
-// enforce_permissions controls whether permission signatures
+// enforcePermissions controls whether permission signatures
// should be enforced (affecting GET and DELETE requests).
// Initialized by the -enforce-permissions flag.
-var enforce_permissions bool
+var enforcePermissions bool
// blob_signature_ttl is the time duration for which new permission
// signatures (returned by PUT requests) will be valid.
var maxBuffers = 128
var bufs *bufferPool
-// ==========
-// Error types.
+// KeepError types.
//
type KeepError struct {
HTTPCode int
if _, err := os.Stat(value); err != nil {
return err
}
+ var locker sync.Locker
+ if flagSerializeIO {
+ locker = &sync.Mutex{}
+ }
*vs = append(*vs, &UnixVolume{
- root: value,
- serialize: flagSerializeIO,
- readonly: flagReadonly,
+ root: value,
+ locker: locker,
+ readonly: flagReadonly,
})
return nil
}
// other than "/". It returns the number of volumes added.
func (vs *volumeSet) Discover() int {
added := 0
- f, err := os.Open(PROC_MOUNTS)
+ f, err := os.Open(ProcMounts)
if err != nil {
- log.Fatalf("opening %s: %s", PROC_MOUNTS, err)
+ log.Fatalf("opening %s: %s", ProcMounts, err)
}
scanner := bufio.NewScanner(f)
for scanner.Scan() {
args := strings.Fields(scanner.Text())
if err := scanner.Err(); err != nil {
- log.Fatalf("reading %s: %s", PROC_MOUNTS, err)
+ log.Fatalf("reading %s: %s", ProcMounts, err)
}
dev, mount := args[0], args[1]
if mount == "/" {
"File with the API token used by the Data Manager. All DELETE "+
"requests or GET /index requests must carry this token.")
flag.BoolVar(
- &enforce_permissions,
+ &enforcePermissions,
"enforce-permissions",
false,
"Enforce permission signatures on requests.")
flag.StringVar(
&listen,
"listen",
- DEFAULT_ADDR,
+ DefaultAddr,
"Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
flag.BoolVar(
&never_delete,
&maxBuffers,
"max-buffers",
maxBuffers,
- fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BLOCKSIZE>>20))
+ fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
flag.Parse()
if maxBuffers < 0 {
log.Fatal("-max-buffers must be greater than zero.")
}
- bufs = newBufferPool(maxBuffers, BLOCKSIZE)
+ bufs = newBufferPool(maxBuffers, BlockSize)
if pidfile != "" {
f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
blob_signature_ttl = time.Duration(permission_ttl_sec) * time.Second
if PermissionSecret == nil {
- if enforce_permissions {
+ if enforcePermissions {
log.Fatal("-enforce-permissions requires a permission key")
} else {
log.Println("Running without a PermissionSecret. Block locators " +
"testing"
)
-var TEST_BLOCK = []byte("The quick brown fox jumps over the lazy dog.")
-var TEST_HASH = "e4d909c290d0fb1ca068ffaddf22cbd0"
-var TEST_HASH_PUT_RESPONSE = "e4d909c290d0fb1ca068ffaddf22cbd0+44\n"
+var TestBlock = []byte("The quick brown fox jumps over the lazy dog.")
+var TestHash = "e4d909c290d0fb1ca068ffaddf22cbd0"
+var TestHashPutResp = "e4d909c290d0fb1ca068ffaddf22cbd0+44\n"
-var TEST_BLOCK_2 = []byte("Pack my box with five dozen liquor jugs.")
-var TEST_HASH_2 = "f15ac516f788aec4f30932ffb6395c39"
+var TestBlock2 = []byte("Pack my box with five dozen liquor jugs.")
+var TestHash2 = "f15ac516f788aec4f30932ffb6395c39"
-var TEST_BLOCK_3 = []byte("Now is the time for all good men to come to the aid of their country.")
-var TEST_HASH_3 = "eed29bbffbc2dbe5e5ee0bb71888e61f"
+var TestBlock3 = []byte("Now is the time for all good men to come to the aid of their country.")
+var TestHash3 = "eed29bbffbc2dbe5e5ee0bb71888e61f"
-// BAD_BLOCK is used to test collisions and corruption.
+// BadBlock is used to test collisions and corruption.
// It must not match any test hashes.
-var BAD_BLOCK = []byte("The magic words are squeamish ossifrage.")
+var BadBlock = []byte("The magic words are squeamish ossifrage.")
// TODO(twp): Tests still to be written
//
defer KeepVM.Close()
vols := KeepVM.AllReadable()
- if err := vols[1].Put(TEST_HASH, TEST_BLOCK); err != nil {
+ if err := vols[1].Put(TestHash, TestBlock); err != nil {
t.Error(err)
}
// Check that GetBlock returns success.
- result, err := GetBlock(TEST_HASH, false)
+ result, err := GetBlock(TestHash)
if err != nil {
t.Errorf("GetBlock error: %s", err)
}
- if fmt.Sprint(result) != fmt.Sprint(TEST_BLOCK) {
- t.Errorf("expected %s, got %s", TEST_BLOCK, result)
+ if fmt.Sprint(result) != fmt.Sprint(TestBlock) {
+ t.Errorf("expected %s, got %s", TestBlock, result)
}
}
defer KeepVM.Close()
// Check that GetBlock returns failure.
- result, err := GetBlock(TEST_HASH, false)
+ result, err := GetBlock(TestHash)
if err != NotFoundError {
t.Errorf("Expected NotFoundError, got %v", result)
}
defer KeepVM.Close()
vols := KeepVM.AllReadable()
- vols[0].Put(TEST_HASH, BAD_BLOCK)
+ vols[0].Put(TestHash, BadBlock)
// Check that GetBlock returns failure.
- result, err := GetBlock(TEST_HASH, false)
+ result, err := GetBlock(TestHash)
if err != DiskHashError {
t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, result)
}
defer KeepVM.Close()
// Check that PutBlock stores the data as expected.
- if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
+ if err := PutBlock(TestBlock, TestHash); err != nil {
t.Fatalf("PutBlock: %v", err)
}
vols := KeepVM.AllReadable()
- result, err := vols[1].Get(TEST_HASH)
+ result, err := vols[1].Get(TestHash)
if err != nil {
t.Fatalf("Volume #0 Get returned error: %v", err)
}
- if string(result) != string(TEST_BLOCK) {
+ if string(result) != string(TestBlock) {
t.Fatalf("PutBlock stored '%s', Get retrieved '%s'",
- string(TEST_BLOCK), string(result))
+ string(TestBlock), string(result))
}
}
vols[0].(*MockVolume).Bad = true
// Check that PutBlock stores the data as expected.
- if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
+ if err := PutBlock(TestBlock, TestHash); err != nil {
t.Fatalf("PutBlock: %v", err)
}
- result, err := GetBlock(TEST_HASH, false)
+ result, err := GetBlock(TestHash)
if err != nil {
t.Fatalf("GetBlock: %v", err)
}
- if string(result) != string(TEST_BLOCK) {
+ if string(result) != string(TestBlock) {
t.Error("PutBlock/GetBlock mismatch")
t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
- string(TEST_BLOCK), string(result))
+ string(TestBlock), string(result))
}
}
// Check that PutBlock returns the expected error when the hash does
// not match the block.
- if err := PutBlock(BAD_BLOCK, TEST_HASH); err != RequestHashError {
+ if err := PutBlock(BadBlock, TestHash); err != RequestHashError {
t.Error("Expected RequestHashError, got %v", err)
}
// Confirm that GetBlock fails to return anything.
- if result, err := GetBlock(TEST_HASH, false); err != NotFoundError {
+ if result, err := GetBlock(TestHash); err != NotFoundError {
t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
string(result), err)
}
KeepVM = MakeTestVolumeManager(2)
defer KeepVM.Close()
- // Store a corrupted block under TEST_HASH.
+ // Store a corrupted block under TestHash.
vols := KeepVM.AllWritable()
- vols[0].Put(TEST_HASH, BAD_BLOCK)
- if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
+ vols[0].Put(TestHash, BadBlock)
+ if err := PutBlock(TestBlock, TestHash); err != nil {
t.Errorf("PutBlock: %v", err)
}
- // The block on disk should now match TEST_BLOCK.
- if block, err := GetBlock(TEST_HASH, false); err != nil {
+ // The block on disk should now match TestBlock.
+ if block, err := GetBlock(TestHash); err != nil {
t.Errorf("GetBlock: %v", err)
- } else if bytes.Compare(block, TEST_BLOCK) != 0 {
+ } else if bytes.Compare(block, TestBlock) != 0 {
t.Errorf("GetBlock returned: '%s'", string(block))
}
}
// Store a block and then make the underlying volume bad,
// so a subsequent attempt to update the file timestamp
// will fail.
- vols[0].Put(TEST_HASH, BAD_BLOCK)
- old_mtime, err := vols[0].Mtime(TEST_HASH)
+ vols[0].Put(TestHash, BadBlock)
+ oldMtime, err := vols[0].Mtime(TestHash)
if err != nil {
- t.Fatalf("vols[0].Mtime(%s): %s\n", TEST_HASH, err)
+ t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
}
// vols[0].Touch will fail on the next call, so the volume
// manager will store a copy on vols[1] instead.
vols[0].(*MockVolume).Touchable = false
- if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
+ if err := PutBlock(TestBlock, TestHash); err != nil {
t.Fatalf("PutBlock: %v", err)
}
vols[0].(*MockVolume).Touchable = true
// Now the mtime on the block on vols[0] should be unchanged, and
// there should be a copy of the block on vols[1].
- new_mtime, err := vols[0].Mtime(TEST_HASH)
+ newMtime, err := vols[0].Mtime(TestHash)
if err != nil {
- t.Fatalf("vols[0].Mtime(%s): %s\n", TEST_HASH, err)
+ t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
}
- if !new_mtime.Equal(old_mtime) {
- t.Errorf("mtime was changed on vols[0]:\nold_mtime = %v\nnew_mtime = %v\n",
- old_mtime, new_mtime)
+ if !newMtime.Equal(oldMtime) {
+ t.Errorf("mtime was changed on vols[0]:\noldMtime = %v\nnewMtime = %v\n",
+ oldMtime, newMtime)
}
- result, err := vols[1].Get(TEST_HASH)
+ result, err := vols[1].Get(TestHash)
if err != nil {
t.Fatalf("vols[1]: %v", err)
}
- if bytes.Compare(result, TEST_BLOCK) != 0 {
+ if bytes.Compare(result, TestBlock) != 0 {
t.Errorf("new block does not match test block\nnew block = %v\n", result)
}
}
}
}
- // Set up a bogus PROC_MOUNTS file.
+ // Set up a bogus ProcMounts file.
f, err := ioutil.TempFile("", "keeptest")
if err != nil {
t.Fatal(err)
fmt.Fprintf(f, "tmpfs %s tmpfs %s 0 0\n", path.Dir(vol), opts)
}
f.Close()
- PROC_MOUNTS = f.Name()
+ ProcMounts = f.Name()
var resultVols volumeSet
added := resultVols.Discover()
func TestDiscoverNone(t *testing.T) {
defer teardown()
- // Set up a bogus PROC_MOUNTS file with no Keep vols.
+ // Set up a bogus ProcMounts file with no Keep vols.
f, err := ioutil.TempFile("", "keeptest")
if err != nil {
t.Fatal(err)
fmt.Fprintln(f, "udev /dev devtmpfs opts 0 0")
fmt.Fprintln(f, "devpts /dev/pts devpts opts 0 0")
f.Close()
- PROC_MOUNTS = f.Name()
+ ProcMounts = f.Name()
var resultVols volumeSet
added := resultVols.Discover()
defer KeepVM.Close()
vols := KeepVM.AllReadable()
- vols[0].Put(TEST_HASH, TEST_BLOCK)
- vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
- vols[0].Put(TEST_HASH_3, TEST_BLOCK_3)
- vols[0].Put(TEST_HASH+".meta", []byte("metadata"))
- vols[1].Put(TEST_HASH_2+".meta", []byte("metadata"))
+ vols[0].Put(TestHash, TestBlock)
+ vols[1].Put(TestHash2, TestBlock2)
+ vols[0].Put(TestHash3, TestBlock3)
+ vols[0].Put(TestHash+".meta", []byte("metadata"))
+ vols[1].Put(TestHash2+".meta", []byte("metadata"))
buf := new(bytes.Buffer)
vols[0].IndexTo("", buf)
vols[1].IndexTo("", buf)
- index_rows := strings.Split(string(buf.Bytes()), "\n")
- sort.Strings(index_rows)
- sorted_index := strings.Join(index_rows, "\n")
- expected := `^\n` + TEST_HASH + `\+\d+ \d+\n` +
- TEST_HASH_3 + `\+\d+ \d+\n` +
- TEST_HASH_2 + `\+\d+ \d+$`
-
- match, err := regexp.MatchString(expected, sorted_index)
+ indexRows := strings.Split(string(buf.Bytes()), "\n")
+ sort.Strings(indexRows)
+ sortedIndex := strings.Join(indexRows, "\n")
+ expected := `^\n` + TestHash + `\+\d+ \d+\n` +
+ TestHash3 + `\+\d+ \d+\n` +
+ TestHash2 + `\+\d+ \d+$`
+
+ match, err := regexp.MatchString(expected, sortedIndex)
if err == nil {
if !match {
t.Errorf("IndexLocators returned:\n%s", string(buf.Bytes()))
// MakeTestVolumeManager returns a RRVolumeManager with the specified
// number of MockVolumes.
-func MakeTestVolumeManager(num_volumes int) VolumeManager {
- vols := make([]Volume, num_volumes)
+func MakeTestVolumeManager(numVolumes int) VolumeManager {
+ vols := make([]Volume, numVolumes)
for i := range vols {
vols[i] = CreateMockVolume()
}
// teardown cleans up after each test.
func teardown() {
data_manager_token = ""
- enforce_permissions = false
+ enforcePermissions = false
PermissionSecret = nil
KeepVM = nil
}
"time"
)
+// LoggingResponseWriter has anonymous fields ResponseWriter and ResponseBody
type LoggingResponseWriter struct {
Status int
Length int
ResponseBody string
}
+// WriteHeader writes header to ResponseWriter
func (loggingWriter *LoggingResponseWriter) WriteHeader(code int) {
loggingWriter.Status = code
loggingWriter.ResponseWriter.WriteHeader(code)
return loggingWriter.ResponseWriter.Write(data)
}
+// LoggingRESTRouter is used to add logging capabilities to mux.Router
type LoggingRESTRouter struct {
router *mux.Router
}
+// MakeLoggingRESTRouter initializes LoggingRESTRouter
func MakeLoggingRESTRouter() *LoggingRESTRouter {
router := MakeRESTRouter()
return (&LoggingRESTRouter{router})
--- /dev/null
+package main
+
+type MockMutex struct {
+ AllowLock chan struct{}
+ AllowUnlock chan struct{}
+}
+
+func NewMockMutex() *MockMutex {
+ return &MockMutex{
+ AllowLock: make(chan struct{}),
+ AllowUnlock: make(chan struct{}),
+ }
+}
+
+// Lock waits for someone to send to AllowLock.
+func (m *MockMutex) Lock() {
+ <- m.AllowLock
+}
+
+// Unlock waits for someone to send to AllowUnlock.
+func (m *MockMutex) Unlock() {
+ <- m.AllowUnlock
+}
var PermissionSecret []byte
// MakePermSignature returns a string representing the signed permission
-// hint for the blob identified by blob_hash, api_token and expiration timestamp.
-func MakePermSignature(blob_hash string, api_token string, expiry string) string {
+// hint for the blob identified by blobHash, apiToken and expiration timestamp.
+func MakePermSignature(blobHash string, apiToken string, expiry string) string {
hmac := hmac.New(sha1.New, PermissionSecret)
- hmac.Write([]byte(blob_hash))
+ hmac.Write([]byte(blobHash))
hmac.Write([]byte("@"))
- hmac.Write([]byte(api_token))
+ hmac.Write([]byte(apiToken))
hmac.Write([]byte("@"))
hmac.Write([]byte(expiry))
digest := hmac.Sum(nil)
return fmt.Sprintf("%x", digest)
}
-// SignLocator takes a blob_locator, an api_token and an expiry time, and
+// SignLocator takes a blobLocator, an apiToken and an expiry time, and
// returns a signed locator string.
-func SignLocator(blob_locator string, api_token string, expiry time.Time) string {
+func SignLocator(blobLocator string, apiToken string, expiry time.Time) string {
// If no permission secret or API token is available,
// return an unsigned locator.
- if PermissionSecret == nil || api_token == "" {
- return blob_locator
+ if PermissionSecret == nil || apiToken == "" {
+ return blobLocator
}
// Extract the hash from the blob locator, omitting any size hint that may be present.
- blob_hash := strings.Split(blob_locator, "+")[0]
+ blobHash := strings.Split(blobLocator, "+")[0]
// Return the signed locator string.
- timestamp_hex := fmt.Sprintf("%08x", expiry.Unix())
- return blob_locator +
- "+A" + MakePermSignature(blob_hash, api_token, timestamp_hex) +
- "@" + timestamp_hex
+ timestampHex := fmt.Sprintf("%08x", expiry.Unix())
+ return blobLocator +
+ "+A" + MakePermSignature(blobHash, apiToken, timestampHex) +
+ "@" + timestampHex
}
var signedLocatorRe = regexp.MustCompile(`^([[:xdigit:]]{32}).*\+A([[:xdigit:]]{40})@([[:xdigit:]]{8})`)
-// VerifySignature returns nil if the signature on the signed_locator
-// can be verified using the given api_token. Otherwise it returns
+// VerifySignature returns nil if the signature on the signedLocator
+// can be verified using the given apiToken. Otherwise it returns
// either ExpiredError (if the timestamp has expired, which is
// something the client could have figured out independently) or
// PermissionError.
-func VerifySignature(signed_locator string, api_token string) error {
- matches := signedLocatorRe.FindStringSubmatch(signed_locator)
+func VerifySignature(signedLocator string, apiToken string) error {
+ matches := signedLocatorRe.FindStringSubmatch(signedLocator)
if matches == nil {
// Could not find a permission signature at all
return PermissionError
}
- blob_hash := matches[1]
- sig_hex := matches[2]
- exp_hex := matches[3]
- if exp_time, err := ParseHexTimestamp(exp_hex); err != nil {
+ blobHash := matches[1]
+ sigHex := matches[2]
+ expHex := matches[3]
+ if expTime, err := ParseHexTimestamp(expHex); err != nil {
return PermissionError
- } else if exp_time.Before(time.Now()) {
+ } else if expTime.Before(time.Now()) {
return ExpiredError
}
- if sig_hex != MakePermSignature(blob_hash, api_token, exp_hex) {
+ if sigHex != MakePermSignature(blobHash, apiToken, expHex) {
return PermissionError
}
return nil
}
-func ParseHexTimestamp(timestamp_hex string) (ts time.Time, err error) {
- if ts_int, e := strconv.ParseInt(timestamp_hex, 16, 0); e == nil {
- ts = time.Unix(ts_int, 0)
+// ParseHexTimestamp parses timestamp
+func ParseHexTimestamp(timestampHex string) (ts time.Time, err error) {
+ if tsInt, e := strconv.ParseInt(timestampHex, 16, 0); e == nil {
+ ts = time.Unix(tsInt, 0)
} else {
err = e
}
)
const (
- known_hash = "acbd18db4cc2f85cedef654fccc4a4d8"
- known_locator = known_hash + "+3"
- known_token = "hocfupkn2pjhrpgp2vxv8rsku7tvtx49arbc9s4bvu7p7wxqvk"
- known_key = "13u9fkuccnboeewr0ne3mvapk28epf68a3bhj9q8sb4l6e4e5mkk" +
+ knownHash = "acbd18db4cc2f85cedef654fccc4a4d8"
+ knownLocator = knownHash + "+3"
+ knownToken = "hocfupkn2pjhrpgp2vxv8rsku7tvtx49arbc9s4bvu7p7wxqvk"
+ knownKey = "13u9fkuccnboeewr0ne3mvapk28epf68a3bhj9q8sb4l6e4e5mkk" +
"p6nhj2mmpscgu1zze5h5enydxfe3j215024u16ij4hjaiqs5u4pzsl3nczmaoxnc" +
"ljkm4875xqn4xv058koz3vkptmzhyheiy6wzevzjmdvxhvcqsvr5abhl15c2d4o4" +
"jhl0s91lojy1mtrzqqvprqcverls0xvy9vai9t1l1lvvazpuadafm71jl4mrwq2y" +
"gokee3eamvjy8qq1fvy238838enjmy5wzy2md7yvsitp5vztft6j4q866efym7e6" +
"vu5wm9fpnwjyxfldw3vbo01mgjs75rgo7qioh8z8ij7jpyp8508okhgbbex3ceei" +
"786u5rw2a9gx743dj3fgq2irk"
- known_signature = "257f3f5f5f0a4e4626a18fc74bd42ec34dcb228a"
- known_timestamp = "7fffffff"
- known_sig_hint = "+A" + known_signature + "@" + known_timestamp
- known_signed_locator = known_locator + known_sig_hint
+ knownSignature = "257f3f5f5f0a4e4626a18fc74bd42ec34dcb228a"
+ knownTimestamp = "7fffffff"
+ knownSigHint = "+A" + knownSignature + "@" + knownTimestamp
+ knownSignedLocator = knownLocator + knownSigHint
)
func TestSignLocator(t *testing.T) {
- PermissionSecret = []byte(known_key)
+ PermissionSecret = []byte(knownKey)
defer func() { PermissionSecret = nil }()
- if ts, err := ParseHexTimestamp(known_timestamp); err != nil {
- t.Errorf("bad known_timestamp %s", known_timestamp)
+ if ts, err := ParseHexTimestamp(knownTimestamp); err != nil {
+ t.Errorf("bad knownTimestamp %s", knownTimestamp)
} else {
- if known_signed_locator != SignLocator(known_locator, known_token, ts) {
+ if knownSignedLocator != SignLocator(knownLocator, knownToken, ts) {
t.Fail()
}
}
}
func TestVerifySignature(t *testing.T) {
- PermissionSecret = []byte(known_key)
+ PermissionSecret = []byte(knownKey)
defer func() { PermissionSecret = nil }()
- if VerifySignature(known_signed_locator, known_token) != nil {
+ if VerifySignature(knownSignedLocator, knownToken) != nil {
t.Fail()
}
}
func TestVerifySignatureExtraHints(t *testing.T) {
- PermissionSecret = []byte(known_key)
+ PermissionSecret = []byte(knownKey)
defer func() { PermissionSecret = nil }()
- if VerifySignature(known_locator+"+K@xyzzy"+known_sig_hint, known_token) != nil {
+ if VerifySignature(knownLocator+"+K@xyzzy"+knownSigHint, knownToken) != nil {
t.Fatal("Verify cannot handle hint before permission signature")
}
- if VerifySignature(known_locator+known_sig_hint+"+Zfoo", known_token) != nil {
+ if VerifySignature(knownLocator+knownSigHint+"+Zfoo", knownToken) != nil {
t.Fatal("Verify cannot handle hint after permission signature")
}
- if VerifySignature(known_locator+"+K@xyzzy"+known_sig_hint+"+Zfoo", known_token) != nil {
+ if VerifySignature(knownLocator+"+K@xyzzy"+knownSigHint+"+Zfoo", knownToken) != nil {
t.Fatal("Verify cannot handle hints around permission signature")
}
}
// The size hint on the locator string should not affect signature validation.
func TestVerifySignatureWrongSize(t *testing.T) {
- PermissionSecret = []byte(known_key)
+ PermissionSecret = []byte(knownKey)
defer func() { PermissionSecret = nil }()
- if VerifySignature(known_hash+"+999999"+known_sig_hint, known_token) != nil {
+ if VerifySignature(knownHash+"+999999"+knownSigHint, knownToken) != nil {
t.Fatal("Verify cannot handle incorrect size hint")
}
- if VerifySignature(known_hash+known_sig_hint, known_token) != nil {
+ if VerifySignature(knownHash+knownSigHint, knownToken) != nil {
t.Fatal("Verify cannot handle missing size hint")
}
}
func TestVerifySignatureBadSig(t *testing.T) {
- PermissionSecret = []byte(known_key)
+ PermissionSecret = []byte(knownKey)
defer func() { PermissionSecret = nil }()
- bad_locator := known_locator + "+Aaaaaaaaaaaaaaaa@" + known_timestamp
- if VerifySignature(bad_locator, known_token) != PermissionError {
+ badLocator := knownLocator + "+Aaaaaaaaaaaaaaaa@" + knownTimestamp
+ if VerifySignature(badLocator, knownToken) != PermissionError {
t.Fail()
}
}
func TestVerifySignatureBadTimestamp(t *testing.T) {
- PermissionSecret = []byte(known_key)
+ PermissionSecret = []byte(knownKey)
defer func() { PermissionSecret = nil }()
- bad_locator := known_locator + "+A" + known_signature + "@OOOOOOOl"
- if VerifySignature(bad_locator, known_token) != PermissionError {
+ badLocator := knownLocator + "+A" + knownSignature + "@OOOOOOOl"
+ if VerifySignature(badLocator, knownToken) != PermissionError {
t.Fail()
}
}
PermissionSecret = []byte("00000000000000000000")
defer func() { PermissionSecret = nil }()
- if VerifySignature(known_signed_locator, known_token) != PermissionError {
+ if VerifySignature(knownSignedLocator, knownToken) != PermissionError {
t.Fail()
}
}
func TestVerifySignatureBadToken(t *testing.T) {
- PermissionSecret = []byte(known_key)
+ PermissionSecret = []byte(knownKey)
defer func() { PermissionSecret = nil }()
- if VerifySignature(known_signed_locator, "00000000") != PermissionError {
+ if VerifySignature(knownSignedLocator, "00000000") != PermissionError {
t.Fail()
}
}
func TestVerifySignatureExpired(t *testing.T) {
- PermissionSecret = []byte(known_key)
+ PermissionSecret = []byte(knownKey)
defer func() { PermissionSecret = nil }()
yesterday := time.Now().AddDate(0, 0, -1)
- expired_locator := SignLocator(known_hash, known_token, yesterday)
- if VerifySignature(expired_locator, known_token) != ExpiredError {
+ expiredLocator := SignLocator(knownHash, knownToken, yesterday)
+ if VerifySignature(expiredLocator, knownToken) != ExpiredError {
t.Fail()
}
}
"time"
)
-/*
- Keepstore initiates pull worker channel goroutine.
- The channel will process pull list.
- For each (next) pull request:
- For each locator listed, execute Pull on the server(s) listed
- Skip the rest of the servers if no errors
- Repeat
-*/
+// RunPullWorker is used by Keepstore to initiate pull worker channel goroutine.
+// The channel will process pull list.
+// For each (next) pull request:
+// For each locator listed, execute Pull on the server(s) listed
+// Skip the rest of the servers if no errors
+// Repeat
+//
func RunPullWorker(pullq *WorkQueue, keepClient *keepclient.KeepClient) {
nextItem := pullq.NextItem
for item := range nextItem {
pullRequest := item.(PullRequest)
- err := PullItemAndProcess(item.(PullRequest), GenerateRandomApiToken(), keepClient)
+ err := PullItemAndProcess(item.(PullRequest), GenerateRandomAPIToken(), keepClient)
pullq.DoneItem <- struct{}{}
if err == nil {
log.Printf("Pull %s success", pullRequest)
}
}
-/*
- For each Pull request:
- Generate a random API token.
- Generate a permission signature using this token, timestamp ~60 seconds in the future, and desired block hash.
- Using this token & signature, retrieve the given block.
- Write to storage
-*/
+// PullItemAndProcess pulls items from PullQueue and processes them.
+// For each Pull request:
+// Generate a random API token.
+// Generate a permission signature using this token, timestamp ~60 seconds in the future, and desired block hash.
+// Using this token & signature, retrieve the given block.
+// Write to storage
+//
func PullItemAndProcess(pullRequest PullRequest, token string, keepClient *keepclient.KeepClient) (err error) {
keepClient.Arvados.ApiToken = token
- service_roots := make(map[string]string)
+ serviceRoots := make(map[string]string)
for _, addr := range pullRequest.Servers {
- service_roots[addr] = addr
+ serviceRoots[addr] = addr
}
- keepClient.SetServiceRoots(service_roots, nil, nil)
+ keepClient.SetServiceRoots(serviceRoots, nil, nil)
// Generate signature with a random token
- expires_at := time.Now().Add(60 * time.Second)
- signedLocator := SignLocator(pullRequest.Locator, token, expires_at)
+ expiresAt := time.Now().Add(60 * time.Second)
+ signedLocator := SignLocator(pullRequest.Locator, token, expiresAt)
reader, contentLen, _, err := GetContent(signedLocator, keepClient)
if err != nil {
}
defer reader.Close()
- read_content, err := ioutil.ReadAll(reader)
+ readContent, err := ioutil.ReadAll(reader)
if err != nil {
return err
}
- if (read_content == nil) || (int64(len(read_content)) != contentLen) {
+ if (readContent == nil) || (int64(len(readContent)) != contentLen) {
return errors.New(fmt.Sprintf("Content not found for: %s", signedLocator))
}
- err = PutContent(read_content, pullRequest.Locator)
+ err = PutContent(readContent, pullRequest.Locator)
return
}
return reader, blocklen, url, err
}
-const ALPHA_NUMERIC = "0123456789abcdefghijklmnopqrstuvwxyz"
+const alphaNumeric = "0123456789abcdefghijklmnopqrstuvwxyz"
-func GenerateRandomApiToken() string {
+// GenerateRandomAPIToken generates a random api token
+func GenerateRandomAPIToken() string {
var bytes = make([]byte, 36)
rand.Read(bytes)
for i, b := range bytes {
- bytes[i] = ALPHA_NUMERIC[b%byte(len(ALPHA_NUMERIC))]
+ bytes[i] = alphaNumeric[b%byte(len(alphaNumeric))]
}
return (string(bytes))
}
return rdr, int64(len(testData.Content)), "", nil
}
- keepClient.Arvados.ApiToken = GenerateRandomApiToken()
+ keepClient.Arvados.ApiToken = GenerateRandomAPIToken()
err := PullItemAndProcess(pullRequest, keepClient.Arvados.ApiToken, keepClient)
if len(testData.GetError) > 0 {
// When a new pull request arrives, the old one will be overwritten.
// This behavior is verified using these two maps in the
- // "TestPullWorker_pull_list_with_two_items_latest_replacing_old"
+ // "TestPullWorkerPullList_with_two_items_latest_replacing_old"
testPullLists = make(map[string]string)
}
go RunPullWorker(pullq, keepClient)
}
-var first_pull_list = []byte(`[
+var firstPullList = []byte(`[
{
"locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
"servers":[
}
]`)
-var second_pull_list = []byte(`[
+var secondPullList = []byte(`[
{
"locator":"73feffa4b7f6bb68e44cf984c85f6e88+3",
"servers":[
]`)
type PullWorkerTestData struct {
- name string
- req RequestTester
- response_code int
- response_body string
- read_content string
- read_error bool
- put_error bool
+ name string
+ req RequestTester
+ responseCode int
+ responseBody string
+ readContent string
+ readError bool
+ putError bool
}
-func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_locators(c *C) {
+func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
defer teardown()
data_manager_token = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
- name: "TestPullWorker_pull_list_with_two_locators",
- req: RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
- response_code: http.StatusOK,
- response_body: "Received 2 pull requests\n",
- read_content: "hello",
- read_error: false,
- put_error: false,
+ name: "TestPullWorkerPullList_with_two_locators",
+ req: RequestTester{"/pull", data_manager_token, "PUT", firstPullList},
+ responseCode: http.StatusOK,
+ responseBody: "Received 2 pull requests\n",
+ readContent: "hello",
+ readError: false,
+ putError: false,
}
performTest(testData, c)
}
-func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_one_locator(c *C) {
+func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
defer teardown()
data_manager_token = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
- name: "TestPullWorker_pull_list_with_one_locator",
- req: RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
- response_code: http.StatusOK,
- response_body: "Received 1 pull requests\n",
- read_content: "hola",
- read_error: false,
- put_error: false,
+ name: "TestPullWorkerPullList_with_one_locator",
+ req: RequestTester{"/pull", data_manager_token, "PUT", secondPullList},
+ responseCode: http.StatusOK,
+ responseBody: "Received 1 pull requests\n",
+ readContent: "hola",
+ readError: false,
+ putError: false,
}
performTest(testData, c)
data_manager_token = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
- name: "TestPullWorker_error_on_get_one_locator",
- req: RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
- response_code: http.StatusOK,
- response_body: "Received 1 pull requests\n",
- read_content: "unused",
- read_error: true,
- put_error: false,
+ name: "TestPullWorker_error_on_get_one_locator",
+ req: RequestTester{"/pull", data_manager_token, "PUT", secondPullList},
+ responseCode: http.StatusOK,
+ responseBody: "Received 1 pull requests\n",
+ readContent: "unused",
+ readError: true,
+ putError: false,
}
performTest(testData, c)
data_manager_token = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
- name: "TestPullWorker_error_on_get_two_locators",
- req: RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
- response_code: http.StatusOK,
- response_body: "Received 2 pull requests\n",
- read_content: "unused",
- read_error: true,
- put_error: false,
+ name: "TestPullWorker_error_on_get_two_locators",
+ req: RequestTester{"/pull", data_manager_token, "PUT", firstPullList},
+ responseCode: http.StatusOK,
+ responseBody: "Received 2 pull requests\n",
+ readContent: "unused",
+ readError: true,
+ putError: false,
}
performTest(testData, c)
data_manager_token = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
- name: "TestPullWorker_error_on_put_one_locator",
- req: RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
- response_code: http.StatusOK,
- response_body: "Received 1 pull requests\n",
- read_content: "hello hello",
- read_error: false,
- put_error: true,
+ name: "TestPullWorker_error_on_put_one_locator",
+ req: RequestTester{"/pull", data_manager_token, "PUT", secondPullList},
+ responseCode: http.StatusOK,
+ responseBody: "Received 1 pull requests\n",
+ readContent: "hello hello",
+ readError: false,
+ putError: true,
}
performTest(testData, c)
data_manager_token = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
- name: "TestPullWorker_error_on_put_two_locators",
- req: RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
- response_code: http.StatusOK,
- response_body: "Received 2 pull requests\n",
- read_content: "hello again",
- read_error: false,
- put_error: true,
+ name: "TestPullWorker_error_on_put_two_locators",
+ req: RequestTester{"/pull", data_manager_token, "PUT", firstPullList},
+ responseCode: http.StatusOK,
+ responseBody: "Received 2 pull requests\n",
+ readContent: "hello again",
+ readError: false,
+ putError: true,
}
performTest(testData, c)
// is used to check that behavior by first putting an item on the queue,
// and then performing the test. Thus the "testPullLists" has two entries;
// however, processedPullLists will see only the newest item in the list.
-func (s *PullWorkerTestSuite) TestPullWorker_pull_list_with_two_items_latest_replacing_old(c *C) {
+func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_items_latest_replacing_old(c *C) {
defer teardown()
var firstInput = []int{1}
data_manager_token = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
- name: "TestPullWorker_pull_list_with_two_items_latest_replacing_old",
- req: RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
- response_code: http.StatusOK,
- response_body: "Received 1 pull requests\n",
- read_content: "hola de nuevo",
- read_error: false,
- put_error: false,
+ name: "TestPullWorkerPullList_with_two_items_latest_replacing_old",
+ req: RequestTester{"/pull", data_manager_token, "PUT", secondPullList},
+ responseCode: http.StatusOK,
+ responseBody: "Received 1 pull requests\n",
+ readContent: "hola de nuevo",
+ readError: false,
+ putError: false,
}
performTest(testData, c)
data_manager_token = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
- name: "TestPullWorker_pull_list_with_two_locators",
- req: RequestTester{"/pull", "invalid_data_manager_token", "PUT", first_pull_list},
- response_code: http.StatusUnauthorized,
- response_body: "Unauthorized\n",
- read_content: "hello",
- read_error: false,
- put_error: false,
+ name: "TestPullWorkerPullList_with_two_locators",
+ req: RequestTester{"/pull", "invalid_data_manager_token", "PUT", firstPullList},
+ responseCode: http.StatusUnauthorized,
+ responseBody: "Unauthorized\n",
+ readContent: "hello",
+ readError: false,
+ putError: false,
}
performTest(testData, c)
defer pullq.Close()
currentTestData = testData
- testPullLists[testData.name] = testData.response_body
+ testPullLists[testData.name] = testData.responseBody
processedPullLists := make(map[string]string)
}(GetContent)
GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
- processedPullLists[testData.name] = testData.response_body
- if testData.read_error {
+ processedPullLists[testData.name] = testData.responseBody
+ if testData.readError {
err = errors.New("Error getting data")
readError = err
return nil, 0, "", err
- } else {
- readContent = testData.read_content
- cb := &ClosingBuffer{bytes.NewBufferString(testData.read_content)}
- var rc io.ReadCloser
- rc = cb
- return rc, int64(len(testData.read_content)), "", nil
}
+ readContent = testData.readContent
+ cb := &ClosingBuffer{bytes.NewBufferString(testData.readContent)}
+ var rc io.ReadCloser
+ rc = cb
+ return rc, int64(len(testData.readContent)), "", nil
}
// Override PutContent to mock PutBlock functionality
defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent)
PutContent = func(content []byte, locator string) (err error) {
- if testData.put_error {
+ if testData.putError {
err = errors.New("Error putting data")
putError = err
return err
- } else {
- putContent = content
- return nil
}
+ putContent = content
+ return nil
}
c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
response := IssueRequest(&testData.req)
- c.Assert(response.Code, Equals, testData.response_code)
- c.Assert(response.Body.String(), Equals, testData.response_body)
+ c.Assert(response.Code, Equals, testData.responseCode)
+ c.Assert(response.Body.String(), Equals, testData.responseBody)
expectEqualWithin(c, time.Second, 0, func() interface{} {
st := pullq.Status()
return st.InProgress + st.Queued
})
- if testData.name == "TestPullWorker_pull_list_with_two_items_latest_replacing_old" {
+ if testData.name == "TestPullWorkerPullList_with_two_items_latest_replacing_old" {
c.Assert(len(testPullLists), Equals, 2)
c.Assert(len(processedPullLists), Equals, 1)
c.Assert(testPullLists["Added_before_actual_test_item"], NotNil)
- c.Assert(testPullLists["TestPullWorker_pull_list_with_two_items_latest_replacing_old"], NotNil)
- c.Assert(processedPullLists["TestPullWorker_pull_list_with_two_items_latest_replacing_old"], NotNil)
+ c.Assert(testPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
+ c.Assert(processedPullLists["TestPullWorkerPullList_with_two_items_latest_replacing_old"], NotNil)
} else {
- if testData.response_code == http.StatusOK {
+ if testData.responseCode == http.StatusOK {
c.Assert(len(testPullLists), Equals, 1)
c.Assert(len(processedPullLists), Equals, 1)
c.Assert(testPullLists[testData.name], NotNil)
}
}
- if testData.read_error {
+ if testData.readError {
c.Assert(readError, NotNil)
- } else if testData.response_code == http.StatusOK {
+ } else if testData.responseCode == http.StatusOK {
c.Assert(readError, IsNil)
- c.Assert(readContent, Equals, testData.read_content)
- if testData.put_error {
+ c.Assert(readContent, Equals, testData.readContent)
+ if testData.putError {
c.Assert(putError, NotNil)
} else {
c.Assert(putError, IsNil)
- c.Assert(string(putContent), Equals, testData.read_content)
+ c.Assert(string(putContent), Equals, testData.readContent)
}
}
"time"
)
-/*
- Keepstore initiates trash worker channel goroutine.
- The channel will process trash list.
- For each (next) trash request:
- Delete the block indicated by the trash request Locator
- Repeat
-*/
-
+// RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine.
+// The channel will process trash list.
+// For each (next) trash request:
+// Delete the block indicated by the trash request Locator
+// Repeat
+//
func RunTrashWorker(trashq *WorkQueue) {
for item := range trashq.NextItem {
trashRequest := item.(TrashRequest)
func TestTrashWorkerIntegration_LocatorInVolume1(t *testing.T) {
never_delete = false
testData := TrashWorkerTestData{
- Locator1: TEST_HASH,
- Block1: TEST_BLOCK,
+ Locator1: TestHash,
+ Block1: TestBlock,
- Locator2: TEST_HASH_2,
- Block2: TEST_BLOCK_2,
+ Locator2: TestHash2,
+ Block2: TestBlock2,
CreateData: true,
- DeleteLocator: TEST_HASH, // first locator
+ DeleteLocator: TestHash, // first locator
ExpectLocator1: false,
ExpectLocator2: true,
func TestTrashWorkerIntegration_LocatorInVolume2(t *testing.T) {
never_delete = false
testData := TrashWorkerTestData{
- Locator1: TEST_HASH,
- Block1: TEST_BLOCK,
+ Locator1: TestHash,
+ Block1: TestBlock,
- Locator2: TEST_HASH_2,
- Block2: TEST_BLOCK_2,
+ Locator2: TestHash2,
+ Block2: TestBlock2,
CreateData: true,
- DeleteLocator: TEST_HASH_2, // locator 2
+ DeleteLocator: TestHash2, // locator 2
ExpectLocator1: true,
ExpectLocator2: false,
func TestTrashWorkerIntegration_LocatorInBothVolumes(t *testing.T) {
never_delete = false
testData := TrashWorkerTestData{
- Locator1: TEST_HASH,
- Block1: TEST_BLOCK,
+ Locator1: TestHash,
+ Block1: TestBlock,
- Locator2: TEST_HASH,
- Block2: TEST_BLOCK,
+ Locator2: TestHash,
+ Block2: TestBlock,
CreateData: true,
- DeleteLocator: TEST_HASH,
+ DeleteLocator: TestHash,
ExpectLocator1: false,
ExpectLocator2: false,
func TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(t *testing.T) {
never_delete = false
testData := TrashWorkerTestData{
- Locator1: TEST_HASH,
- Block1: TEST_BLOCK,
+ Locator1: TestHash,
+ Block1: TestBlock,
- Locator2: TEST_HASH,
- Block2: TEST_BLOCK,
+ Locator2: TestHash,
+ Block2: TestBlock,
CreateData: true,
DifferentMtimes: true,
- DeleteLocator: TEST_HASH,
+ DeleteLocator: TestHash,
ExpectLocator1: true,
ExpectLocator2: false,
func TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(t *testing.T) {
never_delete = false
testData := TrashWorkerTestData{
- Locator1: TEST_HASH,
- Block1: TEST_BLOCK,
+ Locator1: TestHash,
+ Block1: TestBlock,
- Locator2: TEST_HASH_2,
- Block2: TEST_BLOCK_2,
+ Locator2: TestHash2,
+ Block2: TestBlock2,
CreateData: true,
CreateInVolume1: true,
- DeleteLocator: TEST_HASH, // locator 1
+ DeleteLocator: TestHash, // locator 1
ExpectLocator1: false,
ExpectLocator2: true,
func TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(t *testing.T) {
never_delete = false
testData := TrashWorkerTestData{
- Locator1: TEST_HASH,
- Block1: TEST_BLOCK,
+ Locator1: TestHash,
+ Block1: TestBlock,
- Locator2: TEST_HASH_2,
- Block2: TEST_BLOCK_2,
+ Locator2: TestHash2,
+ Block2: TestBlock2,
CreateData: true,
CreateInVolume1: true,
UseTrashLifeTime: true,
- DeleteLocator: TEST_HASH, // locator 1
+ DeleteLocator: TestHash, // locator 1
// Since trash life time is in effect, block won't be deleted.
ExpectLocator1: true,
func TestTrashWorkerIntegration_NeverDelete(t *testing.T) {
never_delete = true
testData := TrashWorkerTestData{
- Locator1: TEST_HASH,
- Block1: TEST_BLOCK,
+ Locator1: TestHash,
+ Block1: TestBlock,
- Locator2: TEST_HASH,
- Block2: TEST_BLOCK,
+ Locator2: TestHash,
+ Block2: TestBlock,
CreateData: true,
- DeleteLocator: TEST_HASH,
+ DeleteLocator: TestHash,
ExpectLocator1: true,
ExpectLocator2: true,
expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
// Verify Locator1 to be un/deleted as expected
- data, _ := GetBlock(testData.Locator1, false)
+ data, _ := GetBlock(testData.Locator1)
if testData.ExpectLocator1 {
if len(data) == 0 {
t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
// Verify Locator2 to be un/deleted as expected
if testData.Locator1 != testData.Locator2 {
- data, _ = GetBlock(testData.Locator2, false)
+ data, _ = GetBlock(testData.Locator2)
if testData.ExpectLocator2 {
if len(data) == 0 {
t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
-// A Volume is an interface representing a Keep back-end storage unit:
-// for example, a single mounted disk, a RAID array, an Amazon S3 volume,
-// etc.
-
package main
import (
"time"
)
+// A Volume is an interface representing a Keep back-end storage unit:
+// for example, a single mounted disk, a RAID array, an Amazon S3 volume,
+// etc.
type Volume interface {
// Get a block. IFF the returned error is nil, the caller must
// put the returned slice back into the buffer pool when it's
- // finished with it.
+ // finished with it. (Otherwise, the buffer pool will be
+ // depleted and eventually -- when all available buffers are
+ // used and not returned -- operations will reach deadlock.)
+ //
+ // loc is guaranteed to consist of 32 or more lowercase hex
+ // digits.
+ //
+ // Get should not verify the integrity of the returned data:
+ // it should just return whatever was found in its backing
+ // store. (Integrity checking is the caller's responsibility.)
+ //
+ // If an error is encountered that prevents it from
+ // retrieving the data, that error should be returned so the
+ // caller can log (and send to the client) a more useful
+ // message.
+ //
+ // If the error is "not found", and there's no particular
+ // reason to expect the block to be found (other than that a
+ // caller is asking for it), the returned error should satisfy
+ // os.IsNotExist(err): this is a normal condition and will not
+ // be logged as an error (except that a 404 will appear in the
+ // access log if the block is not found on any other volumes
+ // either).
+ //
+ // If the data in the backing store is bigger than BlockSize,
+ // Get is permitted to return an error without reading any of
+ // the data.
Get(loc string) ([]byte, error)
+
+ // Compare the given data with the stored data (i.e., what Get
+ // would return). If equal, return nil. If not, return
+ // CollisionError or DiskHashError (depending on whether the
+ // data on disk matches the expected hash), or whatever error
+ // was encountered opening/reading the stored data.
+ Compare(loc string, data []byte) error
+
+ // Put writes a block to an underlying storage device.
+ //
+ // loc is as described in Get.
+ //
+ // len(block) is guaranteed to be between 0 and BlockSize.
+ //
+ // If a block is already stored under the same name (loc) with
+ // different content, Put must either overwrite the existing
+ // data with the new data or return a non-nil error. When
+ // overwriting existing data, it must never leave the storage
+ // device in an inconsistent state: a subsequent call to Get
+ // must return either the entire old block, the entire new
+ // block, or an error. (An implementation that cannot peform
+ // atomic updates must leave the old data alone and return an
+ // error.)
+ //
+ // Put also sets the timestamp for the given locator to the
+ // current time.
+ //
+ // Put must return a non-nil error unless it can guarantee
+ // that the entire block has been written and flushed to
+ // persistent storage, and that its timestamp is current. Of
+ // course, this guarantee is only as good as the underlying
+ // storage device, but it is Put's responsibility to at least
+ // get whatever guarantee is offered by the storage device.
+ //
+ // Put should not verify that loc==hash(block): this is the
+ // caller's responsibility.
Put(loc string, block []byte) error
+
+ // Touch sets the timestamp for the given locator to the
+ // current time.
+ //
+ // loc is as described in Get.
+ //
+ // If invoked at time t0, Touch must guarantee that a
+ // subsequent call to Mtime will return a timestamp no older
+ // than {t0 minus one second}. For example, if Touch is called
+ // at 2015-07-07T01:23:45.67890123Z, it is acceptable for a
+ // subsequent Mtime to return any of the following:
+ //
+ // - 2015-07-07T01:23:45.00000000Z
+ // - 2015-07-07T01:23:45.67890123Z
+ // - 2015-07-07T01:23:46.67890123Z
+ // - 2015-07-08T00:00:00.00000000Z
+ //
+ // It is not acceptable for a subsequente Mtime to return
+ // either of the following:
+ //
+ // - 2015-07-07T00:00:00.00000000Z -- ERROR
+ // - 2015-07-07T01:23:44.00000000Z -- ERROR
+ //
+ // Touch must return a non-nil error if the timestamp cannot
+ // be updated.
Touch(loc string) error
+
+ // Mtime returns the stored timestamp for the given locator.
+ //
+ // loc is as described in Get.
+ //
+ // Mtime must return a non-nil error if the given block is not
+ // found or the timestamp could not be retrieved.
Mtime(loc string) (time.Time, error)
+
+ // IndexTo writes a complete list of locators with the given
+ // prefix for which Get() can retrieve data.
+ //
+ // prefix consists of zero or more lowercase hexadecimal
+ // digits.
+ //
+ // Each locator must be written to the given writer using the
+ // following format:
+ //
+ // loc "+" size " " timestamp "\n"
+ //
+ // where:
+ //
+ // - size is the number of bytes of content, given as a
+ // decimal number with one or more digits
+ //
+ // - timestamp is the timestamp stored for the locator,
+ // given as a decimal number of seconds after January 1,
+ // 1970 UTC.
+ //
+ // IndexTo must not write any other data to writer: for
+ // example, it must not write any blank lines.
+ //
+ // If an error makes it impossible to provide a complete
+ // index, IndexTo must return a non-nil error. It is
+ // acceptable to return a non-nil error after writing a
+ // partial index to writer.
+ //
+ // The resulting index is not expected to be sorted in any
+ // particular order.
IndexTo(prefix string, writer io.Writer) error
+
+ // Delete deletes the block data from the underlying storage
+ // device.
+ //
+ // loc is as described in Get.
+ //
+ // If the timestamp for the given locator is newer than
+ // blob_signature_ttl, Delete must not delete the data.
+ //
+ // If a Delete operation overlaps with any Touch or Put
+ // operations on the same locator, the implementation must
+ // ensure one of the following outcomes:
+ //
+ // - Touch and Put return a non-nil error, or
+ // - Delete does not delete the block, or
+ // - Both of the above.
+ //
+ // If it is possible for the storage device to be accessed by
+ // a different process or host, the synchronization mechanism
+ // should also guard against races with other processes and
+ // hosts. If such a mechanism is not available, there must be
+ // a mechanism for detecting unsafe configurations, alerting
+ // the operator, and aborting or falling back to a read-only
+ // state. In other words, running multiple keepstore processes
+ // with the same underlying storage device must either work
+ // reliably or fail outright.
+ //
+ // Corollary: A successful Touch or Put guarantees a block
+ // will not be deleted for at least blob_signature_ttl
+ // seconds.
Delete(loc string) error
+
+ // Status returns a *VolumeStatus representing the current
+ // in-use and available storage capacity and an
+ // implementation-specific volume identifier (e.g., "mount
+ // point" for a UnixVolume).
Status() *VolumeStatus
+
+ // String returns an identifying label for this volume,
+ // suitable for including in log messages. It should contain
+ // enough information to uniquely identify the underlying
+ // storage device, but should not contain any credentials or
+ // secrets.
String() string
+
+ // Writable returns false if all future Put, Mtime, and Delete
+ // calls are expected to fail.
+ //
+ // If the volume is only temporarily unwritable -- or if Put
+ // will fail because it is full, but Mtime or Delete can
+ // succeed -- then Writable should return false.
Writable() bool
}
type VolumeManager interface {
// AllReadable returns all volumes.
AllReadable() []Volume
+
// AllWritable returns all volumes that aren't known to be in
// a read-only state. (There is no guarantee that a write to
// one will succeed, though.)
AllWritable() []Volume
+
// NextWritable returns the volume where the next new block
// should be written. A VolumeManager can select a volume in
// order to distribute activity across spindles, fill up disks
// with more free space, etc.
NextWritable() Volume
+
// Close shuts down the volume manager cleanly.
Close()
}
+// RRVolumeManager is a round-robin VolumeManager: the Nth call to
+// NextWritable returns the (N % len(writables))th writable Volume
+// (where writables are all Volumes v where v.Writable()==true).
type RRVolumeManager struct {
readables []Volume
writables []Volume
counter uint32
}
+// MakeRRVolumeManager initializes RRVolumeManager
func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager {
vm := &RRVolumeManager{}
for _, v := range volumes {
return vm
}
+// AllReadable returns an array of all readable volumes
func (vm *RRVolumeManager) AllReadable() []Volume {
return vm.readables
}
+// AllWritable returns an array of all writable volumes
func (vm *RRVolumeManager) AllWritable() []Volume {
return vm.writables
}
+// NextWritable returns the next writable
func (vm *RRVolumeManager) NextWritable() Volume {
if len(vm.writables) == 0 {
return nil
return vm.writables[i%uint32(len(vm.writables))]
}
+// Close the RRVolumeManager
func (vm *RRVolumeManager) Close() {
}
+
+// VolumeStatus provides status information of the volume consisting of:
+// * mount_point
+// * device_num (an integer identifying the underlying storage system)
+// * bytes_free
+// * bytes_used
+type VolumeStatus struct {
+ MountPoint string `json:"mount_point"`
+ DeviceNum uint64 `json:"device_num"`
+ BytesFree uint64 `json:"bytes_free"`
+ BytesUsed uint64 `json:"bytes_used"`
+}
--- /dev/null
+package main
+
+import (
+ "bytes"
+ "os"
+ "regexp"
+ "sort"
+ "strings"
+ "testing"
+ "time"
+)
+
+// A TestableVolumeFactory returns a new TestableVolume. The factory
+// function, and the TestableVolume it returns, can use "t" to write
+// logs, fail the current test, etc.
+type TestableVolumeFactory func(t *testing.T) TestableVolume
+
+// DoGenericVolumeTests runs a set of tests that every TestableVolume
+// is expected to pass. It calls factory to create a new TestableVolume
+// for each test case, to avoid leaking state between tests.
+func DoGenericVolumeTests(t *testing.T, factory TestableVolumeFactory) {
+ testGet(t, factory)
+ testGetNoSuchBlock(t, factory)
+
+ testCompareSameContent(t, factory)
+ testCompareWithDifferentContent(t, factory)
+ testCompareWithBadData(t, factory)
+
+ testPutBlockWithSameContent(t, factory)
+ testPutBlockWithDifferentContent(t, factory)
+ testPutMultipleBlocks(t, factory)
+
+ testPutAndTouch(t, factory)
+ testTouchNoSuchBlock(t, factory)
+
+ testMtimeNoSuchBlock(t, factory)
+
+ testIndexTo(t, factory)
+
+ testDeleteNewBlock(t, factory)
+ testDeleteOldBlock(t, factory)
+ testDeleteNoSuchBlock(t, factory)
+
+ testStatus(t, factory)
+
+ testString(t, factory)
+
+ testUpdateReadOnly(t, factory)
+
+ testGetConcurrent(t, factory)
+ testPutConcurrent(t, factory)
+}
+
+// Put a test block, get it and verify content
+// Test should pass for both writable and read-only volumes
+func testGet(t *testing.T, factory TestableVolumeFactory) {
+ v := factory(t)
+ defer v.Teardown()
+
+ v.PutRaw(TestHash, TestBlock)
+
+ buf, err := v.Get(TestHash)
+ if err != nil {
+ t.Error(err)
+ }
+
+ bufs.Put(buf)
+
+ if bytes.Compare(buf, TestBlock) != 0 {
+ t.Errorf("expected %s, got %s", string(TestBlock), string(buf))
+ }
+}
+
+// Invoke get on a block that does not exist in volume; should result in error
+// Test should pass for both writable and read-only volumes
+func testGetNoSuchBlock(t *testing.T, factory TestableVolumeFactory) {
+ v := factory(t)
+ defer v.Teardown()
+
+ if _, err := v.Get(TestHash2); err == nil {
+ t.Errorf("Expected error while getting non-existing block %v", TestHash2)
+ }
+}
+
+// Put a test block and compare the locator with same content
+// Test should pass for both writable and read-only volumes
+func testCompareSameContent(t *testing.T, factory TestableVolumeFactory) {
+ v := factory(t)
+ defer v.Teardown()
+
+ v.PutRaw(TestHash, TestBlock)
+
+ // Compare the block locator with same content
+ err := v.Compare(TestHash, TestBlock)
+ if err != nil {
+ t.Errorf("Got err %q, expected nil", err)
+ }
+}
+
+// Put a test block and compare the locator with a different content
+// Expect error due to collision
+// Test should pass for both writable and read-only volumes
+func testCompareWithDifferentContent(t *testing.T, factory TestableVolumeFactory) {
+ v := factory(t)
+ defer v.Teardown()
+
+ v.PutRaw(TestHash, TestBlock)
+
+ // Compare the block locator with different content; collision
+ err := v.Compare(TestHash, []byte("baddata"))
+ if err == nil {
+ t.Errorf("Expected error due to collision")
+ }
+}
+
+// Put a test block with bad data (hash does not match, but Put does not verify)
+// Compare the locator with good data whose hash matches with locator
+// Expect error due to corruption.
+// Test should pass for both writable and read-only volumes
+func testCompareWithBadData(t *testing.T, factory TestableVolumeFactory) {
+ v := factory(t)
+ defer v.Teardown()
+
+ v.PutRaw(TestHash, []byte("baddata"))
+
+ err := v.Compare(TestHash, TestBlock)
+ if err == nil {
+ t.Errorf("Expected error due to corruption")
+ }
+}
+
+// Put a block and put again with same content
+// Test is intended for only writable volumes
+func testPutBlockWithSameContent(t *testing.T, factory TestableVolumeFactory) {
+ v := factory(t)
+ defer v.Teardown()
+
+ if v.Writable() == false {
+ return
+ }
+
+ err := v.Put(TestHash, TestBlock)
+ if err != nil {
+ t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
+ }
+
+ err = v.Put(TestHash, TestBlock)
+ if err != nil {
+ t.Errorf("Got err putting block second time %q: %q, expected nil", TestBlock, err)
+ }
+}
+
+// Put a block and put again with different content
+// Test is intended for only writable volumes
+func testPutBlockWithDifferentContent(t *testing.T, factory TestableVolumeFactory) {
+ v := factory(t)
+ defer v.Teardown()
+
+ if v.Writable() == false {
+ return
+ }
+
+ err := v.Put(TestHash, TestBlock)
+ if err != nil {
+ t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
+ }
+
+ putErr := v.Put(TestHash, TestBlock2)
+ buf, getErr := v.Get(TestHash)
+ if putErr == nil {
+ // Put must not return a nil error unless it has
+ // overwritten the existing data.
+ if bytes.Compare(buf, TestBlock2) != 0 {
+ t.Errorf("Put succeeded but Get returned %+v, expected %+v", buf, TestBlock2)
+ }
+ } else {
+ // It is permissible for Put to fail, but it must
+ // leave us with either the original data, the new
+ // data, or nothing at all.
+ if getErr == nil && bytes.Compare(buf, TestBlock) != 0 && bytes.Compare(buf, TestBlock2) != 0 {
+ t.Errorf("Put failed but Get returned %+v, which is neither %+v nor %+v", buf, TestBlock, TestBlock2)
+ }
+ }
+ if getErr == nil {
+ bufs.Put(buf)
+ }
+}
+
+// Put and get multiple blocks
+// Test is intended for only writable volumes
+func testPutMultipleBlocks(t *testing.T, factory TestableVolumeFactory) {
+ v := factory(t)
+ defer v.Teardown()
+
+ if v.Writable() == false {
+ return
+ }
+
+ err := v.Put(TestHash, TestBlock)
+ if err != nil {
+ t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
+ }
+
+ err = v.Put(TestHash2, TestBlock2)
+ if err != nil {
+ t.Errorf("Got err putting block %q: %q, expected nil", TestBlock2, err)
+ }
+
+ err = v.Put(TestHash3, TestBlock3)
+ if err != nil {
+ t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
+ }
+
+ data, err := v.Get(TestHash)
+ if err != nil {
+ t.Error(err)
+ } else if bytes.Compare(data, TestBlock) != 0 {
+ t.Errorf("Block present, but content is incorrect: Expected: %v Found: %v", data, TestBlock)
+ }
+ bufs.Put(data)
+
+ data, err = v.Get(TestHash2)
+ if err != nil {
+ t.Error(err)
+ } else if bytes.Compare(data, TestBlock2) != 0 {
+ t.Errorf("Block present, but content is incorrect: Expected: %v Found: %v", data, TestBlock2)
+ }
+ bufs.Put(data)
+
+ data, err = v.Get(TestHash3)
+ if err != nil {
+ t.Error(err)
+ } else if bytes.Compare(data, TestBlock3) != 0 {
+ t.Errorf("Block present, but content is incorrect: Expected: %v Found: %v", data, TestBlock3)
+ }
+ bufs.Put(data)
+}
+
+// testPutAndTouch
+// Test that when applying PUT to a block that already exists,
+// the block's modification time is updated.
+// Test is intended for only writable volumes
+func testPutAndTouch(t *testing.T, factory TestableVolumeFactory) {
+ v := factory(t)
+ defer v.Teardown()
+
+ if v.Writable() == false {
+ return
+ }
+
+ if err := v.Put(TestHash, TestBlock); err != nil {
+ t.Error(err)
+ }
+
+ // We'll verify { t0 < threshold < t1 }, where t0 is the
+ // existing block's timestamp on disk before Put() and t1 is
+ // its timestamp after Put().
+ threshold := time.Now().Add(-time.Second)
+
+ // Set the stored block's mtime far enough in the past that we
+ // can see the difference between "timestamp didn't change"
+ // and "timestamp granularity is too low".
+ v.TouchWithDate(TestHash, time.Now().Add(-20*time.Second))
+
+ // Make sure v.Mtime() agrees the above Utime really worked.
+ if t0, err := v.Mtime(TestHash); err != nil || t0.IsZero() || !t0.Before(threshold) {
+ t.Errorf("Setting mtime failed: %v, %v", t0, err)
+ }
+
+ // Write the same block again.
+ if err := v.Put(TestHash, TestBlock); err != nil {
+ t.Error(err)
+ }
+
+ // Verify threshold < t1
+ if t1, err := v.Mtime(TestHash); err != nil {
+ t.Error(err)
+ } else if t1.Before(threshold) {
+ t.Errorf("t1 %v should be >= threshold %v after v.Put ", t1, threshold)
+ }
+}
+
+// Touching a non-existing block should result in error.
+// Test should pass for both writable and read-only volumes
+func testTouchNoSuchBlock(t *testing.T, factory TestableVolumeFactory) {
+ v := factory(t)
+ defer v.Teardown()
+
+ if err := v.Touch(TestHash); err == nil {
+ t.Error("Expected error when attempted to touch a non-existing block")
+ }
+}
+
+// Invoking Mtime on a non-existing block should result in error.
+// Test should pass for both writable and read-only volumes
+func testMtimeNoSuchBlock(t *testing.T, factory TestableVolumeFactory) {
+ v := factory(t)
+ defer v.Teardown()
+
+ if _, err := v.Mtime("12345678901234567890123456789012"); err == nil {
+ t.Error("Expected error when updating Mtime on a non-existing block")
+ }
+}
+
+// Put a few blocks and invoke IndexTo with:
+// * no prefix
+// * with a prefix
+// * with no such prefix
+// Test should pass for both writable and read-only volumes
+func testIndexTo(t *testing.T, factory TestableVolumeFactory) {
+ v := factory(t)
+ defer v.Teardown()
+
+ v.PutRaw(TestHash, TestBlock)
+ v.PutRaw(TestHash2, TestBlock2)
+ v.PutRaw(TestHash3, TestBlock3)
+
+ buf := new(bytes.Buffer)
+ v.IndexTo("", buf)
+ indexRows := strings.Split(string(buf.Bytes()), "\n")
+ sort.Strings(indexRows)
+ sortedIndex := strings.Join(indexRows, "\n")
+ m, err := regexp.MatchString(
+ `^\n`+TestHash+`\+\d+ \d+\n`+
+ TestHash3+`\+\d+ \d+\n`+
+ TestHash2+`\+\d+ \d+$`,
+ sortedIndex)
+ if err != nil {
+ t.Error(err)
+ } else if !m {
+ t.Errorf("Got index %q for empty prefix", sortedIndex)
+ }
+
+ for _, prefix := range []string{"f", "f15", "f15ac"} {
+ buf = new(bytes.Buffer)
+ v.IndexTo(prefix, buf)
+
+ m, err := regexp.MatchString(`^`+TestHash2+`\+\d+ \d+\n$`, string(buf.Bytes()))
+ if err != nil {
+ t.Error(err)
+ } else if !m {
+ t.Errorf("Got index %q for prefix %s", string(buf.Bytes()), prefix)
+ }
+ }
+
+ for _, prefix := range []string{"zero", "zip", "zilch"} {
+ buf = new(bytes.Buffer)
+ v.IndexTo(prefix, buf)
+ if err != nil {
+ t.Errorf("Got error on IndexTo with no such prefix %v", err.Error())
+ } else if buf.Len() != 0 {
+ t.Errorf("Expected empty list for IndexTo with no such prefix %s", prefix)
+ }
+ }
+}
+
+// Calling Delete() for a block immediately after writing it (not old enough)
+// should neither delete the data nor return an error.
+// Test is intended for only writable volumes
+func testDeleteNewBlock(t *testing.T, factory TestableVolumeFactory) {
+ v := factory(t)
+ defer v.Teardown()
+
+ if v.Writable() == false {
+ return
+ }
+
+ v.Put(TestHash, TestBlock)
+
+ if err := v.Delete(TestHash); err != nil {
+ t.Error(err)
+ }
+ data, err := v.Get(TestHash)
+ if err != nil {
+ t.Error(err)
+ } else if bytes.Compare(data, TestBlock) != 0 {
+ t.Error("Block still present, but content is incorrect: %+v != %+v", data, TestBlock)
+ }
+ bufs.Put(data)
+}
+
+// Calling Delete() for a block with a timestamp older than
+// blob_signature_ttl seconds in the past should delete the data.
+// Test is intended for only writable volumes
+func testDeleteOldBlock(t *testing.T, factory TestableVolumeFactory) {
+ v := factory(t)
+ defer v.Teardown()
+
+ if v.Writable() == false {
+ return
+ }
+
+ v.Put(TestHash, TestBlock)
+ v.TouchWithDate(TestHash, time.Now().Add(-2*blob_signature_ttl*time.Second))
+
+ if err := v.Delete(TestHash); err != nil {
+ t.Error(err)
+ }
+ if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) {
+ t.Errorf("os.IsNotExist(%v) should have been true", err.Error())
+ }
+}
+
+// Calling Delete() for a block that does not exist should result in error.
+// Test should pass for both writable and read-only volumes
+func testDeleteNoSuchBlock(t *testing.T, factory TestableVolumeFactory) {
+ v := factory(t)
+ defer v.Teardown()
+
+ if err := v.Delete(TestHash2); err == nil {
+ t.Errorf("Expected error when attempting to delete a non-existing block")
+ }
+}
+
+// Invoke Status and verify that VolumeStatus is returned
+// Test should pass for both writable and read-only volumes
+func testStatus(t *testing.T, factory TestableVolumeFactory) {
+ v := factory(t)
+ defer v.Teardown()
+
+ // Get node status and make a basic sanity check.
+ status := v.Status()
+ if status.DeviceNum == 0 {
+ t.Errorf("uninitialized device_num in %v", status)
+ }
+
+ if status.BytesFree == 0 {
+ t.Errorf("uninitialized bytes_free in %v", status)
+ }
+
+ if status.BytesUsed == 0 {
+ t.Errorf("uninitialized bytes_used in %v", status)
+ }
+}
+
+// Invoke String for the volume; expect non-empty result
+// Test should pass for both writable and read-only volumes
+func testString(t *testing.T, factory TestableVolumeFactory) {
+ v := factory(t)
+ defer v.Teardown()
+
+ if id := v.String(); len(id) == 0 {
+ t.Error("Got empty string for v.String()")
+ }
+}
+
+// Putting, updating, touching, and deleting blocks from a read-only volume result in error.
+// Test is intended for only read-only volumes
+func testUpdateReadOnly(t *testing.T, factory TestableVolumeFactory) {
+ v := factory(t)
+ defer v.Teardown()
+
+ if v.Writable() == true {
+ return
+ }
+
+ v.PutRaw(TestHash, TestBlock)
+
+ // Get from read-only volume should succeed
+ _, err := v.Get(TestHash)
+ if err != nil {
+ t.Errorf("got err %v, expected nil", err)
+ }
+
+ // Put a new block to read-only volume should result in error
+ err = v.Put(TestHash2, TestBlock2)
+ if err == nil {
+ t.Errorf("Expected error when putting block in a read-only volume")
+ }
+ _, err = v.Get(TestHash2)
+ if err == nil {
+ t.Errorf("Expected error when getting block whose put in read-only volume failed")
+ }
+
+ // Touch a block in read-only volume should result in error
+ err = v.Touch(TestHash)
+ if err == nil {
+ t.Errorf("Expected error when touching block in a read-only volume")
+ }
+
+ // Delete a block from a read-only volume should result in error
+ err = v.Delete(TestHash)
+ if err == nil {
+ t.Errorf("Expected error when deleting block from a read-only volume")
+ }
+
+ // Overwriting an existing block in read-only volume should result in error
+ err = v.Put(TestHash, TestBlock)
+ if err == nil {
+ t.Errorf("Expected error when putting block in a read-only volume")
+ }
+}
+
+// Launch concurrent Gets
+// Test should pass for both writable and read-only volumes
+func testGetConcurrent(t *testing.T, factory TestableVolumeFactory) {
+ v := factory(t)
+ defer v.Teardown()
+
+ v.PutRaw(TestHash, TestBlock)
+ v.PutRaw(TestHash2, TestBlock2)
+ v.PutRaw(TestHash3, TestBlock3)
+
+ sem := make(chan int)
+ go func(sem chan int) {
+ buf, err := v.Get(TestHash)
+ if err != nil {
+ t.Errorf("err1: %v", err)
+ }
+ bufs.Put(buf)
+ if bytes.Compare(buf, TestBlock) != 0 {
+ t.Errorf("buf should be %s, is %s", string(TestBlock), string(buf))
+ }
+ sem <- 1
+ }(sem)
+
+ go func(sem chan int) {
+ buf, err := v.Get(TestHash2)
+ if err != nil {
+ t.Errorf("err2: %v", err)
+ }
+ bufs.Put(buf)
+ if bytes.Compare(buf, TestBlock2) != 0 {
+ t.Errorf("buf should be %s, is %s", string(TestBlock2), string(buf))
+ }
+ sem <- 1
+ }(sem)
+
+ go func(sem chan int) {
+ buf, err := v.Get(TestHash3)
+ if err != nil {
+ t.Errorf("err3: %v", err)
+ }
+ bufs.Put(buf)
+ if bytes.Compare(buf, TestBlock3) != 0 {
+ t.Errorf("buf should be %s, is %s", string(TestBlock3), string(buf))
+ }
+ sem <- 1
+ }(sem)
+
+ // Wait for all goroutines to finish
+ for done := 0; done < 3; {
+ done += <-sem
+ }
+}
+
+// Launch concurrent Puts
+// Test is intended for only writable volumes
+func testPutConcurrent(t *testing.T, factory TestableVolumeFactory) {
+ v := factory(t)
+ defer v.Teardown()
+
+ if v.Writable() == false {
+ return
+ }
+
+ sem := make(chan int)
+ go func(sem chan int) {
+ err := v.Put(TestHash, TestBlock)
+ if err != nil {
+ t.Errorf("err1: %v", err)
+ }
+ sem <- 1
+ }(sem)
+
+ go func(sem chan int) {
+ err := v.Put(TestHash2, TestBlock2)
+ if err != nil {
+ t.Errorf("err2: %v", err)
+ }
+ sem <- 1
+ }(sem)
+
+ go func(sem chan int) {
+ err := v.Put(TestHash3, TestBlock3)
+ if err != nil {
+ t.Errorf("err3: %v", err)
+ }
+ sem <- 1
+ }(sem)
+
+ // Wait for all goroutines to finish
+ for done := 0; done < 3; {
+ done += <-sem
+ }
+
+ // Double check that we actually wrote the blocks we expected to write.
+ buf, err := v.Get(TestHash)
+ if err != nil {
+ t.Errorf("Get #1: %v", err)
+ }
+ bufs.Put(buf)
+ if bytes.Compare(buf, TestBlock) != 0 {
+ t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf))
+ }
+
+ buf, err = v.Get(TestHash2)
+ if err != nil {
+ t.Errorf("Get #2: %v", err)
+ }
+ bufs.Put(buf)
+ if bytes.Compare(buf, TestBlock2) != 0 {
+ t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf))
+ }
+
+ buf, err = v.Get(TestHash3)
+ if err != nil {
+ t.Errorf("Get #3: %v", err)
+ }
+ bufs.Put(buf)
+ if bytes.Compare(buf, TestBlock3) != 0 {
+ t.Errorf("Get #3: expected %s, got %s", string(TestBlock3), string(buf))
+ }
+}
package main
import (
+ "bytes"
+ "crypto/md5"
"errors"
"fmt"
"io"
"time"
)
+// A TestableVolume allows test suites to manipulate the state of an
+// underlying Volume, in order to test behavior in cases that are
+// impractical to achieve with a sequence of normal Volume operations.
+type TestableVolume interface {
+ Volume
+ // [Over]write content for a locator with the given data,
+ // bypassing all constraints like readonly and serialize.
+ PutRaw(locator string, data []byte)
+
+ // Specify the value Mtime() should return, until the next
+ // call to Touch, TouchWithDate, or Put.
+ TouchWithDate(locator string, lastPut time.Time)
+
+ // Clean up, delete temporary files.
+ Teardown()
+}
+
// MockVolumes are test doubles for Volumes, used to test handlers.
type MockVolume struct {
Store map[string][]byte
Timestamps map[string]time.Time
+
// Bad volumes return an error for every operation.
Bad bool
+
// Touchable volumes' Touch() method succeeds for a locator
// that has been Put().
Touchable bool
+
// Readonly volumes return an error for Put, Delete, and
// Touch.
Readonly bool
+
// Gate is a "starting gate", allowing test cases to pause
// volume operations long enough to inspect state. Every
// operation (except Status) starts by receiving from
// channel unblocks all operations. By default, Gate is a
// closed channel, so all operations proceed without
// blocking. See trash_worker_test.go for an example.
- Gate chan struct{}
+ Gate chan struct{}
+
called map[string]int
mutex sync.Mutex
}
func (v *MockVolume) CallCount(method string) int {
v.mutex.Lock()
defer v.mutex.Unlock()
- if c, ok := v.called[method]; !ok {
+ c, ok := v.called[method]
+ if !ok {
return 0
- } else {
- return c
}
+ return c
}
func (v *MockVolume) gotCall(method string) {
}
}
+func (v *MockVolume) Compare(loc string, buf []byte) error {
+ v.gotCall("Compare")
+ <-v.Gate
+ if v.Bad {
+ return errors.New("Bad volume")
+ } else if block, ok := v.Store[loc]; ok {
+ if fmt.Sprintf("%x", md5.Sum(block)) != loc {
+ return DiskHashError
+ }
+ if bytes.Compare(buf, block) != 0 {
+ return CollisionError
+ }
+ return nil
+ } else {
+ return NotFoundError
+ }
+}
+
func (v *MockVolume) Get(loc string) ([]byte, error) {
v.gotCall("Get")
<-v.Gate
-// A UnixVolume is a Volume backed by a locally mounted disk.
-//
package main
import (
+ "bytes"
"fmt"
"io"
"io/ioutil"
// A UnixVolume stores and retrieves blocks in a local directory.
type UnixVolume struct {
- root string // path to the volume's root directory
- serialize bool
- readonly bool
- mutex sync.Mutex
+ // path to the volume's root directory
+ root string
+ // something to lock during IO, typically a sync.Mutex (or nil
+ // to skip locking)
+ locker sync.Locker
+ readonly bool
}
+// Touch sets the timestamp for the given locator to the current time
func (v *UnixVolume) Touch(loc string) error {
if v.readonly {
return MethodDisabledError
return err
}
defer f.Close()
- if v.serialize {
- v.mutex.Lock()
- defer v.mutex.Unlock()
+ if v.locker != nil {
+ v.locker.Lock()
+ defer v.locker.Unlock()
}
if e := lockfile(f); e != nil {
return e
return syscall.Utime(p, &utime)
}
+// Mtime returns the stored timestamp for the given locator.
func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
p := v.blockPath(loc)
- if fi, err := os.Stat(p); err != nil {
+ fi, err := os.Stat(p)
+ if err != nil {
return time.Time{}, err
- } else {
- return fi.ModTime(), nil
}
+ return fi.ModTime(), nil
+}
+
+// Lock the locker (if one is in use), open the file for reading, and
+// call the given function if and when the file is ready to read.
+func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
+ if v.locker != nil {
+ v.locker.Lock()
+ defer v.locker.Unlock()
+ }
+ f, err := os.Open(path)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ return fn(f)
+}
+
+// stat is os.Stat() with some extra sanity checks.
+func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
+ stat, err := os.Stat(path)
+ if err == nil {
+ if stat.Size() < 0 {
+ err = os.ErrInvalid
+ } else if stat.Size() > BlockSize {
+ err = TooLongError
+ }
+ }
+ return stat, err
}
// Get retrieves a block identified by the locator string "loc", and
// returns its contents as a byte slice.
//
-// If the block could not be found, opened, or read, Get returns a nil
-// slice and whatever non-nil error was returned by Stat or ReadFile.
+// Get returns a nil buffer IFF it returns a non-nil error.
func (v *UnixVolume) Get(loc string) ([]byte, error) {
path := v.blockPath(loc)
- stat, err := os.Stat(path)
- if err != nil {
- return nil, err
- }
- if stat.Size() < 0 {
- return nil, os.ErrInvalid
- } else if stat.Size() == 0 {
- return bufs.Get(0), nil
- } else if stat.Size() > BLOCKSIZE {
- return nil, TooLongError
- }
- f, err := os.Open(path)
+ stat, err := v.stat(path)
if err != nil {
return nil, err
}
- defer f.Close()
buf := bufs.Get(int(stat.Size()))
- if v.serialize {
- v.mutex.Lock()
- defer v.mutex.Unlock()
- }
- _, err = io.ReadFull(f, buf)
+ err = v.getFunc(path, func(rdr io.Reader) error {
+ _, err = io.ReadFull(rdr, buf)
+ return err
+ })
if err != nil {
bufs.Put(buf)
return nil, err
return buf, nil
}
+// Compare returns nil if Get(loc) would return the same content as
+// expect. It is functionally equivalent to Get() followed by
+// bytes.Compare(), but uses less memory.
+func (v *UnixVolume) Compare(loc string, expect []byte) error {
+ path := v.blockPath(loc)
+ stat, err := v.stat(path)
+ if err != nil {
+ return err
+ }
+ bufLen := 1 << 20
+ if int64(bufLen) > stat.Size() {
+ bufLen = int(stat.Size())
+ }
+ cmp := expect
+ buf := make([]byte, bufLen)
+ return v.getFunc(path, func(rdr io.Reader) error {
+ // Loop invariants: all data read so far matched what
+ // we expected, and the first N bytes of cmp are
+ // expected to equal the next N bytes read from
+ // reader.
+ for {
+ n, err := rdr.Read(buf)
+ if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
+ return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], buf[:n], rdr)
+ }
+ cmp = cmp[n:]
+ if err == io.EOF {
+ if len(cmp) != 0 {
+ return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], nil, nil)
+ }
+ return nil
+ } else if err != nil {
+ return err
+ }
+ }
+ })
+}
+
// Put stores a block of data identified by the locator string
// "loc". It returns nil on success. If the volume is full, it
// returns a FullError. If the write fails due to some other error,
}
bpath := v.blockPath(loc)
- if v.serialize {
- v.mutex.Lock()
- defer v.mutex.Unlock()
+ if v.locker != nil {
+ v.locker.Lock()
+ defer v.locker.Unlock()
}
if _, err := tmpfile.Write(block); err != nil {
log.Printf("%s: writing to %s: %s\n", v, bpath, err)
}
}
+// Delete deletes the block data from the unix storage
func (v *UnixVolume) Delete(loc string) error {
// Touch() must be called before calling Write() on a block. Touch()
// also uses lockfile(). This avoids a race condition between Write()
if v.readonly {
return MethodDisabledError
}
- if v.serialize {
- v.mutex.Lock()
- defer v.mutex.Unlock()
+ if v.locker != nil {
+ v.locker.Lock()
+ defer v.locker.Unlock()
}
p := v.blockPath(loc)
f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
}
// IsFull returns true if the free space on the volume is less than
-// MIN_FREE_KILOBYTES.
+// MinFreeKilobytes.
//
func (v *UnixVolume) IsFull() (isFull bool) {
fullSymlink := v.root + "/full"
}
if avail, err := v.FreeDiskSpace(); err == nil {
- isFull = avail < MIN_FREE_KILOBYTES
+ isFull = avail < MinFreeKilobytes
} else {
log.Printf("%s: FreeDiskSpace: %s\n", v, err)
isFull = false
return fmt.Sprintf("[UnixVolume %s]", v.root)
}
+// Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
func (v *UnixVolume) Writable() bool {
return !v.readonly
}
import (
"bytes"
+ "errors"
"fmt"
+ "io"
"io/ioutil"
"os"
- "regexp"
- "sort"
"strings"
+ "sync"
"syscall"
"testing"
"time"
)
-func TempUnixVolume(t *testing.T, serialize bool, readonly bool) *UnixVolume {
+type TestableUnixVolume struct {
+ UnixVolume
+ t *testing.T
+}
+
+func NewTestableUnixVolume(t *testing.T, serialize bool, readonly bool) *TestableUnixVolume {
d, err := ioutil.TempDir("", "volume_test")
if err != nil {
t.Fatal(err)
}
- return &UnixVolume{
- root: d,
- serialize: serialize,
- readonly: readonly,
+ var locker sync.Locker
+ if serialize {
+ locker = &sync.Mutex{}
+ }
+ return &TestableUnixVolume{
+ UnixVolume: UnixVolume{
+ root: d,
+ locker: locker,
+ readonly: readonly,
+ },
+ t: t,
}
}
-func _teardown(v *UnixVolume) {
- os.RemoveAll(v.root)
+// PutRaw writes a Keep block directly into a UnixVolume, even if
+// the volume is readonly.
+func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
+ defer func(orig bool) {
+ v.readonly = orig
+ }(v.readonly)
+ v.readonly = false
+ err := v.Put(locator, data)
+ if err != nil {
+ v.t.Fatal(err)
+ }
}
-// _store writes a Keep block directly into a UnixVolume, bypassing
-// the overhead and safeguards of Put(). Useful for storing bogus data
-// and isolating unit tests from Put() behavior.
-func _store(t *testing.T, vol *UnixVolume, filename string, block []byte) {
- blockdir := fmt.Sprintf("%s/%s", vol.root, filename[:3])
- if err := os.MkdirAll(blockdir, 0755); err != nil {
- t.Fatal(err)
+func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
+ err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()})
+ if err != nil {
+ v.t.Fatal(err)
}
+}
- blockpath := fmt.Sprintf("%s/%s", blockdir, filename)
- if f, err := os.Create(blockpath); err == nil {
- f.Write(block)
- f.Close()
- } else {
- t.Fatal(err)
+func (v *TestableUnixVolume) Teardown() {
+ if err := os.RemoveAll(v.root); err != nil {
+ v.t.Fatal(err)
}
}
-func TestGet(t *testing.T) {
- v := TempUnixVolume(t, false, false)
- defer _teardown(v)
- _store(t, v, TEST_HASH, TEST_BLOCK)
+// serialize = false; readonly = false
+func TestUnixVolumeWithGenericTests(t *testing.T) {
+ DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
+ return NewTestableUnixVolume(t, false, false)
+ })
+}
- buf, err := v.Get(TEST_HASH)
- if err != nil {
- t.Error(err)
- }
- if bytes.Compare(buf, TEST_BLOCK) != 0 {
- t.Errorf("expected %s, got %s", string(TEST_BLOCK), string(buf))
- }
+// serialize = false; readonly = true
+func TestUnixVolumeWithGenericTestsReadOnly(t *testing.T) {
+ DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
+ return NewTestableUnixVolume(t, false, true)
+ })
+}
+
+// serialize = true; readonly = false
+func TestUnixVolumeWithGenericTestsSerialized(t *testing.T) {
+ DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
+ return NewTestableUnixVolume(t, true, false)
+ })
}
func TestGetNotFound(t *testing.T) {
- v := TempUnixVolume(t, false, false)
- defer _teardown(v)
- _store(t, v, TEST_HASH, TEST_BLOCK)
+ v := NewTestableUnixVolume(t, false, false)
+ defer v.Teardown()
+ v.Put(TestHash, TestBlock)
- buf, err := v.Get(TEST_HASH_2)
+ buf, err := v.Get(TestHash2)
switch {
case os.IsNotExist(err):
break
}
}
-func TestIndexTo(t *testing.T) {
- v := TempUnixVolume(t, false, false)
- defer _teardown(v)
-
- _store(t, v, TEST_HASH, TEST_BLOCK)
- _store(t, v, TEST_HASH_2, TEST_BLOCK_2)
- _store(t, v, TEST_HASH_3, TEST_BLOCK_3)
-
- buf := new(bytes.Buffer)
- v.IndexTo("", buf)
- index_rows := strings.Split(string(buf.Bytes()), "\n")
- sort.Strings(index_rows)
- sorted_index := strings.Join(index_rows, "\n")
- m, err := regexp.MatchString(
- `^\n`+TEST_HASH+`\+\d+ \d+\n`+
- TEST_HASH_3+`\+\d+ \d+\n`+
- TEST_HASH_2+`\+\d+ \d+$`,
- sorted_index)
- if err != nil {
- t.Error(err)
- } else if !m {
- t.Errorf("Got index %q for empty prefix", sorted_index)
- }
-
- for _, prefix := range []string{"f", "f15", "f15ac"} {
- buf = new(bytes.Buffer)
- v.IndexTo(prefix, buf)
- m, err := regexp.MatchString(`^`+TEST_HASH_2+`\+\d+ \d+\n$`, string(buf.Bytes()))
- if err != nil {
- t.Error(err)
- } else if !m {
- t.Errorf("Got index %q for prefix %q", string(buf.Bytes()), prefix)
- }
- }
-}
-
func TestPut(t *testing.T) {
- v := TempUnixVolume(t, false, false)
- defer _teardown(v)
+ v := NewTestableUnixVolume(t, false, false)
+ defer v.Teardown()
- err := v.Put(TEST_HASH, TEST_BLOCK)
+ err := v.Put(TestHash, TestBlock)
if err != nil {
t.Error(err)
}
- p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH)
+ p := fmt.Sprintf("%s/%s/%s", v.root, TestHash[:3], TestHash)
if buf, err := ioutil.ReadFile(p); err != nil {
t.Error(err)
- } else if bytes.Compare(buf, TEST_BLOCK) != 0 {
+ } else if bytes.Compare(buf, TestBlock) != 0 {
t.Errorf("Write should have stored %s, did store %s",
- string(TEST_BLOCK), string(buf))
+ string(TestBlock), string(buf))
}
}
func TestPutBadVolume(t *testing.T) {
- v := TempUnixVolume(t, false, false)
- defer _teardown(v)
+ v := NewTestableUnixVolume(t, false, false)
+ defer v.Teardown()
os.Chmod(v.root, 000)
- err := v.Put(TEST_HASH, TEST_BLOCK)
+ err := v.Put(TestHash, TestBlock)
if err == nil {
t.Error("Write should have failed")
}
}
func TestUnixVolumeReadonly(t *testing.T) {
- v := TempUnixVolume(t, false, false)
- defer _teardown(v)
-
- // First write something before marking readonly
- err := v.Put(TEST_HASH, TEST_BLOCK)
- if err != nil {
- t.Error("got err %v, expected nil", err)
- }
+ v := NewTestableUnixVolume(t, false, true)
+ defer v.Teardown()
- v.readonly = true
+ v.PutRaw(TestHash, TestBlock)
- _, err = v.Get(TEST_HASH)
+ _, err := v.Get(TestHash)
if err != nil {
- t.Error("got err %v, expected nil", err)
+ t.Errorf("got err %v, expected nil", err)
}
- err = v.Put(TEST_HASH, TEST_BLOCK)
+ err = v.Put(TestHash, TestBlock)
if err != MethodDisabledError {
- t.Error("got err %v, expected MethodDisabledError", err)
+ t.Errorf("got err %v, expected MethodDisabledError", err)
}
- err = v.Touch(TEST_HASH)
+ err = v.Touch(TestHash)
if err != MethodDisabledError {
- t.Error("got err %v, expected MethodDisabledError", err)
+ t.Errorf("got err %v, expected MethodDisabledError", err)
}
- err = v.Delete(TEST_HASH)
+ err = v.Delete(TestHash)
if err != MethodDisabledError {
- t.Error("got err %v, expected MethodDisabledError", err)
- }
-}
-
-// TestPutTouch
-// Test that when applying PUT to a block that already exists,
-// the block's modification time is updated.
-func TestPutTouch(t *testing.T) {
- v := TempUnixVolume(t, false, false)
- defer _teardown(v)
-
- if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
- t.Error(err)
- }
-
- // We'll verify { t0 < threshold < t1 }, where t0 is the
- // existing block's timestamp on disk before Put() and t1 is
- // its timestamp after Put().
- threshold := time.Now().Add(-time.Second)
-
- // Set the stored block's mtime far enough in the past that we
- // can see the difference between "timestamp didn't change"
- // and "timestamp granularity is too low".
- {
- oldtime := time.Now().Add(-20 * time.Second).Unix()
- if err := syscall.Utime(v.blockPath(TEST_HASH),
- &syscall.Utimbuf{oldtime, oldtime}); err != nil {
- t.Error(err)
- }
-
- // Make sure v.Mtime() agrees the above Utime really worked.
- if t0, err := v.Mtime(TEST_HASH); err != nil || t0.IsZero() || !t0.Before(threshold) {
- t.Errorf("Setting mtime failed: %v, %v", t0, err)
- }
- }
-
- // Write the same block again.
- if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
- t.Error(err)
- }
-
- // Verify threshold < t1
- t1, err := v.Mtime(TEST_HASH)
- if err != nil {
- t.Error(err)
- }
- if t1.Before(threshold) {
- t.Errorf("t1 %v must be >= threshold %v after v.Put ",
- t1, threshold)
- }
-}
-
-// Serialization tests: launch a bunch of concurrent
-//
-// TODO(twp): show that the underlying Read/Write operations executed
-// serially and not concurrently. The easiest way to do this is
-// probably to activate verbose or debug logging, capture log output
-// and examine it to confirm that Reads and Writes did not overlap.
-//
-// TODO(twp): a proper test of I/O serialization requires that a
-// second request start while the first one is still underway.
-// Guaranteeing that the test behaves this way requires some tricky
-// synchronization and mocking. For now we'll just launch a bunch of
-// requests simultaenously in goroutines and demonstrate that they
-// return accurate results.
-//
-func TestGetSerialized(t *testing.T) {
- // Create a volume with I/O serialization enabled.
- v := TempUnixVolume(t, true, false)
- defer _teardown(v)
-
- _store(t, v, TEST_HASH, TEST_BLOCK)
- _store(t, v, TEST_HASH_2, TEST_BLOCK_2)
- _store(t, v, TEST_HASH_3, TEST_BLOCK_3)
-
- sem := make(chan int)
- go func(sem chan int) {
- buf, err := v.Get(TEST_HASH)
- if err != nil {
- t.Errorf("err1: %v", err)
- }
- if bytes.Compare(buf, TEST_BLOCK) != 0 {
- t.Errorf("buf should be %s, is %s", string(TEST_BLOCK), string(buf))
- }
- sem <- 1
- }(sem)
-
- go func(sem chan int) {
- buf, err := v.Get(TEST_HASH_2)
- if err != nil {
- t.Errorf("err2: %v", err)
- }
- if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
- t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_2), string(buf))
- }
- sem <- 1
- }(sem)
-
- go func(sem chan int) {
- buf, err := v.Get(TEST_HASH_3)
- if err != nil {
- t.Errorf("err3: %v", err)
- }
- if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
- t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_3), string(buf))
- }
- sem <- 1
- }(sem)
-
- // Wait for all goroutines to finish
- for done := 0; done < 3; {
- done += <-sem
- }
-}
-
-func TestPutSerialized(t *testing.T) {
- // Create a volume with I/O serialization enabled.
- v := TempUnixVolume(t, true, false)
- defer _teardown(v)
-
- sem := make(chan int)
- go func(sem chan int) {
- err := v.Put(TEST_HASH, TEST_BLOCK)
- if err != nil {
- t.Errorf("err1: %v", err)
- }
- sem <- 1
- }(sem)
-
- go func(sem chan int) {
- err := v.Put(TEST_HASH_2, TEST_BLOCK_2)
- if err != nil {
- t.Errorf("err2: %v", err)
- }
- sem <- 1
- }(sem)
-
- go func(sem chan int) {
- err := v.Put(TEST_HASH_3, TEST_BLOCK_3)
- if err != nil {
- t.Errorf("err3: %v", err)
- }
- sem <- 1
- }(sem)
-
- // Wait for all goroutines to finish
- for done := 0; done < 3; {
- done += <-sem
- }
-
- // Double check that we actually wrote the blocks we expected to write.
- buf, err := v.Get(TEST_HASH)
- if err != nil {
- t.Errorf("Get #1: %v", err)
- }
- if bytes.Compare(buf, TEST_BLOCK) != 0 {
- t.Errorf("Get #1: expected %s, got %s", string(TEST_BLOCK), string(buf))
- }
-
- buf, err = v.Get(TEST_HASH_2)
- if err != nil {
- t.Errorf("Get #2: %v", err)
- }
- if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
- t.Errorf("Get #2: expected %s, got %s", string(TEST_BLOCK_2), string(buf))
- }
-
- buf, err = v.Get(TEST_HASH_3)
- if err != nil {
- t.Errorf("Get #3: %v", err)
- }
- if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
- t.Errorf("Get #3: expected %s, got %s", string(TEST_BLOCK_3), string(buf))
+ t.Errorf("got err %v, expected MethodDisabledError", err)
}
}
func TestIsFull(t *testing.T) {
- v := TempUnixVolume(t, false, false)
- defer _teardown(v)
+ v := NewTestableUnixVolume(t, false, false)
+ defer v.Teardown()
- full_path := v.root + "/full"
+ fullPath := v.root + "/full"
now := fmt.Sprintf("%d", time.Now().Unix())
- os.Symlink(now, full_path)
+ os.Symlink(now, fullPath)
if !v.IsFull() {
t.Errorf("%s: claims not to be full", v)
}
- os.Remove(full_path)
+ os.Remove(fullPath)
// Test with an expired /full link.
expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
- os.Symlink(expired, full_path)
+ os.Symlink(expired, fullPath)
if v.IsFull() {
t.Errorf("%s: should no longer be full", v)
}
}
func TestNodeStatus(t *testing.T) {
- v := TempUnixVolume(t, false, false)
- defer _teardown(v)
+ v := NewTestableUnixVolume(t, false, false)
+ defer v.Teardown()
// Get node status and make a basic sanity check.
volinfo := v.Status()
t.Errorf("uninitialized bytes_used in %v", volinfo)
}
}
+
+func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
+ v := NewTestableUnixVolume(t, false, false)
+ defer v.Teardown()
+
+ v.Put(TestHash, TestBlock)
+ mockErr := errors.New("Mock error")
+ err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
+ return mockErr
+ })
+ if err != mockErr {
+ t.Errorf("Got %v, expected %v", err, mockErr)
+ }
+}
+
+func TestUnixVolumeGetFuncFileError(t *testing.T) {
+ v := NewTestableUnixVolume(t, false, false)
+ defer v.Teardown()
+
+ funcCalled := false
+ err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
+ funcCalled = true
+ return nil
+ })
+ if err == nil {
+ t.Errorf("Expected error opening non-existent file")
+ }
+ if funcCalled {
+ t.Errorf("Worker func should not have been called")
+ }
+}
+
+func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
+ v := NewTestableUnixVolume(t, false, false)
+ defer v.Teardown()
+
+ v.Put(TestHash, TestBlock)
+
+ mtx := NewMockMutex()
+ v.locker = mtx
+
+ funcCalled := make(chan struct{})
+ go v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
+ funcCalled <- struct{}{}
+ return nil
+ })
+ select {
+ case mtx.AllowLock <- struct{}{}:
+ case <-funcCalled:
+ t.Fatal("Function was called before mutex was acquired")
+ case <-time.After(5 * time.Second):
+ t.Fatal("Timed out before mutex was acquired")
+ }
+ select {
+ case <-funcCalled:
+ case mtx.AllowUnlock <- struct{}{}:
+ t.Fatal("Mutex was released before function was called")
+ case <-time.After(5 * time.Second):
+ t.Fatal("Timed out waiting for funcCalled")
+ }
+ select {
+ case mtx.AllowUnlock <- struct{}{}:
+ case <-time.After(5 * time.Second):
+ t.Fatal("Timed out waiting for getFunc() to release mutex")
+ }
+}
+
+func TestUnixVolumeCompare(t *testing.T) {
+ v := NewTestableUnixVolume(t, false, false)
+ defer v.Teardown()
+
+ v.Put(TestHash, TestBlock)
+ err := v.Compare(TestHash, TestBlock)
+ if err != nil {
+ t.Errorf("Got err %q, expected nil", err)
+ }
+
+ err = v.Compare(TestHash, []byte("baddata"))
+ if err != CollisionError {
+ t.Errorf("Got err %q, expected %q", err, CollisionError)
+ }
+
+ v.Put(TestHash, []byte("baddata"))
+ err = v.Compare(TestHash, TestBlock)
+ if err != DiskHashError {
+ t.Errorf("Got err %q, expected %q", err, DiskHashError)
+ }
+
+ p := fmt.Sprintf("%s/%s/%s", v.root, TestHash[:3], TestHash)
+ os.Chmod(p, 000)
+ err = v.Compare(TestHash, TestBlock)
+ if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
+ t.Errorf("Got err %q, expected %q", err, "permission denied")
+ }
+}
+
+// TODO(twp): show that the underlying Read/Write operations executed
+// serially and not concurrently. The easiest way to do this is
+// probably to activate verbose or debug logging, capture log output
+// and examine it to confirm that Reads and Writes did not overlap.
+//
+// TODO(twp): a proper test of I/O serialization requires that a
+// second request start while the first one is still underway.
+// Guaranteeing that the test behaves this way requires some tricky
+// synchronization and mocking. For now we'll just launch a bunch of
+// requests simultaenously in goroutines and demonstrate that they
+// return accurate results.
import "container/list"
+// WorkQueue definition
type WorkQueue struct {
getStatus chan WorkQueueStatus
newlist chan *list.List
DoneItem chan<- struct{}
}
+// WorkQueueStatus reflects the queue status.
type WorkQueueStatus struct {
InProgress int
Queued int
import libcloud.compute.base as cloud_base
import libcloud.compute.providers as cloud_provider
import libcloud.compute.types as cloud_types
+from libcloud.common.exceptions import BaseHTTPError
from . import BaseComputeNodeDriver
from .. import arvados_node_fqdn, arvados_timestamp, ARVADOS_TIMEFMT
}
def sync_node(self, cloud_node, arvados_node):
- self.real.ex_create_tags(cloud_node,
- {'hostname': arvados_node_fqdn(arvados_node)})
+ try:
+ self.real.ex_create_tags(cloud_node,
+ {'hostname': arvados_node_fqdn(arvados_node)})
+ return True
+ except BaseHTTPError as b:
+ return False
def _init_image(self, urn):
return "image", self.get_image(urn)
'python-daemon',
],
dependency_links = [
- "https://github.com/curoverse/libcloud/archive/apache-libcloud-0.18.1.dev2.zip"
+ "https://github.com/curoverse/libcloud/archive/apache-libcloud-0.18.1.dev3.zip"
],
scripts=['bin/arvados-node-manager'],
test_suite='tests',
- tests_require=['mock>=1.0', "apache-libcloud==0.18.1.dev2"],
+ tests_require=['mock>=1.0', "apache-libcloud==0.18.1.dev3"],
zip_safe=False,
cmdclass={'egg_info': tagger},
)