Refactor the multi-host salt install page.
[arvados.git] / services / keepstore / work_queue_test.go
index 144e4c252be9ba1a069d8cf862f72c4284a28479..254f96cb2d853293f8e42f24f0177f55e37652fa 100644 (file)
@@ -1,11 +1,21 @@
-package main
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package keepstore
 
 import (
        "container/list"
+       "runtime"
        "testing"
+       "time"
 )
 
-func makeTestWorkList(ary []int) *list.List {
+type fatalfer interface {
+       Fatalf(string, ...interface{})
+}
+
+func makeTestWorkList(ary []interface{}) *list.List {
        l := list.New()
        for _, n := range ary {
                l.PushBack(n)
@@ -13,59 +23,124 @@ func makeTestWorkList(ary []int) *list.List {
        return l
 }
 
-func expectChannelEmpty(t *testing.T, c <-chan interface{}) {
+func expectChannelEmpty(t fatalfer, c <-chan interface{}) {
        select {
-       case item := <-c:
-               t.Fatalf("Received value (%v) from channel that we expected to be empty", item)
+       case item, ok := <-c:
+               if ok {
+                       t.Fatalf("Received value (%+v) from channel that we expected to be empty", item)
+               }
        default:
-               // no-op
        }
 }
 
-func expectChannelNotEmpty(t *testing.T, c <-chan interface{}) {
-       if item, ok := <-c; !ok {
-               t.Fatal("expected data on a closed channel")
-       } else if item == nil {
-               t.Fatal("expected data on an empty channel")
+func expectChannelNotEmpty(t fatalfer, c <-chan interface{}) interface{} {
+       select {
+       case item, ok := <-c:
+               if !ok {
+                       t.Fatalf("expected data on a closed channel")
+               }
+               return item
+       case <-time.After(time.Second):
+               t.Fatalf("expected data on an empty channel")
+               return nil
        }
 }
 
-func expectChannelClosed(t *testing.T, c <-chan interface{}) {
-       received, ok := <-c
-       if ok {
-               t.Fatalf("Expected channel to be closed, but received %v instead", received)
+func expectChannelClosedWithin(t fatalfer, timeout time.Duration, c <-chan interface{}) {
+       select {
+       case received, ok := <-c:
+               if ok {
+                       t.Fatalf("Expected channel to be closed, but received %+v instead", received)
+               }
+       case <-time.After(timeout):
+               t.Fatalf("Expected channel to be closed, but it is still open after %v", timeout)
        }
 }
 
-func expectFromChannel(t *testing.T, c <-chan interface{}, expected []int) {
+func doWorkItems(t fatalfer, q *WorkQueue, expected []interface{}) {
        for i := range expected {
-               actual, ok := <-c
-               t.Logf("received %v", actual)
+               actual, ok := <-q.NextItem
                if !ok {
-                       t.Fatalf("Expected %v but channel was closed after receiving the first %d elements correctly.", expected, i)
-               } else if actual.(int) != expected[i] {
-                       t.Fatalf("Expected %v but received '%v' after receiving the first %d elements correctly.", expected[i], actual, i)
+                       t.Fatalf("Expected %+v but channel was closed after receiving %+v as expected.", expected, expected[:i])
+               }
+               q.DoneItem <- struct{}{}
+               if actual.(int) != expected[i] {
+                       t.Fatalf("Expected %+v but received %+v after receiving %+v as expected.", expected[i], actual, expected[:i])
                }
        }
 }
 
+func expectEqualWithin(t fatalfer, timeout time.Duration, expect interface{}, f func() interface{}) {
+       ok := make(chan struct{})
+       giveup := false
+       go func() {
+               for f() != expect && !giveup {
+                       time.Sleep(time.Millisecond)
+               }
+               close(ok)
+       }()
+       select {
+       case <-ok:
+       case <-time.After(timeout):
+               giveup = true
+               _, file, line, _ := runtime.Caller(1)
+               t.Fatalf("Still getting %+v, timed out waiting for %+v\n%s:%d", f(), expect, file, line)
+       }
+}
+
+func expectQueued(t fatalfer, b *WorkQueue, expectQueued int) {
+       if l := b.Status().Queued; l != expectQueued {
+               t.Fatalf("Got Queued==%d, expected %d", l, expectQueued)
+       }
+}
+
+func TestWorkQueueDoneness(t *testing.T) {
+       b := NewWorkQueue()
+       defer b.Close()
+       b.ReplaceQueue(makeTestWorkList([]interface{}{1, 2, 3}))
+       expectQueued(t, b, 3)
+       gate := make(chan struct{})
+       go func() {
+               <-gate
+               for range b.NextItem {
+                       <-gate
+                       time.Sleep(time.Millisecond)
+                       b.DoneItem <- struct{}{}
+               }
+       }()
+       expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
+       b.ReplaceQueue(makeTestWorkList([]interface{}{4, 5, 6}))
+       for i := 1; i <= 3; i++ {
+               gate <- struct{}{}
+               expectEqualWithin(t, time.Second, 3-i, func() interface{} { return b.Status().Queued })
+               expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
+       }
+       close(gate)
+       expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
+       expectChannelEmpty(t, b.NextItem)
+}
+
 // Create a WorkQueue, generate a list for it, and instantiate a worker.
 func TestWorkQueueReadWrite(t *testing.T) {
-       var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
+       var input = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
 
        b := NewWorkQueue()
+       expectQueued(t, b, 0)
+
        b.ReplaceQueue(makeTestWorkList(input))
+       expectQueued(t, b, len(input))
 
-       expectFromChannel(t, b.NextItem, input)
+       doWorkItems(t, b, input)
        expectChannelEmpty(t, b.NextItem)
        b.Close()
 }
 
 // Start a worker before the list has any input.
 func TestWorkQueueEarlyRead(t *testing.T) {
-       var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
+       var input = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
 
        b := NewWorkQueue()
+       defer b.Close()
 
        // First, demonstrate that nothing is available on the NextItem
        // channel.
@@ -76,8 +151,7 @@ func TestWorkQueueEarlyRead(t *testing.T) {
        //
        done := make(chan int)
        go func() {
-               expectFromChannel(t, b.NextItem, input)
-               b.Close()
+               doWorkItems(t, b, input)
                done <- 1
        }()
 
@@ -85,8 +159,29 @@ func TestWorkQueueEarlyRead(t *testing.T) {
        // finish.
        b.ReplaceQueue(makeTestWorkList(input))
        <-done
+       expectQueued(t, b, 0)
+}
 
-       expectChannelClosed(t, b.NextItem)
+// After Close(), NextItem closes, work finishes, then stats return zero.
+func TestWorkQueueClose(t *testing.T) {
+       b := NewWorkQueue()
+       input := []interface{}{1, 2, 3, 4, 5, 6, 7, 8}
+       mark := make(chan struct{})
+       go func() {
+               <-b.NextItem
+               mark <- struct{}{}
+               <-mark
+               b.DoneItem <- struct{}{}
+       }()
+       b.ReplaceQueue(makeTestWorkList(input))
+       // Wait for worker to take item 1
+       <-mark
+       b.Close()
+       expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
+       // Tell worker to report done
+       mark <- struct{}{}
+       expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
+       expectChannelClosedWithin(t, time.Second, b.NextItem)
 }
 
 // Show that a reader may block when the manager's list is exhausted,
@@ -94,15 +189,16 @@ func TestWorkQueueEarlyRead(t *testing.T) {
 // available.
 func TestWorkQueueReaderBlocks(t *testing.T) {
        var (
-               inputBeforeBlock = []int{1, 2, 3, 4, 5}
-               inputAfterBlock  = []int{6, 7, 8, 9, 10}
+               inputBeforeBlock = []interface{}{1, 2, 3, 4, 5}
+               inputAfterBlock  = []interface{}{6, 7, 8, 9, 10}
        )
 
        b := NewWorkQueue()
+       defer b.Close()
        sendmore := make(chan int)
        done := make(chan int)
        go func() {
-               expectFromChannel(t, b.NextItem, inputBeforeBlock)
+               doWorkItems(t, b, inputBeforeBlock)
 
                // Confirm that the channel is empty, so a subsequent read
                // on it will block.
@@ -110,8 +206,7 @@ func TestWorkQueueReaderBlocks(t *testing.T) {
 
                // Signal that we're ready for more input.
                sendmore <- 1
-               expectFromChannel(t, b.NextItem, inputAfterBlock)
-               b.Close()
+               doWorkItems(t, b, inputAfterBlock)
                done <- 1
        }()
 
@@ -128,22 +223,22 @@ func TestWorkQueueReaderBlocks(t *testing.T) {
 
 // Replace one active work list with another.
 func TestWorkQueueReplaceQueue(t *testing.T) {
-       var firstInput = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
-       var replaceInput = []int{1, 4, 9, 16, 25, 36, 49, 64, 81}
+       var firstInput = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
+       var replaceInput = []interface{}{1, 4, 9, 16, 25, 36, 49, 64, 81}
 
        b := NewWorkQueue()
        b.ReplaceQueue(makeTestWorkList(firstInput))
 
        // Read just the first five elements from the work list.
        // Confirm that the channel is not empty.
-       expectFromChannel(t, b.NextItem, firstInput[0:5])
+       doWorkItems(t, b, firstInput[0:5])
        expectChannelNotEmpty(t, b.NextItem)
 
        // Replace the work list and read five more elements.
        // The old list should have been discarded and all new
        // elements come from the new list.
        b.ReplaceQueue(makeTestWorkList(replaceInput))
-       expectFromChannel(t, b.NextItem, replaceInput[0:5])
+       doWorkItems(t, b, replaceInput[0:5])
 
        b.Close()
 }