-package main
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package keepstore
/* A WorkQueue is an asynchronous thread-safe queue manager. It
provides a channel from which items can be read off the queue, and
import "container/list"
+// WorkQueue definition
type WorkQueue struct {
getStatus chan WorkQueueStatus
newlist chan *list.List
// Workers get work items by reading from this channel.
NextItem <-chan interface{}
- // Each worker must send struct{}{} to ReportDone exactly once
+ // Each worker must send struct{}{} to DoneItem exactly once
// for each work item received from NextItem, when it stops
// working on that item (regardless of whether the work was
// successful).
- ReportDone chan<- struct{}
+ DoneItem chan<- struct{}
}
+// WorkQueueStatus reflects the queue status.
type WorkQueueStatus struct {
InProgress int
Queued int
}
// NewWorkQueue returns a new empty WorkQueue.
-//
func NewWorkQueue() *WorkQueue {
nextItem := make(chan interface{})
reportDone := make(chan struct{})
newList := make(chan *list.List)
b := WorkQueue{
- getStatus: make(chan WorkQueueStatus),
- newlist: newList,
- NextItem: nextItem,
- ReportDone: reportDone,
+ getStatus: make(chan WorkQueueStatus),
+ newlist: newList,
+ NextItem: nextItem,
+ DoneItem: reportDone,
}
go func() {
// Read new work lists from the newlist channel.
// and starts giving workers items from the given list. After giving
// it to ReplaceQueue, the caller must not read or write the given
// list.
-//
func (b *WorkQueue) ReplaceQueue(list *list.List) {
b.newlist <- list
}
//
// After Close, Status will return correct values, NextItem will be
// closed, and ReplaceQueue will panic.
-//
func (b *WorkQueue) Close() {
close(b.newlist)
}
// Status returns an up-to-date WorkQueueStatus reflecting the current
// queue status.
-//
func (b *WorkQueue) Status() WorkQueueStatus {
// If the channel is closed, we get the nil value of
// WorkQueueStatus, which is an accurate description of a