15944: Merge branch 'master' into 15944-arvbox-publicdev-fix
[arvados.git] / services / ws / server.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "net"
9         "net/http"
10         "sync"
11         "time"
12
13         "git.arvados.org/arvados.git/sdk/go/arvados"
14         "github.com/coreos/go-systemd/daemon"
15 )
16
17 type server struct {
18         httpServer  *http.Server
19         listener    net.Listener
20         cluster     *arvados.Cluster
21         eventSource *pgEventSource
22         setupOnce   sync.Once
23 }
24
25 func (srv *server) Close() {
26         srv.WaitReady()
27         srv.eventSource.Close()
28         srv.listener.Close()
29 }
30
31 func (srv *server) WaitReady() {
32         srv.setupOnce.Do(srv.setup)
33         srv.eventSource.WaitReady()
34 }
35
36 func (srv *server) Run() error {
37         srv.setupOnce.Do(srv.setup)
38         return srv.httpServer.Serve(srv.listener)
39 }
40
41 func (srv *server) setup() {
42         log := logger(nil)
43
44         var listen arvados.URL
45         for listen, _ = range srv.cluster.Services.Websocket.InternalURLs {
46                 break
47         }
48         ln, err := net.Listen("tcp", listen.Host)
49         if err != nil {
50                 log.WithField("Listen", listen).Fatal(err)
51         }
52         log.WithField("Listen", ln.Addr().String()).Info("listening")
53
54         client := arvados.Client{}
55         client.APIHost = srv.cluster.Services.Controller.ExternalURL.Host
56         client.AuthToken = srv.cluster.SystemRootToken
57         client.Insecure = srv.cluster.TLS.Insecure
58
59         srv.listener = ln
60         srv.eventSource = &pgEventSource{
61                 DataSource:   srv.cluster.PostgreSQL.Connection.String(),
62                 MaxOpenConns: srv.cluster.PostgreSQL.ConnectionPool,
63                 QueueSize:    srv.cluster.API.WebsocketServerEventQueue,
64         }
65
66         srv.httpServer = &http.Server{
67                 Addr:           listen.Host,
68                 ReadTimeout:    time.Minute,
69                 WriteTimeout:   time.Minute,
70                 MaxHeaderBytes: 1 << 20,
71                 Handler: &router{
72                         cluster:        srv.cluster,
73                         client:         client,
74                         eventSource:    srv.eventSource,
75                         newPermChecker: func() permChecker { return newPermChecker(client) },
76                 },
77         }
78
79         go func() {
80                 srv.eventSource.Run()
81                 log.Info("event source stopped")
82                 srv.Close()
83         }()
84
85         if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
86                 log.WithError(err).Warn("error notifying init daemon")
87         }
88 }