3705: rename BlockWorkList -> WorkQueue
authorTim Pierce <twp@curoverse.com>
Fri, 19 Sep 2014 20:13:28 +0000 (16:13 -0400)
committerTim Pierce <twp@curoverse.com>
Fri, 19 Sep 2014 20:13:28 +0000 (16:13 -0400)
Per discussion at
https://github.com/curoverse/arvados/pull/8#discussion_r17637500

refs #3705.

services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/keepstore.go
services/keepstore/work_queue.go [moved from services/keepstore/block_work_list.go with 80% similarity]
services/keepstore/work_queue_test.go [moved from services/keepstore/block_work_list_test.go with 83% similarity]

index deb1c3dd110d146d50efa1e7bd5fc3e8711c5281..0cfa1f30ddbd1e74ab7f5a25f931bac3905dadd3 100644 (file)
@@ -591,7 +591,7 @@ func TestDeleteHandler(t *testing.T) {
 //
 // TODO(twp): test concurrency: launch 100 goroutines to update the
 // pull list simultaneously.  Make sure that none of them return 400
-// Bad Request and that pullmgr.GetList() returns a valid list.
+// Bad Request and that pullq.GetList() returns a valid list.
 //
 func TestPullHandler(t *testing.T) {
        defer teardown()
@@ -665,7 +665,7 @@ func TestPullHandler(t *testing.T) {
        // requests on it.
        var output_list = make([]PullRequest, 3)
        for i := 0; i < 3; i++ {
-               item := <-pullmgr.NextItem
+               item := <-pullq.NextItem
                if pr, ok := item.(PullRequest); ok {
                        output_list[i] = pr
                } else {
index 809520769754506f8c3bae8f7b49db6df3bf4bb0..fde60879a25659d1e953a32ee8677f8769c51044 100644 (file)
@@ -472,10 +472,10 @@ func PullHandler(resp http.ResponseWriter, req *http.Request) {
                plist.PushBack(p)
        }
 
-       if pullmgr == nil {
-               pullmgr = NewBlockWorkList()
+       if pullq == nil {
+               pullq = NewWorkQueue()
        }
-       pullmgr.ReplaceList(plist)
+       pullq.ReplaceQueue(plist)
 }
 
 // ==============================
index 06054f5205cf7d38b01b179ee5365230120ccc89..2437638cff8aa2bc9491a8af4ed86e4605f07ff8 100644 (file)
@@ -91,12 +91,12 @@ func (e *KeepError) Error() string {
 // Initialized by the --volumes flag (or by FindKeepVolumes).
 var KeepVM VolumeManager
 
-// The pull list manager is a singleton pull list (a list of blocks
+// The pull list queue is a singleton pull list (a list of blocks
 // that the current keepstore process should be pulling from remote
 // keepstore servers in order to increase data replication) with
 // atomic update methods that are safe to use from multiple
 // goroutines.
-var pullmgr *BlockWorkList
+var pullq *WorkQueue
 
 // TODO(twp): continue moving as much code as possible out of main
 // so it can be effectively tested. Esp. handling and postprocessing
similarity index 80%
rename from services/keepstore/block_work_list.go
rename to services/keepstore/work_queue.go
index 4c5aeb1803942718fea8cc74415fb233baeb0ec8..9509cacd774f5acd64372b55b1841bab55e5b023 100644 (file)
@@ -1,19 +1,19 @@
 package main
 
-/* A BlockWorkList is an asynchronous thread-safe queue manager.  It
+/* A WorkQueue is an asynchronous thread-safe queue manager.  It
    provides a channel from which items can be read off the queue, and
    permits replacing the contents of the queue at any time.
 
-   The overall work flow for a BlockWorkList is as follows:
+   The overall work flow for a WorkQueue is as follows:
 
-     1. A BlockWorkList is created with NewBlockWorkList().  This
-        function instantiates a new BlockWorkList and starts a manager
+     1. A WorkQueue is created with NewWorkQueue().  This
+        function instantiates a new WorkQueue and starts a manager
         goroutine.  The manager listens on an input channel
         (manager.newlist) and an output channel (manager.NextItem).
 
      2. The manager first waits for a new list of requests on the
         newlist channel.  When another goroutine calls
-        manager.ReplaceList(lst), it sends lst over the newlist
+        manager.ReplaceQueue(lst), it sends lst over the newlist
         channel to the manager.  The manager goroutine now has
         ownership of the list.
 
@@ -34,24 +34,24 @@ package main
              output channel (signalling any workers to quit) and
              terminates.
 
-   Tasks currently handled by BlockWorkList:
+   Tasks currently handled by WorkQueue:
      * the pull list
      * the trash list
 
    Example usage:
 
         // Any kind of user-defined type can be used with the
-        // BlockWorkList.
+        // WorkQueue.
                type FrobRequest struct {
                        frob string
                }
 
                // Make a work list.
-               froblist := NewBlockWorkList()
+               froblist := NewWorkQueue()
 
                // Start a concurrent worker to read items from the NextItem
                // channel until it is closed, deleting each one.
-               go func(list BlockWorkList) {
+               go func(list WorkQueue) {
                        for i := range list.NextItem {
                                req := i.(FrobRequest)
                                frob.Run(req)
@@ -65,16 +65,16 @@ package main
                                // of FrobRequests, and give this list to the
                                // frob manager.
                                newfrobs := parseBody(req.Body)
-                               froblist.ReplaceList(newfrobs)
+                               froblist.ReplaceQueue(newfrobs)
                        }).Methods("PUT")
 
-   Methods available on a BlockWorkList:
+   Methods available on a WorkQueue:
 
-               ReplaceList(list)
+               ReplaceQueue(list)
                        Replaces the current item list with a new one.  The list
             manager discards any unprocessed items on the existing
             list and replaces it with the new one. If the worker is
-            processing a list item when ReplaceList is called, it
+            processing a list item when ReplaceQueue is called, it
             finishes processing before receiving items from the new
             list.
                Close()
@@ -84,16 +84,16 @@ package main
 
 import "container/list"
 
-type BlockWorkList struct {
+type WorkQueue struct {
        newlist  chan *list.List
        NextItem chan interface{}
 }
 
-// NewBlockWorkList returns a new worklist, and launches a listener
+// NewWorkQueue returns a new worklist, and launches a listener
 // goroutine that waits for work and farms it out to workers.
 //
-func NewBlockWorkList() *BlockWorkList {
-       b := BlockWorkList{
+func NewWorkQueue() *WorkQueue {
+       b := WorkQueue{
                newlist:  make(chan *list.List),
                NextItem: make(chan interface{}),
        }
@@ -101,11 +101,11 @@ func NewBlockWorkList() *BlockWorkList {
        return &b
 }
 
-// ReplaceList sends a new list of pull requests to the manager goroutine.
+// ReplaceQueue sends a new list of pull requests to the manager goroutine.
 // The manager will discard any outstanding pull list and begin
 // working on the new list.
 //
-func (b *BlockWorkList) ReplaceList(list *list.List) {
+func (b *WorkQueue) ReplaceQueue(list *list.List) {
        b.newlist <- list
 }
 
@@ -113,7 +113,7 @@ func (b *BlockWorkList) ReplaceList(list *list.List) {
 // completes any pull request in progress and abandons any pending
 // requests.
 //
-func (b *BlockWorkList) Close() {
+func (b *WorkQueue) Close() {
        close(b.newlist)
 }
 
@@ -127,7 +127,7 @@ func (b *BlockWorkList) Close() {
 // doubly linked list, holding on to the current item will keep
 // it from garbage collection.
 //
-func (b *BlockWorkList) listen() {
+func (b *WorkQueue) listen() {
        var current_item *list.Element
 
        // When we're done, close the output channel to shut down any
similarity index 83%
rename from services/keepstore/block_work_list_test.go
rename to services/keepstore/work_queue_test.go
index c3df40002d3eab1834fb40956f29e684ff16cc76..144e4c252be9ba1a069d8cf862f72c4284a28479 100644 (file)
@@ -49,12 +49,12 @@ func expectFromChannel(t *testing.T, c <-chan interface{}, expected []int) {
        }
 }
 
-// Create a BlockWorkList, generate a list for it, and instantiate a worker.
-func TestBlockWorkListReadWrite(t *testing.T) {
+// Create a WorkQueue, generate a list for it, and instantiate a worker.
+func TestWorkQueueReadWrite(t *testing.T) {
        var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
 
-       b := NewBlockWorkList()
-       b.ReplaceList(makeTestWorkList(input))
+       b := NewWorkQueue()
+       b.ReplaceQueue(makeTestWorkList(input))
 
        expectFromChannel(t, b.NextItem, input)
        expectChannelEmpty(t, b.NextItem)
@@ -62,10 +62,10 @@ func TestBlockWorkListReadWrite(t *testing.T) {
 }
 
 // Start a worker before the list has any input.
-func TestBlockWorkListEarlyRead(t *testing.T) {
+func TestWorkQueueEarlyRead(t *testing.T) {
        var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
 
-       b := NewBlockWorkList()
+       b := NewWorkQueue()
 
        // First, demonstrate that nothing is available on the NextItem
        // channel.
@@ -83,7 +83,7 @@ func TestBlockWorkListEarlyRead(t *testing.T) {
 
        // Feed the blocklist a new worklist, and wait for the worker to
        // finish.
-       b.ReplaceList(makeTestWorkList(input))
+       b.ReplaceQueue(makeTestWorkList(input))
        <-done
 
        expectChannelClosed(t, b.NextItem)
@@ -92,13 +92,13 @@ func TestBlockWorkListEarlyRead(t *testing.T) {
 // Show that a reader may block when the manager's list is exhausted,
 // and that the reader resumes automatically when new data is
 // available.
-func TestBlockWorkListReaderBlocks(t *testing.T) {
+func TestWorkQueueReaderBlocks(t *testing.T) {
        var (
                inputBeforeBlock = []int{1, 2, 3, 4, 5}
                inputAfterBlock  = []int{6, 7, 8, 9, 10}
        )
 
-       b := NewBlockWorkList()
+       b := NewWorkQueue()
        sendmore := make(chan int)
        done := make(chan int)
        go func() {
@@ -117,22 +117,22 @@ func TestBlockWorkListReaderBlocks(t *testing.T) {
 
        // Write a slice of the first five elements and wait for the
        // reader to signal that it's ready for us to send more input.
-       b.ReplaceList(makeTestWorkList(inputBeforeBlock))
+       b.ReplaceQueue(makeTestWorkList(inputBeforeBlock))
        <-sendmore
 
-       b.ReplaceList(makeTestWorkList(inputAfterBlock))
+       b.ReplaceQueue(makeTestWorkList(inputAfterBlock))
 
        // Wait for the reader to complete.
        <-done
 }
 
 // Replace one active work list with another.
-func TestBlockWorkListReplaceList(t *testing.T) {
+func TestWorkQueueReplaceQueue(t *testing.T) {
        var firstInput = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
        var replaceInput = []int{1, 4, 9, 16, 25, 36, 49, 64, 81}
 
-       b := NewBlockWorkList()
-       b.ReplaceList(makeTestWorkList(firstInput))
+       b := NewWorkQueue()
+       b.ReplaceQueue(makeTestWorkList(firstInput))
 
        // Read just the first five elements from the work list.
        // Confirm that the channel is not empty.
@@ -142,7 +142,7 @@ func TestBlockWorkListReplaceList(t *testing.T) {
        // Replace the work list and read five more elements.
        // The old list should have been discarded and all new
        // elements come from the new list.
-       b.ReplaceList(makeTestWorkList(replaceInput))
+       b.ReplaceQueue(makeTestWorkList(replaceInput))
        expectFromChannel(t, b.NextItem, replaceInput[0:5])
 
        b.Close()