import (
"database/sql"
"fmt"
+ "io/ioutil"
+ "sort"
"sync"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/ghodss/yaml"
"github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
}
func (*eventSourceSuite) TestEventSource(c *check.C) {
+ var logfixtures map[string]struct {
+ ID int
+ }
+ yamldata, err := ioutil.ReadFile("../api/test/fixtures/logs.yml")
+ c.Assert(err, check.IsNil)
+ c.Assert(yaml.Unmarshal(yamldata, &logfixtures), check.IsNil)
+ var logIDs []int
+ for _, logfixture := range logfixtures {
+ logIDs = append(logIDs, logfixture.ID)
+ }
+ sort.Ints(logIDs)
+
cfg := testDBConfig()
db := testDB()
pges := &pgEventSource{
done := make(chan bool, 1)
go func() {
- for i := range sinks {
- _, err := db.Exec(fmt.Sprintf(`NOTIFY logs, '%d'`, i))
+ for _, id := range logIDs {
+ _, err := db.Exec(fmt.Sprintf(`NOTIFY logs, '%d'`, id))
if err != nil {
done <- true
c.Fatal(err)
go func(si int, s eventSink) {
defer wg.Done()
defer sinks[si].Stop()
- for i := 0; i <= si; i++ {
+ for _, logID := range logIDs {
ev := <-sinks[si].Channel()
- c.Logf("sink %d received event %d", si, i)
- c.Check(ev.LogID, check.Equals, uint64(i))
+ c.Logf("sink %d received event %d", si, logID)
+ c.Check(ev.LogID, check.Equals, int64(logID))
row := ev.Detail()
- if i == 0 {
- // no matching row, null event
- c.Check(row, check.IsNil)
- } else {
- c.Check(row, check.NotNil)
- c.Check(row.ID, check.Equals, uint64(i))
+ if c.Check(row, check.NotNil) {
+ c.Check(row.ID, check.Equals, int64(logID))
c.Check(row.UUID, check.Not(check.Equals), "")
}
}