From 5562d6d556a942b66ea392c1e9bc803f9b9733e7 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Mon, 10 Aug 2015 11:56:48 -0400 Subject: [PATCH] 6260: Expose queue sizes in /status.json. Fix sleep/race in trash_worker_test. --- services/keepstore/handlers.go | 31 +++++++++++++++++++---- services/keepstore/pull_worker_test.go | 8 ++++++ services/keepstore/status_test.go | 15 +++++++++++ services/keepstore/trash_worker_test.go | 33 ++++++++++++++++++++++++- services/keepstore/volume_test.go | 18 ++++++++++++-- 5 files changed, 97 insertions(+), 8 deletions(-) create mode 100644 services/keepstore/status_test.go diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go index c4ecfb4c88..a6665f6164 100644 --- a/services/keepstore/handlers.go +++ b/services/keepstore/handlers.go @@ -192,9 +192,17 @@ type PoolStatus struct { Len int `json:"BuffersInUse"` } +type WorkQueueStatus struct { + InProgress int + Outstanding int + Queued int +} + type NodeStatus struct { Volumes []*VolumeStatus `json:"volumes"` BufferPool PoolStatus + PullQueue WorkQueueStatus + TrashQueue WorkQueueStatus Memory runtime.MemStats } @@ -203,7 +211,7 @@ var stLock sync.Mutex func StatusHandler(resp http.ResponseWriter, req *http.Request) { stLock.Lock() - ReadNodeStatus(&st) + readNodeStatus(&st) jstat, err := json.Marshal(&st) stLock.Unlock() if err == nil { @@ -215,10 +223,8 @@ func StatusHandler(resp http.ResponseWriter, req *http.Request) { } } -// ReadNodeStatus populates the given NodeStatus struct with current -// values. -// -func ReadNodeStatus(st *NodeStatus) { +// populate the given NodeStatus struct with current values. +func readNodeStatus(st *NodeStatus) { vols := KeepVM.AllReadable() if cap(st.Volumes) < len(vols) { st.Volumes = make([]*VolumeStatus, len(vols)) @@ -232,9 +238,24 @@ func ReadNodeStatus(st *NodeStatus) { st.BufferPool.Alloc = bufs.Alloc() st.BufferPool.Cap = bufs.Cap() st.BufferPool.Len = bufs.Len() + readWorkQueueStatus(&st.PullQueue, pullq) + readWorkQueueStatus(&st.TrashQueue, trashq) runtime.ReadMemStats(&st.Memory) } +// Populate a WorkQueueStatus. This is not atomic, so race conditions +// can cause InProgress + Queued != Outstanding. +func readWorkQueueStatus(st *WorkQueueStatus, q *WorkQueue) { + if q == nil { + // This should only happen during tests. + *st = WorkQueueStatus{} + return + } + st.InProgress = q.CountInProgress() + st.Outstanding = q.CountOutstanding() + st.Queued = q.CountQueued() +} + // DeleteHandler processes DELETE requests. // // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go index 0d4f9be75b..e8d390ab2b 100644 --- a/services/keepstore/pull_worker_test.go +++ b/services/keepstore/pull_worker_test.go @@ -236,6 +236,9 @@ func (s *PullWorkerTestSuite) TestPullWorker_invalid_data_manager_token(c *C) { } func performTest(testData PullWorkerTestData, c *C) { + KeepVM = MakeTestVolumeManager(2) + defer KeepVM.Close() + RunTestPullWorker(c) defer pullq.Close() @@ -249,6 +252,7 @@ func performTest(testData PullWorkerTestData, c *C) { GetContent = orig }(GetContent) GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) { + c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1)) processedPullLists[testData.name] = testData.response_body if testData.read_error { err = errors.New("Error getting data") @@ -276,6 +280,10 @@ func performTest(testData PullWorkerTestData, c *C) { } } + c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0)) + c.Assert(getStatusItem("PullQueue", "Outstanding"), Equals, float64(0)) + c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0)) + response := IssueRequest(&testData.req) c.Assert(response.Code, Equals, testData.response_code) c.Assert(response.Body.String(), Equals, testData.response_body) diff --git a/services/keepstore/status_test.go b/services/keepstore/status_test.go new file mode 100644 index 0000000000..134b016625 --- /dev/null +++ b/services/keepstore/status_test.go @@ -0,0 +1,15 @@ +package main + +import ( + "encoding/json" +) + +func getStatusItem(keys ...string) interface{} { + resp := IssueRequest(&RequestTester{"/status.json", "", "GET", nil}) + var s interface{} + json.NewDecoder(resp.Body).Decode(&s) + for _, k := range keys { + s = s.(map[string]interface{})[k] + } + return s +} diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go index 8268191b08..433eef5786 100644 --- a/services/keepstore/trash_worker_test.go +++ b/services/keepstore/trash_worker_test.go @@ -258,8 +258,39 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) { } go RunTrashWorker(trashq) + // Install gate so all local operations block until we say go + gate := make(chan struct{}) + for _, v := range vols { + v.(*MockVolume).Gate = gate + } + + assertStatusItem := func(k string, expect float64) { + if v := getStatusItem("TrashQueue", k); v != expect { + t.Errorf("Got %s %v, expected %v", k, v, expect) + } + } + + assertStatusItem("InProgress", 0) + assertStatusItem("Outstanding", 0) + assertStatusItem("Queued", 0) + + listLen := trashList.Len() trashq.ReplaceQueue(trashList) - time.Sleep(10 * time.Millisecond) // give a moment to finish processing the list + + // Wait for worker to take request(s) + expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.CountOutstanding() }) + expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.CountInProgress() }) + + // Ensure status.json also reports work is happening + assertStatusItem("InProgress", float64(1)) + assertStatusItem("Outstanding", float64(listLen)) + assertStatusItem("Queued", float64(listLen-1)) + + // Let worker proceed + close(gate) + + // Wait for worker to finish + expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.CountOutstanding() }) // Verify Locator1 to be un/deleted as expected data, _ := GetBlock(testData.Locator1, false) diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go index 261501992f..d660017aaa 100644 --- a/services/keepstore/volume_test.go +++ b/services/keepstore/volume_test.go @@ -22,13 +22,20 @@ type MockVolume struct { // Readonly volumes return an error for Put, Delete, and // Touch. Readonly bool - called map[string]int - mutex sync.Mutex + // Every operation (except Status) starts by receiving from + // Gate. Send one value to unblock one operation; close the + // channel to unblock all. By default, it is a closed channel, + // so all operations proceed without blocking. + Gate chan struct{} + called map[string]int + mutex sync.Mutex } // CreateMockVolume returns a non-Bad, non-Readonly, Touchable mock // volume. func CreateMockVolume() *MockVolume { + gate := make(chan struct{}) + close(gate) return &MockVolume{ Store: make(map[string][]byte), Timestamps: make(map[string]time.Time), @@ -36,6 +43,7 @@ func CreateMockVolume() *MockVolume { Touchable: true, Readonly: false, called: map[string]int{}, + Gate: gate, } } @@ -62,6 +70,7 @@ func (v *MockVolume) gotCall(method string) { func (v *MockVolume) Get(loc string) ([]byte, error) { v.gotCall("Get") + <-v.Gate if v.Bad { return nil, errors.New("Bad volume") } else if block, ok := v.Store[loc]; ok { @@ -74,6 +83,7 @@ func (v *MockVolume) Get(loc string) ([]byte, error) { func (v *MockVolume) Put(loc string, block []byte) error { v.gotCall("Put") + <-v.Gate if v.Bad { return errors.New("Bad volume") } @@ -86,6 +96,7 @@ func (v *MockVolume) Put(loc string, block []byte) error { func (v *MockVolume) Touch(loc string) error { v.gotCall("Touch") + <-v.Gate if v.Readonly { return MethodDisabledError } @@ -98,6 +109,7 @@ func (v *MockVolume) Touch(loc string) error { func (v *MockVolume) Mtime(loc string) (time.Time, error) { v.gotCall("Mtime") + <-v.Gate var mtime time.Time var err error if v.Bad { @@ -112,6 +124,7 @@ func (v *MockVolume) Mtime(loc string) (time.Time, error) { func (v *MockVolume) IndexTo(prefix string, w io.Writer) error { v.gotCall("IndexTo") + <-v.Gate for loc, block := range v.Store { if !IsValidLocator(loc) || !strings.HasPrefix(loc, prefix) { continue @@ -127,6 +140,7 @@ func (v *MockVolume) IndexTo(prefix string, w io.Writer) error { func (v *MockVolume) Delete(loc string) error { v.gotCall("Delete") + <-v.Gate if v.Readonly { return MethodDisabledError } -- 2.30.2