Refactor the multi-host salt install page.
[arvados.git] / services / keepstore / work_queue.go
index 9509cacd774f5acd64372b55b1841bab55e5b023..4c46ec8e65f6cf546324d8f0f6e25056d65de9a9 100644 (file)
@@ -1,4 +1,8 @@
-package main
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package keepstore
 
 /* A WorkQueue is an asynchronous thread-safe queue manager.  It
    provides a channel from which items can be read off the queue, and
@@ -84,77 +88,125 @@ package main
 
 import "container/list"
 
+// WorkQueue definition
 type WorkQueue struct {
-       newlist  chan *list.List
-       NextItem chan interface{}
+       getStatus chan WorkQueueStatus
+       newlist   chan *list.List
+       // Workers get work items by reading from this channel.
+       NextItem <-chan interface{}
+       // Each worker must send struct{}{} to DoneItem exactly once
+       // for each work item received from NextItem, when it stops
+       // working on that item (regardless of whether the work was
+       // successful).
+       DoneItem chan<- struct{}
+}
+
+// WorkQueueStatus reflects the queue status.
+type WorkQueueStatus struct {
+       InProgress int
+       Queued     int
 }
 
-// NewWorkQueue returns a new worklist, and launches a listener
-// goroutine that waits for work and farms it out to workers.
+// NewWorkQueue returns a new empty WorkQueue.
 //
 func NewWorkQueue() *WorkQueue {
+       nextItem := make(chan interface{})
+       reportDone := make(chan struct{})
+       newList := make(chan *list.List)
        b := WorkQueue{
-               newlist:  make(chan *list.List),
-               NextItem: make(chan interface{}),
+               getStatus: make(chan WorkQueueStatus),
+               newlist:   newList,
+               NextItem:  nextItem,
+               DoneItem:  reportDone,
        }
-       go b.listen()
+       go func() {
+               // Read new work lists from the newlist channel.
+               // Reply to "status" and "get next item" queries by
+               // sending to the getStatus and nextItem channels
+               // respectively. Return when the newlist channel
+               // closes.
+
+               todo := &list.List{}
+               status := WorkQueueStatus{}
+
+               // When we're done, close the output channel; workers will
+               // shut down next time they ask for new work.
+               defer close(nextItem)
+               defer close(b.getStatus)
+
+               // nextChan and nextVal are both nil when we have
+               // nothing to send; otherwise they are, respectively,
+               // the nextItem channel and the next work item to send
+               // to it.
+               var nextChan chan interface{}
+               var nextVal interface{}
+
+               for newList != nil || status.InProgress > 0 {
+                       select {
+                       case p, ok := <-newList:
+                               if !ok {
+                                       // Closed, stop receiving
+                                       newList = nil
+                               }
+                               todo = p
+                               if todo == nil {
+                                       todo = &list.List{}
+                               }
+                               status.Queued = todo.Len()
+                               if status.Queued == 0 {
+                                       // Stop sending work
+                                       nextChan = nil
+                                       nextVal = nil
+                               } else {
+                                       nextChan = nextItem
+                                       nextVal = todo.Front().Value
+                               }
+                       case nextChan <- nextVal:
+                               todo.Remove(todo.Front())
+                               status.InProgress++
+                               status.Queued--
+                               if status.Queued == 0 {
+                                       // Stop sending work
+                                       nextChan = nil
+                                       nextVal = nil
+                               } else {
+                                       nextVal = todo.Front().Value
+                               }
+                       case <-reportDone:
+                               status.InProgress--
+                       case b.getStatus <- status:
+                       }
+               }
+       }()
        return &b
 }
 
-// ReplaceQueue sends a new list of pull requests to the manager goroutine.
-// The manager will discard any outstanding pull list and begin
-// working on the new list.
+// ReplaceQueue abandons any work items left in the existing queue,
+// and starts giving workers items from the given list. After giving
+// it to ReplaceQueue, the caller must not read or write the given
+// list.
 //
 func (b *WorkQueue) ReplaceQueue(list *list.List) {
        b.newlist <- list
 }
 
 // Close shuts down the manager and terminates the goroutine, which
-// completes any pull request in progress and abandons any pending
-// requests.
+// abandons any pending requests, but allows any pull request already
+// in progress to continue.
+//
+// After Close, Status will return correct values, NextItem will be
+// closed, and ReplaceQueue will panic.
 //
 func (b *WorkQueue) Close() {
        close(b.newlist)
 }
 
-// listen is run in a goroutine. It reads new pull lists from its
-// input queue until the queue is closed.
-// listen takes ownership of the list that is passed to it.
+// Status returns an up-to-date WorkQueueStatus reflecting the current
+// queue status.
 //
-// Note that the routine does not ever need to access the list
-// itself once the current_item has been initialized, so we do
-// not bother to keep a pointer to the list. Because it is a
-// doubly linked list, holding on to the current item will keep
-// it from garbage collection.
-//
-func (b *WorkQueue) listen() {
-       var current_item *list.Element
-
-       // When we're done, close the output channel to shut down any
-       // workers.
-       defer close(b.NextItem)
-
-       for {
-               // If the current list is empty, wait for a new list before
-               // even checking if workers are ready.
-               if current_item == nil {
-                       if p, ok := <-b.newlist; ok {
-                               current_item = p.Front()
-                       } else {
-                               // The channel was closed; shut down.
-                               return
-                       }
-               }
-               select {
-               case p, ok := <-b.newlist:
-                       if ok {
-                               current_item = p.Front()
-                       } else {
-                               // The input channel is closed; time to shut down
-                               return
-                       }
-               case b.NextItem <- current_item.Value:
-                       current_item = current_item.Next()
-               }
-       }
+func (b *WorkQueue) Status() WorkQueueStatus {
+       // If the channel is closed, we get the nil value of
+       // WorkQueueStatus, which is an accurate description of a
+       // finished queue.
+       return <-b.getStatus
 }