Merge branch '21535-multi-wf-delete'
[arvados.git] / services / ws / event_source_test.go
index ac5d130d61bdd85dfc568bf91c37b983994ae40c..172f74ce7abea6bda96b1250e040ec01b90f0a55 100644 (file)
@@ -2,17 +2,20 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package ws
 
 import (
        "database/sql"
        "fmt"
-       "os"
-       "path/filepath"
+       "io/ioutil"
+       "sort"
        "sync"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/ghodss/yaml"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -21,7 +24,7 @@ var _ = check.Suite(&eventSourceSuite{})
 type eventSourceSuite struct{}
 
 func testDBConfig() arvados.PostgreSQLConnection {
-       cfg, err := arvados.GetConfig(filepath.Join(os.Getenv("WORKSPACE"), "tmp", "arvados.yml"))
+       cfg, err := arvados.GetConfig(arvados.DefaultConfigFile)
        if err != nil {
                panic(err)
        }
@@ -41,11 +44,25 @@ func testDB() *sql.DB {
 }
 
 func (*eventSourceSuite) TestEventSource(c *check.C) {
+       var logfixtures map[string]struct {
+               ID int
+       }
+       yamldata, err := ioutil.ReadFile("../api/test/fixtures/logs.yml")
+       c.Assert(err, check.IsNil)
+       c.Assert(yaml.Unmarshal(yamldata, &logfixtures), check.IsNil)
+       var logIDs []int
+       for _, logfixture := range logfixtures {
+               logIDs = append(logIDs, logfixture.ID)
+       }
+       sort.Ints(logIDs)
+
        cfg := testDBConfig()
        db := testDB()
        pges := &pgEventSource{
                DataSource: cfg.String(),
                QueueSize:  4,
+               Logger:     ctxlog.TestLogger(c),
+               Reg:        prometheus.NewRegistry(),
        }
        go pges.Run()
        sinks := make([]eventSink, 18)
@@ -59,8 +76,8 @@ func (*eventSourceSuite) TestEventSource(c *check.C) {
        done := make(chan bool, 1)
 
        go func() {
-               for i := range sinks {
-                       _, err := db.Exec(fmt.Sprintf(`NOTIFY logs, '%d'`, i))
+               for _, id := range logIDs {
+                       _, err := db.Exec(fmt.Sprintf(`NOTIFY logs, '%d'`, id))
                        if err != nil {
                                done <- true
                                c.Fatal(err)
@@ -75,17 +92,13 @@ func (*eventSourceSuite) TestEventSource(c *check.C) {
                go func(si int, s eventSink) {
                        defer wg.Done()
                        defer sinks[si].Stop()
-                       for i := 0; i <= si; i++ {
+                       for _, logID := range logIDs {
                                ev := <-sinks[si].Channel()
-                               c.Logf("sink %d received event %d", si, i)
-                               c.Check(ev.LogID, check.Equals, uint64(i))
+                               c.Logf("sink %d received event %d", si, logID)
+                               c.Check(ev.LogID, check.Equals, int64(logID))
                                row := ev.Detail()
-                               if i == 0 {
-                                       // no matching row, null event
-                                       c.Check(row, check.IsNil)
-                               } else {
-                                       c.Check(row, check.NotNil)
-                                       c.Check(row.ID, check.Equals, uint64(i))
+                               if c.Check(row, check.NotNil) {
+                                       c.Check(row.ID, check.Equals, int64(logID))
                                        c.Check(row.UUID, check.Not(check.Equals), "")
                                }
                        }