Len int `json:"BuffersInUse"`
}
+type WorkQueueStatus struct {
+ InProgress int
+ Outstanding int
+ Queued int
+}
+
type NodeStatus struct {
Volumes []*VolumeStatus `json:"volumes"`
BufferPool PoolStatus
+ PullQueue WorkQueueStatus
+ TrashQueue WorkQueueStatus
Memory runtime.MemStats
}
func StatusHandler(resp http.ResponseWriter, req *http.Request) {
stLock.Lock()
- ReadNodeStatus(&st)
+ readNodeStatus(&st)
jstat, err := json.Marshal(&st)
stLock.Unlock()
if err == nil {
}
}
-// ReadNodeStatus populates the given NodeStatus struct with current
-// values.
-//
-func ReadNodeStatus(st *NodeStatus) {
+// populate the given NodeStatus struct with current values.
+func readNodeStatus(st *NodeStatus) {
vols := KeepVM.AllReadable()
if cap(st.Volumes) < len(vols) {
st.Volumes = make([]*VolumeStatus, len(vols))
st.BufferPool.Alloc = bufs.Alloc()
st.BufferPool.Cap = bufs.Cap()
st.BufferPool.Len = bufs.Len()
+ readWorkQueueStatus(&st.PullQueue, pullq)
+ readWorkQueueStatus(&st.TrashQueue, trashq)
runtime.ReadMemStats(&st.Memory)
}
+// Populate a WorkQueueStatus. This is not atomic, so race conditions
+// can cause InProgress + Queued != Outstanding.
+func readWorkQueueStatus(st *WorkQueueStatus, q *WorkQueue) {
+ if q == nil {
+ // This should only happen during tests.
+ *st = WorkQueueStatus{}
+ return
+ }
+ st.InProgress = q.CountInProgress()
+ st.Outstanding = q.CountOutstanding()
+ st.Queued = q.CountQueued()
+}
+
// DeleteHandler processes DELETE requests.
//
// DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
}
func performTest(testData PullWorkerTestData, c *C) {
+ KeepVM = MakeTestVolumeManager(2)
+ defer KeepVM.Close()
+
RunTestPullWorker(c)
defer pullq.Close()
GetContent = orig
}(GetContent)
GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
+ c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
processedPullLists[testData.name] = testData.response_body
if testData.read_error {
err = errors.New("Error getting data")
}
}
+ c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
+ c.Assert(getStatusItem("PullQueue", "Outstanding"), Equals, float64(0))
+ c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
+
response := IssueRequest(&testData.req)
c.Assert(response.Code, Equals, testData.response_code)
c.Assert(response.Body.String(), Equals, testData.response_body)
--- /dev/null
+package main
+
+import (
+ "encoding/json"
+)
+
+func getStatusItem(keys ...string) interface{} {
+ resp := IssueRequest(&RequestTester{"/status.json", "", "GET", nil})
+ var s interface{}
+ json.NewDecoder(resp.Body).Decode(&s)
+ for _, k := range keys {
+ s = s.(map[string]interface{})[k]
+ }
+ return s
+}
}
go RunTrashWorker(trashq)
+ // Install gate so all local operations block until we say go
+ gate := make(chan struct{})
+ for _, v := range vols {
+ v.(*MockVolume).Gate = gate
+ }
+
+ assertStatusItem := func(k string, expect float64) {
+ if v := getStatusItem("TrashQueue", k); v != expect {
+ t.Errorf("Got %s %v, expected %v", k, v, expect)
+ }
+ }
+
+ assertStatusItem("InProgress", 0)
+ assertStatusItem("Outstanding", 0)
+ assertStatusItem("Queued", 0)
+
+ listLen := trashList.Len()
trashq.ReplaceQueue(trashList)
- time.Sleep(10 * time.Millisecond) // give a moment to finish processing the list
+
+ // Wait for worker to take request(s)
+ expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.CountOutstanding() })
+ expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.CountInProgress() })
+
+ // Ensure status.json also reports work is happening
+ assertStatusItem("InProgress", float64(1))
+ assertStatusItem("Outstanding", float64(listLen))
+ assertStatusItem("Queued", float64(listLen-1))
+
+ // Let worker proceed
+ close(gate)
+
+ // Wait for worker to finish
+ expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.CountOutstanding() })
// Verify Locator1 to be un/deleted as expected
data, _ := GetBlock(testData.Locator1, false)
// Readonly volumes return an error for Put, Delete, and
// Touch.
Readonly bool
- called map[string]int
- mutex sync.Mutex
+ // Every operation (except Status) starts by receiving from
+ // Gate. Send one value to unblock one operation; close the
+ // channel to unblock all. By default, it is a closed channel,
+ // so all operations proceed without blocking.
+ Gate chan struct{}
+ called map[string]int
+ mutex sync.Mutex
}
// CreateMockVolume returns a non-Bad, non-Readonly, Touchable mock
// volume.
func CreateMockVolume() *MockVolume {
+ gate := make(chan struct{})
+ close(gate)
return &MockVolume{
Store: make(map[string][]byte),
Timestamps: make(map[string]time.Time),
Touchable: true,
Readonly: false,
called: map[string]int{},
+ Gate: gate,
}
}
func (v *MockVolume) Get(loc string) ([]byte, error) {
v.gotCall("Get")
+ <-v.Gate
if v.Bad {
return nil, errors.New("Bad volume")
} else if block, ok := v.Store[loc]; ok {
func (v *MockVolume) Put(loc string, block []byte) error {
v.gotCall("Put")
+ <-v.Gate
if v.Bad {
return errors.New("Bad volume")
}
func (v *MockVolume) Touch(loc string) error {
v.gotCall("Touch")
+ <-v.Gate
if v.Readonly {
return MethodDisabledError
}
func (v *MockVolume) Mtime(loc string) (time.Time, error) {
v.gotCall("Mtime")
+ <-v.Gate
var mtime time.Time
var err error
if v.Bad {
func (v *MockVolume) IndexTo(prefix string, w io.Writer) error {
v.gotCall("IndexTo")
+ <-v.Gate
for loc, block := range v.Store {
if !IsValidLocator(loc) || !strings.HasPrefix(loc, prefix) {
continue
func (v *MockVolume) Delete(loc string) error {
v.gotCall("Delete")
+ <-v.Gate
if v.Readonly {
return MethodDisabledError
}