+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
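/* A WorkQueue is an asynchronous thread-safe queue manager. It hands work items to workers through a channel and lets the caller replace the queued items at any time. */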
import "container/list"
+// WorkQueue feeds queued work items to workers and tracks their progress.
type WorkQueue struct {
- newlist chan *list.List
- NextItem chan interface{}
+ getStatus chan WorkQueueStatus // read by Status; closed when the manager goroutine exits
+ newlist chan *list.List // carries replacement work lists; closed by Close
+ // Workers get work items by reading from this channel.
+ NextItem <-chan interface{}
+ // Each worker must send struct{}{} to DoneItem exactly once
+ // for each work item received from NextItem, when it stops
+ // working on that item (regardless of whether the work was
+ // successful).
+ DoneItem chan<- struct{}
+}
+
+// WorkQueueStatus is a snapshot of a WorkQueue's item counts.
+type WorkQueueStatus struct {
+ InProgress int // items handed to a worker and not yet reported on DoneItem
+ Queued int // items still waiting to be handed to a worker
}
-// NewWorkQueue returns a new worklist, and launches a listener
-// goroutine that waits for work and farms it out to workers.
+// NewWorkQueue returns a new empty WorkQueue and starts its manager goroutine.
//
func NewWorkQueue() *WorkQueue {
+ nextItem := make(chan interface{})
+ reportDone := make(chan struct{})
+ newList := make(chan *list.List)
b := WorkQueue{
- newlist: make(chan *list.List),
- NextItem: make(chan interface{}),
+ getStatus: make(chan WorkQueueStatus),
+ newlist: newList,
+ NextItem: nextItem,
+ DoneItem: reportDone,
}
- go b.listen()
+ go func() {
+ // Read new work lists from the newlist channel and
+ // count finished items reported on reportDone. Answer
+ // "status" and "get next item" queries by sending to
+ // getStatus and nextItem respectively. Return once newlist
+ // is closed and no work items remain in progress.
+
+ todo := &list.List{}
+ status := WorkQueueStatus{}
+
+ // When we're done, close nextItem so workers shut down next time
+ // they ask for work, and close getStatus so Status returns zeros.
+ defer close(nextItem)
+ defer close(b.getStatus)
+
+ // nextChan and nextVal are both nil when we have
+ // nothing to send; otherwise they are, respectively,
+ // the nextItem channel and the next work item to send
+ // to it.
+ var nextChan chan interface{}
+ var nextVal interface{}
+
+ for newList != nil || status.InProgress > 0 { // after Close, keep running until in-progress items are reported done
+ select {
+ case p, ok := <-newList:
+ if !ok {
+ // Closed, stop receiving
+ newList = nil
+ }
+ todo = p // nil when the channel was closed or a nil list was sent
+ if todo == nil {
+ todo = &list.List{}
+ }
+ status.Queued = todo.Len()
+ if status.Queued == 0 {
+ // Stop sending work
+ nextChan = nil
+ nextVal = nil
+ } else {
+ nextChan = nextItem
+ nextVal = todo.Front().Value
+ }
+ case nextChan <- nextVal: // never ready while nextChan is nil
+ todo.Remove(todo.Front())
+ status.InProgress++
+ status.Queued--
+ if status.Queued == 0 {
+ // Stop sending work
+ nextChan = nil
+ nextVal = nil
+ } else {
+ nextVal = todo.Front().Value
+ }
+ case <-reportDone: // a worker stopped working on an item
+ status.InProgress--
+ case b.getStatus <- status: // answer a pending Status call
+ }
+ }
+ }()
return &b
}
-// ReplaceQueue sends a new list of pull requests to the manager goroutine.
-// The manager will discard any outstanding pull list and begin
-// working on the new list.
+// ReplaceQueue abandons any work items left in the existing queue,
+// and starts giving workers items from the given list. After giving
+// it to ReplaceQueue, the caller must not read or write the given
+// list. A nil list is treated as an empty queue.
//
func (b *WorkQueue) ReplaceQueue(list *list.List) {
b.newlist <- list // blocks until the manager goroutine receives it; panics after Close
}
// Close shuts down the manager and terminates the goroutine, which
-// completes any pull request in progress and abandons any pending
-// requests.
+// abandons any queued work items but keeps running until every item
+// already in progress has been reported on DoneItem.
+//
+// After Close, Status will return correct values, ReplaceQueue will
+// panic, and NextItem will be closed once no items are in progress.
//
func (b *WorkQueue) Close() {
close(b.newlist) // closing an already-closed channel panics, so call Close at most once
}
-// listen is run in a goroutine. It reads new pull lists from its
-// input queue until the queue is closed.
-// listen takes ownership of the list that is passed to it.
+// Status returns an up-to-date WorkQueueStatus reflecting the current
+// queue status.
//
-// Note that the routine does not ever need to access the list
-// itself once the current_item has been initialized, so we do
-// not bother to keep a pointer to the list. Because it is a
-// doubly linked list, holding on to the current item will keep
-// it from garbage collection.
-//
-func (b *WorkQueue) listen() {
- var current_item *list.Element
-
- // When we're done, close the output channel to shut down any
- // workers.
- defer close(b.NextItem)
-
- for {
- // If the current list is empty, wait for a new list before
- // even checking if workers are ready.
- if current_item == nil {
- if p, ok := <-b.newlist; ok {
- current_item = p.Front()
- } else {
- // The channel was closed; shut down.
- return
- }
- }
- select {
- case p, ok := <-b.newlist:
- if ok {
- current_item = p.Front()
- } else {
- // The input channel is closed; time to shut down
- return
- }
- case b.NextItem <- current_item.Value:
- current_item = current_item.Next()
- }
- }
+func (b *WorkQueue) Status() WorkQueueStatus {
+ // If the channel is closed, we get the zero value of
+ // WorkQueueStatus, which is an accurate description of a
+ // finished queue.
+ return <-b.getStatus // blocks until the manager goroutine replies or has exited
}