2960: Refactor keepstore into a streaming server.
[arvados.git] / services / keepstore / work_queue_test.go
diff --git a/services/keepstore/work_queue_test.go b/services/keepstore/work_queue_test.go
deleted file mode 100644 (file)
index 254f96c..0000000
+++ /dev/null
@@ -1,244 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package keepstore
-
-import (
-       "container/list"
-       "runtime"
-       "testing"
-       "time"
-)
-
-type fatalfer interface {
-       Fatalf(string, ...interface{})
-}
-
-func makeTestWorkList(ary []interface{}) *list.List {
-       l := list.New()
-       for _, n := range ary {
-               l.PushBack(n)
-       }
-       return l
-}
-
-func expectChannelEmpty(t fatalfer, c <-chan interface{}) {
-       select {
-       case item, ok := <-c:
-               if ok {
-                       t.Fatalf("Received value (%+v) from channel that we expected to be empty", item)
-               }
-       default:
-       }
-}
-
-func expectChannelNotEmpty(t fatalfer, c <-chan interface{}) interface{} {
-       select {
-       case item, ok := <-c:
-               if !ok {
-                       t.Fatalf("expected data on a closed channel")
-               }
-               return item
-       case <-time.After(time.Second):
-               t.Fatalf("expected data on an empty channel")
-               return nil
-       }
-}
-
-func expectChannelClosedWithin(t fatalfer, timeout time.Duration, c <-chan interface{}) {
-       select {
-       case received, ok := <-c:
-               if ok {
-                       t.Fatalf("Expected channel to be closed, but received %+v instead", received)
-               }
-       case <-time.After(timeout):
-               t.Fatalf("Expected channel to be closed, but it is still open after %v", timeout)
-       }
-}
-
-func doWorkItems(t fatalfer, q *WorkQueue, expected []interface{}) {
-       for i := range expected {
-               actual, ok := <-q.NextItem
-               if !ok {
-                       t.Fatalf("Expected %+v but channel was closed after receiving %+v as expected.", expected, expected[:i])
-               }
-               q.DoneItem <- struct{}{}
-               if actual.(int) != expected[i] {
-                       t.Fatalf("Expected %+v but received %+v after receiving %+v as expected.", expected[i], actual, expected[:i])
-               }
-       }
-}
-
-func expectEqualWithin(t fatalfer, timeout time.Duration, expect interface{}, f func() interface{}) {
-       ok := make(chan struct{})
-       giveup := false
-       go func() {
-               for f() != expect && !giveup {
-                       time.Sleep(time.Millisecond)
-               }
-               close(ok)
-       }()
-       select {
-       case <-ok:
-       case <-time.After(timeout):
-               giveup = true
-               _, file, line, _ := runtime.Caller(1)
-               t.Fatalf("Still getting %+v, timed out waiting for %+v\n%s:%d", f(), expect, file, line)
-       }
-}
-
-func expectQueued(t fatalfer, b *WorkQueue, expectQueued int) {
-       if l := b.Status().Queued; l != expectQueued {
-               t.Fatalf("Got Queued==%d, expected %d", l, expectQueued)
-       }
-}
-
-func TestWorkQueueDoneness(t *testing.T) {
-       b := NewWorkQueue()
-       defer b.Close()
-       b.ReplaceQueue(makeTestWorkList([]interface{}{1, 2, 3}))
-       expectQueued(t, b, 3)
-       gate := make(chan struct{})
-       go func() {
-               <-gate
-               for range b.NextItem {
-                       <-gate
-                       time.Sleep(time.Millisecond)
-                       b.DoneItem <- struct{}{}
-               }
-       }()
-       expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
-       b.ReplaceQueue(makeTestWorkList([]interface{}{4, 5, 6}))
-       for i := 1; i <= 3; i++ {
-               gate <- struct{}{}
-               expectEqualWithin(t, time.Second, 3-i, func() interface{} { return b.Status().Queued })
-               expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
-       }
-       close(gate)
-       expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
-       expectChannelEmpty(t, b.NextItem)
-}
-
-// Create a WorkQueue, generate a list for it, and instantiate a worker.
-func TestWorkQueueReadWrite(t *testing.T) {
-       var input = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
-
-       b := NewWorkQueue()
-       expectQueued(t, b, 0)
-
-       b.ReplaceQueue(makeTestWorkList(input))
-       expectQueued(t, b, len(input))
-
-       doWorkItems(t, b, input)
-       expectChannelEmpty(t, b.NextItem)
-       b.Close()
-}
-
-// Start a worker before the list has any input.
-func TestWorkQueueEarlyRead(t *testing.T) {
-       var input = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
-
-       b := NewWorkQueue()
-       defer b.Close()
-
-       // First, demonstrate that nothing is available on the NextItem
-       // channel.
-       expectChannelEmpty(t, b.NextItem)
-
-       // Start a reader in a goroutine. The reader will block until the
-       // block work list has been initialized.
-       //
-       done := make(chan int)
-       go func() {
-               doWorkItems(t, b, input)
-               done <- 1
-       }()
-
-       // Feed the blocklist a new worklist, and wait for the worker to
-       // finish.
-       b.ReplaceQueue(makeTestWorkList(input))
-       <-done
-       expectQueued(t, b, 0)
-}
-
-// After Close(), NextItem closes, work finishes, then stats return zero.
-func TestWorkQueueClose(t *testing.T) {
-       b := NewWorkQueue()
-       input := []interface{}{1, 2, 3, 4, 5, 6, 7, 8}
-       mark := make(chan struct{})
-       go func() {
-               <-b.NextItem
-               mark <- struct{}{}
-               <-mark
-               b.DoneItem <- struct{}{}
-       }()
-       b.ReplaceQueue(makeTestWorkList(input))
-       // Wait for worker to take item 1
-       <-mark
-       b.Close()
-       expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
-       // Tell worker to report done
-       mark <- struct{}{}
-       expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
-       expectChannelClosedWithin(t, time.Second, b.NextItem)
-}
-
-// Show that a reader may block when the manager's list is exhausted,
-// and that the reader resumes automatically when new data is
-// available.
-func TestWorkQueueReaderBlocks(t *testing.T) {
-       var (
-               inputBeforeBlock = []interface{}{1, 2, 3, 4, 5}
-               inputAfterBlock  = []interface{}{6, 7, 8, 9, 10}
-       )
-
-       b := NewWorkQueue()
-       defer b.Close()
-       sendmore := make(chan int)
-       done := make(chan int)
-       go func() {
-               doWorkItems(t, b, inputBeforeBlock)
-
-               // Confirm that the channel is empty, so a subsequent read
-               // on it will block.
-               expectChannelEmpty(t, b.NextItem)
-
-               // Signal that we're ready for more input.
-               sendmore <- 1
-               doWorkItems(t, b, inputAfterBlock)
-               done <- 1
-       }()
-
-       // Write a slice of the first five elements and wait for the
-       // reader to signal that it's ready for us to send more input.
-       b.ReplaceQueue(makeTestWorkList(inputBeforeBlock))
-       <-sendmore
-
-       b.ReplaceQueue(makeTestWorkList(inputAfterBlock))
-
-       // Wait for the reader to complete.
-       <-done
-}
-
-// Replace one active work list with another.
-func TestWorkQueueReplaceQueue(t *testing.T) {
-       var firstInput = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
-       var replaceInput = []interface{}{1, 4, 9, 16, 25, 36, 49, 64, 81}
-
-       b := NewWorkQueue()
-       b.ReplaceQueue(makeTestWorkList(firstInput))
-
-       // Read just the first five elements from the work list.
-       // Confirm that the channel is not empty.
-       doWorkItems(t, b, firstInput[0:5])
-       expectChannelNotEmpty(t, b.NextItem)
-
-       // Replace the work list and read five more elements.
-       // The old list should have been discarded and all new
-       // elements come from the new list.
-       b.ReplaceQueue(makeTestWorkList(replaceInput))
-       doWorkItems(t, b, replaceInput[0:5])
-
-       b.Close()
-}