- install/install-manual-prerequisites.html.textile.liquid
- install/install-sso.html.textile.liquid
- install/install-api-server.html.textile.liquid
+ - install/install-arv-git-httpd.html.textile.liquid
- install/install-workbench-app.html.textile.liquid
- install/install-shell-server.html.textile.liquid
- install/create-standard-objects.html.textile.liquid
- - install/install-arv-git-httpd.html.textile.liquid
- install/install-keepstore.html.textile.liquid
- install/install-keepproxy.html.textile.liquid
- install/install-crunch-dispatch.html.textile.liquid
bufs = newBufferPool(maxBuffers, BLOCKSIZE)
}
+// TearDownTest puts a default pool, newBufferPool(maxBuffers, BLOCKSIZE), back in the shared bufs variable after bufferpool's own tests.
+func (s *BufferPoolSuite) TearDownTest(c *C) {
+ bufs = newBufferPool(maxBuffers, BLOCKSIZE)
+}
+
func (s *BufferPoolSuite) TestBufferPoolBufSize(c *C) {
bufs := newBufferPool(2, 10)
b1 := bufs.Get(1)
}
type NodeStatus struct {
- Volumes []*VolumeStatus `json:"volumes"`
+ Volumes []*VolumeStatus `json:"volumes"`
BufferPool PoolStatus
+ PullQueue WorkQueueStatus
+ TrashQueue WorkQueueStatus
Memory runtime.MemStats
}
var st NodeStatus
var stLock sync.Mutex
+
func StatusHandler(resp http.ResponseWriter, req *http.Request) {
stLock.Lock()
- ReadNodeStatus(&st)
+ readNodeStatus(&st)
jstat, err := json.Marshal(&st)
stLock.Unlock()
if err == nil {
}
}
-// ReadNodeStatus populates the given NodeStatus struct with current
-// values.
-//
-func ReadNodeStatus(st *NodeStatus) {
+// readNodeStatus populates the given NodeStatus struct with current values.
+func readNodeStatus(st *NodeStatus) {
vols := KeepVM.AllReadable()
if cap(st.Volumes) < len(vols) {
st.Volumes = make([]*VolumeStatus, len(vols))
st.BufferPool.Alloc = bufs.Alloc()
st.BufferPool.Cap = bufs.Cap()
st.BufferPool.Len = bufs.Len()
+ st.PullQueue = getWorkQueueStatus(pullq)
+ st.TrashQueue = getWorkQueueStatus(trashq)
runtime.ReadMemStats(&st.Memory)
}
+// getWorkQueueStatus reports the status of q. A nil q yields the zero
+// WorkQueueStatus instead of a nil-pointer crash; that case is only
+// expected to arise in test suites.
+func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
+	if q != nil {
+		return q.Status()
+	}
+	// No queue has been set up (test environment): nothing to report.
+	return WorkQueueStatus{}
+}
+
// DeleteHandler processes DELETE requests.
//
// DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
PermissionSecret = []byte(known_key)
defer func() { PermissionSecret = nil }()
- if VerifySignature(known_locator+"+K@xyzzy"+known_sig_hint, known_token) != nil{
+ if VerifySignature(known_locator+"+K@xyzzy"+known_sig_hint, known_token) != nil {
t.Fatal("Verify cannot handle hint before permission signature")
}
for item := range nextItem {
pullRequest := item.(PullRequest)
err := PullItemAndProcess(item.(PullRequest), GenerateRandomApiToken(), keepClient)
+ pullq.DoneItem <- struct{}{}
if err == nil {
log.Printf("Pull %s success", pullRequest)
} else {
func performPullWorkerIntegrationTest(testData PullWorkIntegrationTestData, pullRequest PullRequest, t *testing.T) {
// Override PutContent to mock PutBlock functionality
- defer func(orig func([]byte, string)(error)) { PutContent = orig }(PutContent)
+ defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent)
PutContent = func(content []byte, locator string) (err error) {
if string(content) != testData.Content {
t.Errorf("PutContent invoked with unexpected data. Expected: %s; Found: %s", testData.Content, content)
}
// Override GetContent to mock keepclient Get functionality
- defer func(orig func(string, *keepclient.KeepClient)(io.ReadCloser, int64, string, error)) { GetContent = orig }(GetContent)
+ defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
+ GetContent = orig
+ }(GetContent)
GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
reader io.ReadCloser, contentLength int64, url string, err error) {
if testData.GetError != "" {
"io"
"net/http"
"testing"
+ "time"
)
type PullWorkerTestSuite struct{}
var _ = Suite(&PullWorkerTestSuite{})
var testPullLists map[string]string
-var processedPullLists map[string]string
var readContent string
var readError error
var putContent []byte
// This behavior is verified using these two maps in the
// "TestPullWorker_pull_list_with_two_items_latest_replacing_old"
testPullLists = make(map[string]string)
- processedPullLists = make(map[string]string)
}
// Since keepstore does not come into picture in tests,
}
func performTest(testData PullWorkerTestData, c *C) {
+ KeepVM = MakeTestVolumeManager(2)
+ defer KeepVM.Close()
+
RunTestPullWorker(c)
+ defer pullq.Close()
currentTestData = testData
testPullLists[testData.name] = testData.response_body
- // Override GetContent to mock keepclient Get functionality
- defer func(orig func(string, *keepclient.KeepClient)(io.ReadCloser, int64, string, error)) { GetContent = orig }(GetContent)
- GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
- reader io.ReadCloser, contentLength int64, url string, err error) {
+ processedPullLists := make(map[string]string)
+ // Override GetContent to mock keepclient Get functionality
+ defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
+ GetContent = orig
+ }(GetContent)
+ GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
+ c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
processedPullLists[testData.name] = testData.response_body
if testData.read_error {
err = errors.New("Error getting data")
}
// Override PutContent to mock PutBlock functionality
- defer func(orig func([]byte, string)(error)) { PutContent = orig }(PutContent)
+ defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent)
PutContent = func(content []byte, locator string) (err error) {
if testData.put_error {
err = errors.New("Error putting data")
}
}
+ c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
+ c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
+
response := IssueRequest(&testData.req)
c.Assert(response.Code, Equals, testData.response_code)
c.Assert(response.Body.String(), Equals, testData.response_body)
- expectWorkerChannelEmpty(c, pullq.NextItem)
-
- pullq.Close()
+ expectEqualWithin(c, time.Second, 0, func() interface{} {
+ st := pullq.Status()
+ return st.InProgress + st.Queued
+ })
if testData.name == "TestPullWorker_pull_list_with_two_items_latest_replacing_old" {
c.Assert(len(testPullLists), Equals, 2)
c.Assert(string(putContent), Equals, testData.read_content)
}
}
+
+ expectChannelEmpty(c, pullq.NextItem)
}
type ClosingBuffer struct {
func (cb *ClosingBuffer) Close() (err error) {
return
}
-
-func expectWorkerChannelEmpty(c *C, workerChannel <-chan interface{}) {
- select {
- case item := <-workerChannel:
- c.Fatalf("Received value (%v) from channel that was expected to be empty", item)
- default:
- }
-}
-
-func expectWorkerChannelNotEmpty(c *C, workerChannel <-chan interface{}) {
- select {
- case item := <-workerChannel:
- c.Fatalf("Received value (%v) from channel that was expected to be empty", item)
- default:
- }
-}
--- /dev/null
+package main
+
+import (
+ "encoding/json"
+)
+
+// We don't have isolated unit tests for /status.json yet, but we do
+// check (e.g., in pull_worker_test.go) that /status.json reports
+// specific statistics correctly at the appropriate times.
+
+// getStatusItem("foo","bar","baz") retrieves /status.json, decodes
+// the response body into resp, and returns resp["foo"]["bar"]["baz"].
+//
+// A body that is not valid JSON, or a key path that runs through
+// something other than a JSON object, panics with a message naming
+// the problem. A missing final key yields nil.
+func getStatusItem(keys ...string) interface{} {
+	resp := IssueRequest(&RequestTester{"/status.json", "", "GET", nil})
+	var s interface{}
+	if err := json.NewDecoder(resp.Body).Decode(&s); err != nil {
+		panic("getStatusItem: cannot decode /status.json response: " + err.Error())
+	}
+	for _, k := range keys {
+		obj, ok := s.(map[string]interface{})
+		if !ok {
+			panic("getStatusItem: cannot look up key " + k + ": parent is not a JSON object")
+		}
+		s = obj[k]
+	}
+	return s
+}
for item := range trashq.NextItem {
trashRequest := item.(TrashRequest)
TrashItem(trashRequest)
+ trashq.DoneItem <- struct{}{}
}
}
}
go RunTrashWorker(trashq)
+ // Install gate so all local operations block until we say go
+ gate := make(chan struct{})
+ for _, v := range vols {
+ v.(*MockVolume).Gate = gate
+ }
+
+ assertStatusItem := func(k string, expect float64) {
+ if v := getStatusItem("TrashQueue", k); v != expect {
+ t.Errorf("Got %s %v, expected %v", k, v, expect)
+ }
+ }
+
+ assertStatusItem("InProgress", 0)
+ assertStatusItem("Queued", 0)
+
+ listLen := trashList.Len()
trashq.ReplaceQueue(trashList)
- time.Sleep(10 * time.Millisecond) // give a moment to finish processing the list
+
+ // Wait for worker to take request(s)
+ expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.Status().InProgress })
+
+ // Ensure status.json also reports work is happening
+ assertStatusItem("InProgress", float64(1))
+ assertStatusItem("Queued", float64(listLen-1))
+
+ // Let worker proceed
+ close(gate)
+
+ // Wait for worker to finish
+ expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
// Verify Locator1 to be un/deleted as expected
data, _ := GetBlock(testData.Locator1, false)
// Readonly volumes return an error for Put, Delete, and
// Touch.
Readonly bool
- called map[string]int
- mutex sync.Mutex
+ // Gate is a "starting gate", allowing test cases to pause
+ // volume operations long enough to inspect state. Every
+ // operation (except Status) starts by receiving from
+ // Gate. Sending one value unblocks one operation; closing the
+ // channel unblocks all operations. By default, Gate is a
+ // closed channel, so all operations proceed without
+ // blocking. See trash_worker_test.go for an example.
+ Gate chan struct{}
+ called map[string]int
+ mutex sync.Mutex
}
// CreateMockVolume returns a non-Bad, non-Readonly, Touchable mock
// volume.
func CreateMockVolume() *MockVolume {
+ gate := make(chan struct{})
+ close(gate) // receiving from a closed Gate never blocks, so operations run freely unless a test installs its own Gate
return &MockVolume{
Store: make(map[string][]byte),
Timestamps: make(map[string]time.Time),
Touchable: true,
Readonly: false,
called: map[string]int{},
+ Gate: gate,
}
}
func (v *MockVolume) Get(loc string) ([]byte, error) {
v.gotCall("Get")
+ <-v.Gate
if v.Bad {
return nil, errors.New("Bad volume")
} else if block, ok := v.Store[loc]; ok {
func (v *MockVolume) Put(loc string, block []byte) error {
v.gotCall("Put")
+ <-v.Gate
if v.Bad {
return errors.New("Bad volume")
}
func (v *MockVolume) Touch(loc string) error {
v.gotCall("Touch")
+ <-v.Gate
if v.Readonly {
return MethodDisabledError
}
func (v *MockVolume) Mtime(loc string) (time.Time, error) {
v.gotCall("Mtime")
+ <-v.Gate
var mtime time.Time
var err error
if v.Bad {
func (v *MockVolume) IndexTo(prefix string, w io.Writer) error {
v.gotCall("IndexTo")
+ <-v.Gate
for loc, block := range v.Store {
if !IsValidLocator(loc) || !strings.HasPrefix(loc, prefix) {
continue
func (v *MockVolume) Delete(loc string) error {
v.gotCall("Delete")
+ <-v.Gate
if v.Readonly {
return MethodDisabledError
}
import "container/list"
type WorkQueue struct {
- newlist chan *list.List
- NextItem chan interface{}
+ getStatus chan WorkQueueStatus
+ newlist chan *list.List
+ // Workers get work items by reading from this channel.
+ NextItem <-chan interface{}
+ // Each worker must send struct{}{} to DoneItem exactly once
+ // for each work item received from NextItem, when it stops
+ // working on that item (regardless of whether the work was
+ // successful).
+ DoneItem chan<- struct{}
}
-// NewWorkQueue returns a new worklist, and launches a listener
-// goroutine that waits for work and farms it out to workers.
+type WorkQueueStatus struct {
+ InProgress int // items handed out on NextItem whose DoneItem report has not arrived yet
+ Queued int // items still waiting in the current list
+}
+
+// NewWorkQueue returns a new empty WorkQueue.
//
func NewWorkQueue() *WorkQueue {
+ nextItem := make(chan interface{})
+ reportDone := make(chan struct{})
+ newList := make(chan *list.List)
b := WorkQueue{
- newlist: make(chan *list.List),
- NextItem: make(chan interface{}),
+ getStatus: make(chan WorkQueueStatus),
+ newlist: newList,
+ NextItem: nextItem,
+ DoneItem: reportDone,
}
- go b.listen()
+ go func() {
+ // Read new work lists from the newlist channel.
+ // Reply to "status" and "get next item" queries by
+ // sending to the getStatus and nextItem channels
+ // respectively. Return when the newlist channel
+ // closes.
+
+ todo := &list.List{}
+ status := WorkQueueStatus{}
+
+ // When we're done, close the output channel; workers will
+ // shut down next time they ask for new work.
+ defer close(nextItem)
+ defer close(b.getStatus)
+
+ // nextChan and nextVal are both nil when we have
+ // nothing to send; otherwise they are, respectively,
+ // the nextItem channel and the next work item to send
+ // to it.
+ var nextChan chan interface{}
+ var nextVal interface{}
+
+ for newList != nil || status.InProgress > 0 {
+ select {
+ case p, ok := <-newList:
+ if !ok {
+ // Closed, stop receiving
+ newList = nil
+ }
+ todo = p
+ if todo == nil {
+ todo = &list.List{}
+ }
+ status.Queued = todo.Len()
+ if status.Queued == 0 {
+ // Stop sending work
+ nextChan = nil
+ nextVal = nil
+ } else {
+ nextChan = nextItem
+ nextVal = todo.Front().Value
+ }
+ case nextChan <- nextVal:
+ todo.Remove(todo.Front())
+ status.InProgress++
+ status.Queued--
+ if status.Queued == 0 {
+ // Stop sending work
+ nextChan = nil
+ nextVal = nil
+ } else {
+ nextVal = todo.Front().Value
+ }
+ case <-reportDone:
+ status.InProgress--
+ case b.getStatus <- status:
+ }
+ }
+ }()
return &b
}
-// ReplaceQueue sends a new list of pull requests to the manager goroutine.
-// The manager will discard any outstanding pull list and begin
-// working on the new list.
+// ReplaceQueue abandons any work items left in the existing queue,
+// and starts giving workers items from the given list. After giving
+// it to ReplaceQueue, the caller must not read or write the given
+// list.
//
func (b *WorkQueue) ReplaceQueue(list *list.List) {
b.newlist <- list
}
// Close shuts down the manager and terminates the goroutine, which
-// completes any pull request in progress and abandons any pending
-// requests.
+// abandons any pending requests, but allows any pull request already
+// in progress to continue.
+//
+// After Close, Status will return correct values, NextItem will be
+// closed, and ReplaceQueue will panic.
//
func (b *WorkQueue) Close() {
close(b.newlist)
}
-// listen is run in a goroutine. It reads new pull lists from its
-// input queue until the queue is closed.
-// listen takes ownership of the list that is passed to it.
-//
-// Note that the routine does not ever need to access the list
-// itself once the current_item has been initialized, so we do
-// not bother to keep a pointer to the list. Because it is a
-// doubly linked list, holding on to the current item will keep
-// it from garbage collection.
+// Status returns an up-to-date WorkQueueStatus reflecting the current
+// queue status.
//
-func (b *WorkQueue) listen() {
- var current_item *list.Element
-
- // When we're done, close the output channel to shut down any
- // workers.
- defer close(b.NextItem)
-
- for {
- // If the current list is empty, wait for a new list before
- // even checking if workers are ready.
- if current_item == nil {
- if p, ok := <-b.newlist; ok {
- current_item = p.Front()
- } else {
- // The channel was closed; shut down.
- return
- }
- }
- select {
- case p, ok := <-b.newlist:
- if ok {
- current_item = p.Front()
- } else {
- // The input channel is closed; time to shut down
- return
- }
- case b.NextItem <- current_item.Value:
- current_item = current_item.Next()
- }
- }
+func (b *WorkQueue) Status() WorkQueueStatus {
+ // Once the queue's goroutine has closed getStatus, this
+ // receive yields the zero WorkQueueStatus, which is an
+ // accurate description of a finished queue.
+ return <-b.getStatus
}
import (
"container/list"
+ "runtime"
"testing"
+ "time"
)
+type fatalfer interface {
+ Fatalf(string, ...interface{}) // the only method the helpers need; callers pass both a *testing.T and a suite-style *C
+}
+
func makeTestWorkList(ary []int) *list.List {
l := list.New()
for _, n := range ary {
return l
}
-func expectChannelEmpty(t *testing.T, c <-chan interface{}) {
+func expectChannelEmpty(t fatalfer, c <-chan interface{}) {
select {
- case item := <-c:
- t.Fatalf("Received value (%v) from channel that we expected to be empty", item)
+ case item, ok := <-c:
+ if ok {
+ t.Fatalf("Received value (%+v) from channel that we expected to be empty", item)
+ }
default:
- // no-op
}
}
-func expectChannelNotEmpty(t *testing.T, c <-chan interface{}) {
- if item, ok := <-c; !ok {
- t.Fatal("expected data on a closed channel")
- } else if item == nil {
- t.Fatal("expected data on an empty channel")
+func expectChannelNotEmpty(t fatalfer, c <-chan interface{}) interface{} {
+ select {
+ case item, ok := <-c:
+ if !ok {
+ t.Fatalf("expected data on a closed channel")
+ }
+ return item
+ case <-time.After(time.Second):
+ t.Fatalf("expected data on an empty channel")
+ return nil
}
}
-func expectChannelClosed(t *testing.T, c <-chan interface{}) {
- received, ok := <-c
- if ok {
- t.Fatalf("Expected channel to be closed, but received %v instead", received)
+func expectChannelClosedWithin(t fatalfer, timeout time.Duration, c <-chan interface{}) {
+ select {
+ case received, ok := <-c:
+ if ok {
+ t.Fatalf("Expected channel to be closed, but received %+v instead", received)
+ }
+ case <-time.After(timeout):
+ t.Fatalf("Expected channel to be closed, but it is still open after %v", timeout)
}
}
-func expectFromChannel(t *testing.T, c <-chan interface{}, expected []int) {
+func doWorkItems(t fatalfer, q *WorkQueue, expected []int) {
for i := range expected {
- actual, ok := <-c
- t.Logf("received %v", actual)
+ actual, ok := <-q.NextItem
if !ok {
- t.Fatalf("Expected %v but channel was closed after receiving the first %d elements correctly.", expected, i)
- } else if actual.(int) != expected[i] {
- t.Fatalf("Expected %v but received '%v' after receiving the first %d elements correctly.", expected[i], actual, i)
+ t.Fatalf("Expected %+v but channel was closed after receiving %+v as expected.", expected, expected[:i])
+ }
+ q.DoneItem <- struct{}{}
+ if actual.(int) != expected[i] {
+ t.Fatalf("Expected %+v but received %+v after receiving %+v as expected.", expected[i], actual, expected[:i])
+ }
+ }
+}
+
+// expectEqualWithin polls f about once per millisecond until it
+// returns a value equal to expect. If that has not happened when
+// timeout elapses, it fails via t.Fatalf, reporting the latest value
+// of f and the caller's file and line.
+func expectEqualWithin(t fatalfer, timeout time.Duration, expect interface{}, f func() interface{}) {
+	ok := make(chan struct{})
+	// giveup is closed to stop the poller. A channel is used instead
+	// of a shared bool so that the poller and this goroutine do not
+	// race on it.
+	giveup := make(chan struct{})
+	go func() {
+		for f() != expect {
+			select {
+			case <-giveup:
+				return
+			case <-time.After(time.Millisecond):
+			}
+		}
+		close(ok)
+	}()
+	select {
+	case <-ok:
+	case <-time.After(timeout):
+		// Stop the poller before Fatalf, which may not return.
+		close(giveup)
+		_, file, line, _ := runtime.Caller(1)
+		t.Fatalf("Still getting %+v, timed out waiting for %+v\n%s:%d", f(), expect, file, line)
+	}
+}
+
+// expectQueued fails via t.Fatalf unless b currently reports exactly
+// expectQueued items waiting in its queue.
+func expectQueued(t fatalfer, b *WorkQueue, expectQueued int) {
+	got := b.Status().Queued
+	if got == expectQueued {
+		return
+	}
+	t.Fatalf("Got Queued==%d, expected %d", got, expectQueued)
+}
+
+func TestWorkQueueDoneness(t *testing.T) {
+ b := NewWorkQueue()
+ defer b.Close()
+ b.ReplaceQueue(makeTestWorkList([]int{1, 2, 3}))
+ expectQueued(t, b, 3)
+ gate := make(chan struct{})
+ go func() {
+ <-gate // the worker takes nothing until the first go-ahead
+ for _ = range b.NextItem {
+ <-gate // hold each item in progress until the next go-ahead (or until gate is closed)
+ time.Sleep(time.Millisecond)
+ b.DoneItem <- struct{}{}
}
+ }()
+ expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
+ b.ReplaceQueue(makeTestWorkList([]int{4, 5, 6})) // replaces the untouched 1, 2, 3
+ for i := 1; i <= 3; i++ {
+ gate <- struct{}{} // releases the item in progress (if any); the worker then takes the next one
+ expectEqualWithin(t, time.Second, 3-i, func() interface{} { return b.Status().Queued })
+ expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
}
+ close(gate) // let the last item finish
+ expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
+ expectChannelEmpty(t, b.NextItem)
}
// Create a WorkQueue, generate a list for it, and instantiate a worker.
var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
b := NewWorkQueue()
+ expectQueued(t, b, 0)
+
b.ReplaceQueue(makeTestWorkList(input))
+ expectQueued(t, b, len(input))
- expectFromChannel(t, b.NextItem, input)
+ doWorkItems(t, b, input)
expectChannelEmpty(t, b.NextItem)
b.Close()
}
var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
b := NewWorkQueue()
+ defer b.Close()
// First, demonstrate that nothing is available on the NextItem
// channel.
//
done := make(chan int)
go func() {
- expectFromChannel(t, b.NextItem, input)
- b.Close()
+ doWorkItems(t, b, input)
done <- 1
}()
// finish.
b.ReplaceQueue(makeTestWorkList(input))
<-done
+ expectQueued(t, b, 0)
+}
- expectChannelClosed(t, b.NextItem)
+// TestWorkQueueClose: after Close(), the in-progress item is still counted until it is reported done; then NextItem closes and Status reports zero.
+func TestWorkQueueClose(t *testing.T) {
+ b := NewWorkQueue()
+ input := []int{1, 2, 3, 4, 5, 6, 7, 8}
+ mark := make(chan struct{})
+ go func() {
+ <-b.NextItem
+ mark <- struct{}{}
+ <-mark
+ b.DoneItem <- struct{}{}
+ }()
+ b.ReplaceQueue(makeTestWorkList(input))
+ // Wait for worker to take item 1
+ <-mark
+ b.Close()
+ expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
+ // Tell worker to report done
+ mark <- struct{}{}
+ expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
+ expectChannelClosedWithin(t, time.Second, b.NextItem)
}
// Show that a reader may block when the manager's list is exhausted,
)
b := NewWorkQueue()
+ defer b.Close()
sendmore := make(chan int)
done := make(chan int)
go func() {
- expectFromChannel(t, b.NextItem, inputBeforeBlock)
+ doWorkItems(t, b, inputBeforeBlock)
// Confirm that the channel is empty, so a subsequent read
// on it will block.
// Signal that we're ready for more input.
sendmore <- 1
- expectFromChannel(t, b.NextItem, inputAfterBlock)
- b.Close()
+ doWorkItems(t, b, inputAfterBlock)
done <- 1
}()
// Read just the first five elements from the work list.
// Confirm that the channel is not empty.
- expectFromChannel(t, b.NextItem, firstInput[0:5])
+ doWorkItems(t, b, firstInput[0:5])
expectChannelNotEmpty(t, b.NextItem)
// Replace the work list and read five more elements.
// The old list should have been discarded and all new
// elements come from the new list.
b.ReplaceQueue(makeTestWorkList(replaceInput))
- expectFromChannel(t, b.NextItem, replaceInput[0:5])
+ doWorkItems(t, b, replaceInput[0:5])
b.Close()
}