21535: removed .only and added more assertions Arvados-DCO-1.1-Signed-off-by: Lisa...
[arvados.git] / services / ws / service.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package ws
6
7 import (
8         "context"
9         "fmt"
10         "time"
11
12         "git.arvados.org/arvados.git/lib/cmd"
13         "git.arvados.org/arvados.git/lib/service"
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "git.arvados.org/arvados.git/sdk/go/ctxlog"
16         "github.com/prometheus/client_golang/prometheus"
17 )
18
19 var testMode = false
20
21 var Command cmd.Handler = service.Command(arvados.ServiceNameWebsocket, newHandler)
22
23 func newHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
24         client, err := arvados.NewClientFromConfig(cluster)
25         if err != nil {
26                 return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
27         }
28         client.Timeout = time.Minute
29         eventSource := &pgEventSource{
30                 DataSource:   cluster.PostgreSQL.Connection.String(),
31                 MaxOpenConns: cluster.PostgreSQL.ConnectionPool,
32                 QueueSize:    cluster.API.WebsocketServerEventQueue,
33                 Logger:       ctxlog.FromContext(ctx),
34                 Reg:          reg,
35         }
36         done := make(chan struct{})
37         go func() {
38                 eventSource.Run()
39                 ctxlog.FromContext(ctx).Error("event source stopped")
40                 close(done)
41         }()
42         eventSource.WaitReady()
43         if err := eventSource.DBHealth(); err != nil {
44                 return service.ErrorHandler(ctx, cluster, err)
45         }
46         rtr := &router{
47                 cluster:        cluster,
48                 client:         client,
49                 eventSource:    eventSource,
50                 newPermChecker: func() permChecker { return newPermChecker(client) },
51                 done:           done,
52                 reg:            reg,
53         }
54         return rtr
55 }