Merge branch '16265-security-updates' into dependabot/bundler/apps/workbench/loofah...
[arvados.git] / services / ws / server.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "net"
9         "net/http"
10         "sync"
11         "time"
12
13         "git.arvados.org/arvados.git/sdk/go/arvados"
14         "github.com/coreos/go-systemd/daemon"
15 )
16
17 type server struct {
18         httpServer  *http.Server
19         listener    net.Listener
20         cluster     *arvados.Cluster
21         eventSource *pgEventSource
22         setupOnce   sync.Once
23 }
24
25 func (srv *server) Close() {
26         srv.WaitReady()
27         srv.eventSource.Close()
28         srv.httpServer.Close()
29         srv.listener.Close()
30 }
31
32 func (srv *server) WaitReady() {
33         srv.setupOnce.Do(srv.setup)
34         srv.eventSource.WaitReady()
35 }
36
37 func (srv *server) Run() error {
38         srv.setupOnce.Do(srv.setup)
39         return srv.httpServer.Serve(srv.listener)
40 }
41
42 func (srv *server) setup() {
43         log := logger(nil)
44
45         var listen arvados.URL
46         for listen, _ = range srv.cluster.Services.Websocket.InternalURLs {
47                 break
48         }
49         ln, err := net.Listen("tcp", listen.Host)
50         if err != nil {
51                 log.WithField("Listen", listen).Fatal(err)
52         }
53         log.WithField("Listen", ln.Addr().String()).Info("listening")
54
55         client := arvados.Client{}
56         client.APIHost = srv.cluster.Services.Controller.ExternalURL.Host
57         client.AuthToken = srv.cluster.SystemRootToken
58         client.Insecure = srv.cluster.TLS.Insecure
59
60         srv.listener = ln
61         srv.eventSource = &pgEventSource{
62                 DataSource:   srv.cluster.PostgreSQL.Connection.String(),
63                 MaxOpenConns: srv.cluster.PostgreSQL.ConnectionPool,
64                 QueueSize:    srv.cluster.API.WebsocketServerEventQueue,
65         }
66
67         srv.httpServer = &http.Server{
68                 Addr:           listen.Host,
69                 ReadTimeout:    time.Minute,
70                 WriteTimeout:   time.Minute,
71                 MaxHeaderBytes: 1 << 20,
72                 Handler: &router{
73                         cluster:        srv.cluster,
74                         client:         client,
75                         eventSource:    srv.eventSource,
76                         newPermChecker: func() permChecker { return newPermChecker(client) },
77                 },
78         }
79
80         go func() {
81                 srv.eventSource.Run()
82                 log.Info("event source stopped")
83                 srv.Close()
84         }()
85
86         if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
87                 log.WithError(err).Warn("error notifying init daemon")
88         }
89 }