Refactor the multi-host salt install page.
[arvados.git] / services / keepstore / work_queue.go
index 6ec52742a57afb826128678950d9c90000553233..4c46ec8e65f6cf546324d8f0f6e25056d65de9a9 100644 (file)
@@ -1,4 +1,8 @@
-package main
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package keepstore
 
 /* A WorkQueue is an asynchronous thread-safe queue manager.  It
    provides a channel from which items can be read off the queue, and
@@ -84,18 +88,23 @@ package main
 
 import "container/list"
 
+// WorkQueue definition
 type WorkQueue struct {
-       countInProgress  chan int
-       countOutstanding chan int
-       countQueued      chan int
-       newlist          chan *list.List
+       getStatus chan WorkQueueStatus
+       newlist   chan *list.List
        // Workers get work items by reading from this channel.
        NextItem <-chan interface{}
-       // Each worker must send struct{}{} to ReportDone exactly once
+       // Each worker must send struct{}{} to DoneItem exactly once
        // for each work item received from NextItem, when it stops
        // working on that item (regardless of whether the work was
        // successful).
-       ReportDone chan<- struct{}
+       DoneItem chan<- struct{}
+}
+
+// WorkQueueStatus reflects the queue status.
+type WorkQueueStatus struct {
+       InProgress int
+       Queued     int
 }
 
 // NewWorkQueue returns a new empty WorkQueue.
@@ -105,33 +114,34 @@ func NewWorkQueue() *WorkQueue {
        reportDone := make(chan struct{})
        newList := make(chan *list.List)
        b := WorkQueue{
-               countQueued:      make(chan int),
-               countInProgress:  make(chan int),
-               countOutstanding: make(chan int),
-               newlist:          newList,
-               NextItem:         nextItem,
-               ReportDone:       reportDone,
+               getStatus: make(chan WorkQueueStatus),
+               newlist:   newList,
+               NextItem:  nextItem,
+               DoneItem:  reportDone,
        }
        go func() {
                // Read new work lists from the newlist channel.
-               // Reply to "length" and "get next item" queries by
-               // sending to the countQueued and nextItem channels
+               // Reply to "status" and "get next item" queries by
+               // sending to the getStatus and nextItem channels
                // respectively. Return when the newlist channel
                // closes.
 
                todo := &list.List{}
-               countInProgress := 0
+               status := WorkQueueStatus{}
 
                // When we're done, close the output channel; workers will
                // shut down next time they ask for new work.
                defer close(nextItem)
-               defer close(b.countInProgress)
-               defer close(b.countOutstanding)
-               defer close(b.countQueued)
+               defer close(b.getStatus)
 
+               // nextChan and nextVal are both nil when we have
+               // nothing to send; otherwise they are, respectively,
+               // the nextItem channel and the next work item to send
+               // to it.
                var nextChan chan interface{}
                var nextVal interface{}
-               for newList != nil || countInProgress > 0 {
+
+               for newList != nil || status.InProgress > 0 {
                        select {
                        case p, ok := <-newList:
                                if !ok {
@@ -142,7 +152,8 @@ func NewWorkQueue() *WorkQueue {
                                if todo == nil {
                                        todo = &list.List{}
                                }
-                               if todo.Len() == 0 {
+                               status.Queued = todo.Len()
+                               if status.Queued == 0 {
                                        // Stop sending work
                                        nextChan = nil
                                        nextVal = nil
@@ -151,9 +162,10 @@ func NewWorkQueue() *WorkQueue {
                                        nextVal = todo.Front().Value
                                }
                        case nextChan <- nextVal:
-                               countInProgress++
                                todo.Remove(todo.Front())
-                               if todo.Len() == 0 {
+                               status.InProgress++
+                               status.Queued--
+                               if status.Queued == 0 {
                                        // Stop sending work
                                        nextChan = nil
                                        nextVal = nil
@@ -161,10 +173,8 @@ func NewWorkQueue() *WorkQueue {
                                        nextVal = todo.Front().Value
                                }
                        case <-reportDone:
-                               countInProgress--
-                       case b.countInProgress <- countInProgress:
-                       case b.countOutstanding <- todo.Len() + countInProgress:
-                       case b.countQueued <- todo.Len():
+                               status.InProgress--
+                       case b.getStatus <- status:
                        }
                }
        }()
@@ -184,31 +194,19 @@ func (b *WorkQueue) ReplaceQueue(list *list.List) {
 // abandons any pending requests, but allows any pull request already
 // in progress to continue.
 //
-// After Close, CountX methods will return correct values, NextItem
-// will be closed, and ReplaceQueue will panic.
+// After Close, Status will return correct values, NextItem will be
+// closed, and ReplaceQueue will panic.
 //
 func (b *WorkQueue) Close() {
        close(b.newlist)
 }
 
-// CountOutstanding returns the number of items in the queue or in
-// progress. A return value of 0 guarantees all existing work (work
-// that was sent to ReplaceQueue before CountOutstanding was called)
-// has completed.
-//
-func (b *WorkQueue) CountOutstanding() int {
-       // If the channel is closed, we get zero, which is correct.
-       return <-b.countOutstanding
-}
-
-// CountQueued returns the number of items in the current queue.
-//
-func (b *WorkQueue) CountQueued() int {
-       return <-b.countQueued
-}
-
-// Len returns the number of items in progress.
+// Status returns an up-to-date WorkQueueStatus reflecting the current
+// queue status.
 //
-func (b *WorkQueue) CountInProgress() int {
-       return <-b.countInProgress
+func (b *WorkQueue) Status() WorkQueueStatus {
+       // If the channel is closed, we get the nil value of
+       // WorkQueueStatus, which is an accurate description of a
+       // finished queue.
+       return <-b.getStatus
 }