X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ca812f58e63bd4673bb62aa8528e07d6020bfc9a..c0f97a0f0617e1b6d2da1ab0e256a226b8ed810a:/services/ws/event_source_test.go diff --git a/services/ws/event_source_test.go b/services/ws/event_source_test.go index ee1da08bf1..172f74ce7a 100644 --- a/services/ws/event_source_test.go +++ b/services/ws/event_source_test.go @@ -1,12 +1,21 @@ -package main +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package ws import ( "database/sql" "fmt" + "io/ioutil" + "sort" "sync" "time" - "git.curoverse.com/arvados.git/sdk/go/config" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/ctxlog" + "github.com/ghodss/yaml" + "github.com/prometheus/client_golang/prometheus" check "gopkg.in/check.v1" ) @@ -14,44 +23,46 @@ var _ = check.Suite(&eventSourceSuite{}) type eventSourceSuite struct{} -func testDBConfig() (pgConfig, error) { - var railsDB struct { - Test struct { - Database string - Username string - Password string - Host string - } - } - err := config.LoadFile(&railsDB, "../api/config/database.yml") +func testDBConfig() arvados.PostgreSQLConnection { + cfg, err := arvados.GetConfig(arvados.DefaultConfigFile) if err != nil { - return nil, err + panic(err) } - cfg := pgConfig{ - "dbname": railsDB.Test.Database, - "host": railsDB.Test.Host, - "password": railsDB.Test.Password, - "user": railsDB.Test.Username, + cc, err := cfg.GetCluster("zzzzz") + if err != nil { + panic(err) } - return cfg, nil + return cc.PostgreSQL.Connection } -func testDB() (*sql.DB, error) { - cfg, err := testDBConfig() +func testDB() *sql.DB { + db, err := sql.Open("postgres", testDBConfig().String()) if err != nil { - return nil, err + panic(err) } - return sql.Open("postgres", cfg.ConnectionString()) + return db } func (*eventSourceSuite) TestEventSource(c *check.C) { - cfg, err := testDBConfig() - if err != nil { - c.Fatal(err) + var logfixtures map[string]struct { + ID int + } + yamldata, err := ioutil.ReadFile("../api/test/fixtures/logs.yml") + c.Assert(err, check.IsNil) + c.Assert(yaml.Unmarshal(yamldata, &logfixtures), check.IsNil) + var logIDs []int + for _, logfixture := range logfixtures { + logIDs = append(logIDs, logfixture.ID) } + sort.Ints(logIDs) + + cfg := testDBConfig() + db := testDB() pges := &pgEventSource{ - DataSource: cfg.ConnectionString(), + DataSource: cfg.String(), QueueSize: 4, + Logger: ctxlog.TestLogger(c), + Reg: prometheus.NewRegistry(), } go pges.Run() sinks := make([]eventSink, 18) @@ -59,21 +70,14 @@ func (*eventSourceSuite) TestEventSource(c *check.C) { sinks[i] = pges.NewSink() } - // wait for listener to start, as evidenced by queue channel - // appearing (relying on internal implementation detail here) - for deadline := time.Now().Add(10 * time.Second); pges.queue == nil; time.Sleep(10 * time.Millisecond) { - c.Assert(time.Now().After(deadline), check.Equals, false) - } + pges.WaitReady() defer pges.cancel() - db, err := testDB() - c.Assert(err, check.IsNil) - done := make(chan bool, 1) go func() { - for i := range sinks { - _, err := db.Exec(fmt.Sprintf(`NOTIFY logs, '%d'`, i)) + for _, id := range logIDs { + _, err := db.Exec(fmt.Sprintf(`NOTIFY logs, '%d'`, id)) if err != nil { done <- true c.Fatal(err) @@ -88,17 +92,13 @@ func (*eventSourceSuite) TestEventSource(c *check.C) { go func(si int, s eventSink) { defer wg.Done() defer sinks[si].Stop() - for i := 0; i <= si; i++ { + for _, logID := range logIDs { ev := <-sinks[si].Channel() - c.Logf("sink %d received event %d", si, i) - c.Check(ev.LogID, check.Equals, uint64(i)) + c.Logf("sink %d received event %d", si, logID) + c.Check(ev.LogID, check.Equals, int64(logID)) row := ev.Detail() - if i == 0 { - // no matching row, null event - c.Check(row, check.IsNil) - } else { - c.Check(row, check.NotNil) - c.Check(row.ID, check.Equals, uint64(i)) + if c.Check(row, check.NotNil) { + c.Check(row.ID, check.Equals, int64(logID)) c.Check(row.UUID, check.Not(check.Equals), "") } } @@ -114,4 +114,6 @@ func (*eventSourceSuite) TestEventSource(c *check.C) { case <-time.After(10 * time.Second): c.Fatal("timed out") } + + c.Check(pges.DBHealth(), check.IsNil) }