X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/964ab3dd90ff1508efc0c77378cde2b3a4da1029..8a27fe370239ecb8e50d53f46b45ed61203a35ca:/services/keepstore/work_queue.go diff --git a/services/keepstore/work_queue.go b/services/keepstore/work_queue.go index 9509cacd77..4c46ec8e65 100644 --- a/services/keepstore/work_queue.go +++ b/services/keepstore/work_queue.go @@ -1,4 +1,8 @@ -package main +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package keepstore /* A WorkQueue is an asynchronous thread-safe queue manager. It provides a channel from which items can be read off the queue, and @@ -84,77 +88,125 @@ package main import "container/list" +// WorkQueue definition type WorkQueue struct { - newlist chan *list.List - NextItem chan interface{} + getStatus chan WorkQueueStatus + newlist chan *list.List + // Workers get work items by reading from this channel. + NextItem <-chan interface{} + // Each worker must send struct{}{} to DoneItem exactly once + // for each work item received from NextItem, when it stops + // working on that item (regardless of whether the work was + // successful). + DoneItem chan<- struct{} +} + +// WorkQueueStatus reflects the queue status. +type WorkQueueStatus struct { + InProgress int + Queued int } -// NewWorkQueue returns a new worklist, and launches a listener -// goroutine that waits for work and farms it out to workers. +// NewWorkQueue returns a new empty WorkQueue. // func NewWorkQueue() *WorkQueue { + nextItem := make(chan interface{}) + reportDone := make(chan struct{}) + newList := make(chan *list.List) b := WorkQueue{ - newlist: make(chan *list.List), - NextItem: make(chan interface{}), + getStatus: make(chan WorkQueueStatus), + newlist: newList, + NextItem: nextItem, + DoneItem: reportDone, } - go b.listen() + go func() { + // Read new work lists from the newlist channel. + // Reply to "status" and "get next item" queries by + // sending to the getStatus and nextItem channels + // respectively. Return when the newlist channel + // closes. + + todo := &list.List{} + status := WorkQueueStatus{} + + // When we're done, close the output channel; workers will + // shut down next time they ask for new work. + defer close(nextItem) + defer close(b.getStatus) + + // nextChan and nextVal are both nil when we have + // nothing to send; otherwise they are, respectively, + // the nextItem channel and the next work item to send + // to it. + var nextChan chan interface{} + var nextVal interface{} + + for newList != nil || status.InProgress > 0 { + select { + case p, ok := <-newList: + if !ok { + // Closed, stop receiving + newList = nil + } + todo = p + if todo == nil { + todo = &list.List{} + } + status.Queued = todo.Len() + if status.Queued == 0 { + // Stop sending work + nextChan = nil + nextVal = nil + } else { + nextChan = nextItem + nextVal = todo.Front().Value + } + case nextChan <- nextVal: + todo.Remove(todo.Front()) + status.InProgress++ + status.Queued-- + if status.Queued == 0 { + // Stop sending work + nextChan = nil + nextVal = nil + } else { + nextVal = todo.Front().Value + } + case <-reportDone: + status.InProgress-- + case b.getStatus <- status: + } + } + }() return &b } -// ReplaceQueue sends a new list of pull requests to the manager goroutine. -// The manager will discard any outstanding pull list and begin -// working on the new list. +// ReplaceQueue abandons any work items left in the existing queue, +// and starts giving workers items from the given list. After giving +// it to ReplaceQueue, the caller must not read or write the given +// list. // func (b *WorkQueue) ReplaceQueue(list *list.List) { b.newlist <- list } // Close shuts down the manager and terminates the goroutine, which -// completes any pull request in progress and abandons any pending -// requests. +// abandons any pending requests, but allows any pull request already +// in progress to continue. +// +// After Close, Status will return correct values, NextItem will be +// closed, and ReplaceQueue will panic. // func (b *WorkQueue) Close() { close(b.newlist) } -// listen is run in a goroutine. It reads new pull lists from its -// input queue until the queue is closed. -// listen takes ownership of the list that is passed to it. +// Status returns an up-to-date WorkQueueStatus reflecting the current +// queue status. // -// Note that the routine does not ever need to access the list -// itself once the current_item has been initialized, so we do -// not bother to keep a pointer to the list. Because it is a -// doubly linked list, holding on to the current item will keep -// it from garbage collection. -// -func (b *WorkQueue) listen() { - var current_item *list.Element - - // When we're done, close the output channel to shut down any - // workers. - defer close(b.NextItem) - - for { - // If the current list is empty, wait for a new list before - // even checking if workers are ready. - if current_item == nil { - if p, ok := <-b.newlist; ok { - current_item = p.Front() - } else { - // The channel was closed; shut down. - return - } - } - select { - case p, ok := <-b.newlist: - if ok { - current_item = p.Front() - } else { - // The input channel is closed; time to shut down - return - } - case b.NextItem <- current_item.Value: - current_item = current_item.Next() - } - } +func (b *WorkQueue) Status() WorkQueueStatus { + // If the channel is closed, we get the nil value of + // WorkQueueStatus, which is an accurate description of a + // finished queue. + return <-b.getStatus }