X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4de0af809ffbef43d89cd1751e5d611a4b5445e9..b4209cce618b79a1cc022a85f860268a988201f1:/services/keepstore/work_queue.go diff --git a/services/keepstore/work_queue.go b/services/keepstore/work_queue.go index 6ec52742a5..56c6376ad3 100644 --- a/services/keepstore/work_queue.go +++ b/services/keepstore/work_queue.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main /* A WorkQueue is an asynchronous thread-safe queue manager. It @@ -84,18 +88,23 @@ package main import "container/list" +// WorkQueue definition type WorkQueue struct { - countInProgress chan int - countOutstanding chan int - countQueued chan int - newlist chan *list.List + getStatus chan WorkQueueStatus + newlist chan *list.List // Workers get work items by reading from this channel. NextItem <-chan interface{} - // Each worker must send struct{}{} to ReportDone exactly once + // Each worker must send struct{}{} to DoneItem exactly once // for each work item received from NextItem, when it stops // working on that item (regardless of whether the work was // successful). - ReportDone chan<- struct{} + DoneItem chan<- struct{} +} + +// WorkQueueStatus reflects the queue status. +type WorkQueueStatus struct { + InProgress int + Queued int } // NewWorkQueue returns a new empty WorkQueue. @@ -105,33 +114,34 @@ func NewWorkQueue() *WorkQueue { reportDone := make(chan struct{}) newList := make(chan *list.List) b := WorkQueue{ - countQueued: make(chan int), - countInProgress: make(chan int), - countOutstanding: make(chan int), - newlist: newList, - NextItem: nextItem, - ReportDone: reportDone, + getStatus: make(chan WorkQueueStatus), + newlist: newList, + NextItem: nextItem, + DoneItem: reportDone, } go func() { // Read new work lists from the newlist channel. - // Reply to "length" and "get next item" queries by - // sending to the countQueued and nextItem channels + // Reply to "status" and "get next item" queries by + // sending to the getStatus and nextItem channels // respectively. Return when the newlist channel // closes. todo := &list.List{} - countInProgress := 0 + status := WorkQueueStatus{} // When we're done, close the output channel; workers will // shut down next time they ask for new work. defer close(nextItem) - defer close(b.countInProgress) - defer close(b.countOutstanding) - defer close(b.countQueued) + defer close(b.getStatus) + // nextChan and nextVal are both nil when we have + // nothing to send; otherwise they are, respectively, + // the nextItem channel and the next work item to send + // to it. var nextChan chan interface{} var nextVal interface{} - for newList != nil || countInProgress > 0 { + + for newList != nil || status.InProgress > 0 { select { case p, ok := <-newList: if !ok { @@ -142,7 +152,8 @@ func NewWorkQueue() *WorkQueue { if todo == nil { todo = &list.List{} } - if todo.Len() == 0 { + status.Queued = todo.Len() + if status.Queued == 0 { // Stop sending work nextChan = nil nextVal = nil @@ -151,9 +162,10 @@ func NewWorkQueue() *WorkQueue { nextVal = todo.Front().Value } case nextChan <- nextVal: - countInProgress++ todo.Remove(todo.Front()) - if todo.Len() == 0 { + status.InProgress++ + status.Queued-- + if status.Queued == 0 { // Stop sending work nextChan = nil nextVal = nil @@ -161,10 +173,8 @@ func NewWorkQueue() *WorkQueue { nextVal = todo.Front().Value } case <-reportDone: - countInProgress-- - case b.countInProgress <- countInProgress: - case b.countOutstanding <- todo.Len() + countInProgress: - case b.countQueued <- todo.Len(): + status.InProgress-- + case b.getStatus <- status: } } }() @@ -184,31 +194,19 @@ func (b *WorkQueue) ReplaceQueue(list *list.List) { // abandons any pending requests, but allows any pull request already // in progress to continue. // -// After Close, CountX methods will return correct values, NextItem -// will be closed, and ReplaceQueue will panic. +// After Close, Status will return correct values, NextItem will be +// closed, and ReplaceQueue will panic. // func (b *WorkQueue) Close() { close(b.newlist) } -// CountOutstanding returns the number of items in the queue or in -// progress. A return value of 0 guarantees all existing work (work -// that was sent to ReplaceQueue before CountOutstanding was called) -// has completed. -// -func (b *WorkQueue) CountOutstanding() int { - // If the channel is closed, we get zero, which is correct. - return <-b.countOutstanding -} - -// CountQueued returns the number of items in the current queue. -// -func (b *WorkQueue) CountQueued() int { - return <-b.countQueued -} - -// Len returns the number of items in progress. +// Status returns an up-to-date WorkQueueStatus reflecting the current +// queue status. // -func (b *WorkQueue) CountInProgress() int { - return <-b.countInProgress +func (b *WorkQueue) Status() WorkQueueStatus { + // If the channel is closed, we get the nil value of + // WorkQueueStatus, which is an accurate description of a + // finished queue. + return <-b.getStatus }