Merge branch '22130-all-processes-paging' refs #22130
[arvados.git] / services / ws / event.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package ws
6
7 import (
8         "database/sql"
9         "sync"
10         "time"
11
12         "git.arvados.org/arvados.git/sdk/go/arvados"
13         "github.com/ghodss/yaml"
14         "github.com/sirupsen/logrus"
15 )
16
17 type eventSink interface {
18         Channel() <-chan *event
19         Stop()
20 }
21
22 type eventSource interface {
23         NewSink() eventSink
24         DB() *sql.DB
25         DBHealth() error
26 }
27
28 type event struct {
29         LogID    int64
30         Received time.Time
31         Ready    time.Time
32         Serial   uint64
33
34         db     *sql.DB
35         logger logrus.FieldLogger
36         logRow *arvados.Log
37         err    error
38         mtx    sync.Mutex
39 }
40
41 // Detail returns the database row corresponding to the event. It can
42 // be called safely from multiple goroutines. Only one attempt will be
43 // made. If the database row cannot be retrieved, Detail returns nil.
44 func (e *event) Detail() *arvados.Log {
45         e.mtx.Lock()
46         defer e.mtx.Unlock()
47         if e.logRow != nil || e.err != nil {
48                 return e.logRow
49         }
50         var logRow arvados.Log
51         var propYAML []byte
52         e.err = e.db.QueryRow(`SELECT id, uuid, object_uuid, COALESCE(object_owner_uuid,''), COALESCE(event_type,''), event_at, created_at, properties FROM logs WHERE id = $1`, e.LogID).Scan(
53                 &logRow.ID,
54                 &logRow.UUID,
55                 &logRow.ObjectUUID,
56                 &logRow.ObjectOwnerUUID,
57                 &logRow.EventType,
58                 &logRow.EventAt,
59                 &logRow.CreatedAt,
60                 &propYAML)
61         if e.err != nil {
62                 e.logger.WithField("LogID", e.LogID).WithError(e.err).Error("QueryRow failed")
63                 return nil
64         }
65         e.err = yaml.Unmarshal(propYAML, &logRow.Properties)
66         if e.err != nil {
67                 e.logger.WithField("LogID", e.LogID).WithError(e.err).Error("yaml decode failed")
68                 return nil
69         }
70         e.logRow = &logRow
71         return e.logRow
72 }