X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4de0af809ffbef43d89cd1751e5d611a4b5445e9..d90fffe9d937d3e05c04106904b18dc4da235bc6:/services/keepstore/work_queue.go diff --git a/services/keepstore/work_queue.go b/services/keepstore/work_queue.go index 6ec52742a5..be3d118ff0 100644 --- a/services/keepstore/work_queue.go +++ b/services/keepstore/work_queue.go @@ -1,4 +1,8 @@ -package main +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package keepstore /* A WorkQueue is an asynchronous thread-safe queue manager. It provides a channel from which items can be read off the queue, and @@ -84,54 +88,59 @@ package main import "container/list" +// WorkQueue definition type WorkQueue struct { - countInProgress chan int - countOutstanding chan int - countQueued chan int - newlist chan *list.List + getStatus chan WorkQueueStatus + newlist chan *list.List // Workers get work items by reading from this channel. NextItem <-chan interface{} - // Each worker must send struct{}{} to ReportDone exactly once + // Each worker must send struct{}{} to DoneItem exactly once // for each work item received from NextItem, when it stops // working on that item (regardless of whether the work was // successful). - ReportDone chan<- struct{} + DoneItem chan<- struct{} +} + +// WorkQueueStatus reflects the queue status. +type WorkQueueStatus struct { + InProgress int + Queued int } // NewWorkQueue returns a new empty WorkQueue. -// func NewWorkQueue() *WorkQueue { nextItem := make(chan interface{}) reportDone := make(chan struct{}) newList := make(chan *list.List) b := WorkQueue{ - countQueued: make(chan int), - countInProgress: make(chan int), - countOutstanding: make(chan int), - newlist: newList, - NextItem: nextItem, - ReportDone: reportDone, + getStatus: make(chan WorkQueueStatus), + newlist: newList, + NextItem: nextItem, + DoneItem: reportDone, } go func() { // Read new work lists from the newlist channel. - // Reply to "length" and "get next item" queries by - // sending to the countQueued and nextItem channels + // Reply to "status" and "get next item" queries by + // sending to the getStatus and nextItem channels // respectively. Return when the newlist channel // closes. todo := &list.List{} - countInProgress := 0 + status := WorkQueueStatus{} // When we're done, close the output channel; workers will // shut down next time they ask for new work. defer close(nextItem) - defer close(b.countInProgress) - defer close(b.countOutstanding) - defer close(b.countQueued) + defer close(b.getStatus) + // nextChan and nextVal are both nil when we have + // nothing to send; otherwise they are, respectively, + // the nextItem channel and the next work item to send + // to it. var nextChan chan interface{} var nextVal interface{} - for newList != nil || countInProgress > 0 { + + for newList != nil || status.InProgress > 0 { select { case p, ok := <-newList: if !ok { @@ -142,7 +151,8 @@ func NewWorkQueue() *WorkQueue { if todo == nil { todo = &list.List{} } - if todo.Len() == 0 { + status.Queued = todo.Len() + if status.Queued == 0 { // Stop sending work nextChan = nil nextVal = nil @@ -151,9 +161,10 @@ func NewWorkQueue() *WorkQueue { nextVal = todo.Front().Value } case nextChan <- nextVal: - countInProgress++ todo.Remove(todo.Front()) - if todo.Len() == 0 { + status.InProgress++ + status.Queued-- + if status.Queued == 0 { // Stop sending work nextChan = nil nextVal = nil @@ -161,10 +172,8 @@ func NewWorkQueue() *WorkQueue { nextVal = todo.Front().Value } case <-reportDone: - countInProgress-- - case b.countInProgress <- countInProgress: - case b.countOutstanding <- todo.Len() + countInProgress: - case b.countQueued <- todo.Len(): + status.InProgress-- + case b.getStatus <- status: } } }() @@ -175,7 +184,6 @@ func NewWorkQueue() *WorkQueue { // and starts giving workers items from the given list. After giving // it to ReplaceQueue, the caller must not read or write the given // list. -// func (b *WorkQueue) ReplaceQueue(list *list.List) { b.newlist <- list } @@ -184,31 +192,17 @@ func (b *WorkQueue) ReplaceQueue(list *list.List) { // abandons any pending requests, but allows any pull request already // in progress to continue. // -// After Close, CountX methods will return correct values, NextItem -// will be closed, and ReplaceQueue will panic. -// +// After Close, Status will return correct values, NextItem will be +// closed, and ReplaceQueue will panic. func (b *WorkQueue) Close() { close(b.newlist) } -// CountOutstanding returns the number of items in the queue or in -// progress. A return value of 0 guarantees all existing work (work -// that was sent to ReplaceQueue before CountOutstanding was called) -// has completed. -// -func (b *WorkQueue) CountOutstanding() int { - // If the channel is closed, we get zero, which is correct. - return <-b.countOutstanding -} - -// CountQueued returns the number of items in the current queue. -// -func (b *WorkQueue) CountQueued() int { - return <-b.countQueued -} - -// Len returns the number of items in progress. -// -func (b *WorkQueue) CountInProgress() int { - return <-b.countInProgress +// Status returns an up-to-date WorkQueueStatus reflecting the current +// queue status. +func (b *WorkQueue) Status() WorkQueueStatus { + // If the channel is closed, we get the nil value of + // WorkQueueStatus, which is an accurate description of a + // finished queue. + return <-b.getStatus }