17840: Merge branch 'main'
[arvados.git] / lib / boot / supervisor.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package boot
6
7 import (
8         "bytes"
9         "context"
10         "crypto/rand"
11         "encoding/json"
12         "errors"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "net"
17         "net/url"
18         "os"
19         "os/exec"
20         "os/signal"
21         "os/user"
22         "path/filepath"
23         "reflect"
24         "strconv"
25         "strings"
26         "sync"
27         "syscall"
28         "time"
29
30         "git.arvados.org/arvados.git/lib/config"
31         "git.arvados.org/arvados.git/lib/service"
32         "git.arvados.org/arvados.git/sdk/go/arvados"
33         "git.arvados.org/arvados.git/sdk/go/ctxlog"
34         "git.arvados.org/arvados.git/sdk/go/health"
35         "github.com/fsnotify/fsnotify"
36         "github.com/sirupsen/logrus"
37 )
38
39 type Supervisor struct {
40         SourcePath           string // e.g., /home/username/src/arvados
41         SourceVersion        string // e.g., acbd1324...
42         ClusterType          string // e.g., production
43         ListenHost           string // e.g., localhost
44         ControllerAddr       string // e.g., 127.0.0.1:8000
45         NoWorkbench1         bool
46         OwnTemporaryDatabase bool
47         Stderr               io.Writer
48
49         logger  logrus.FieldLogger
50         cluster *arvados.Cluster
51
52         ctx           context.Context
53         cancel        context.CancelFunc
54         done          chan struct{} // closed when child procs/services have shut down
55         err           error         // error that caused shutdown (valid when done is closed)
56         healthChecker *health.Aggregator
57         tasksReady    map[string]chan bool
58         waitShutdown  sync.WaitGroup
59
60         bindir     string
61         tempdir    string
62         wwwtempdir string
63         configfile string
64         environ    []string // for child processes
65 }
66
67 func (super *Supervisor) Cluster() *arvados.Cluster { return super.cluster }
68
69 func (super *Supervisor) Start(ctx context.Context, cfg *arvados.Config, cfgPath string) {
70         super.ctx, super.cancel = context.WithCancel(ctx)
71         super.done = make(chan struct{})
72
73         go func() {
74                 defer close(super.done)
75
76                 sigch := make(chan os.Signal)
77                 signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM)
78                 defer signal.Stop(sigch)
79                 go func() {
80                         for sig := range sigch {
81                                 super.logger.WithField("signal", sig).Info("caught signal")
82                                 if super.err == nil {
83                                         super.err = fmt.Errorf("caught signal %s", sig)
84                                 }
85                                 super.cancel()
86                         }
87                 }()
88
89                 hupch := make(chan os.Signal)
90                 signal.Notify(hupch, syscall.SIGHUP)
91                 defer signal.Stop(hupch)
92                 go func() {
93                         for sig := range hupch {
94                                 super.logger.WithField("signal", sig).Info("caught signal")
95                                 if super.err == nil {
96                                         super.err = errNeedConfigReload
97                                 }
98                                 super.cancel()
99                         }
100                 }()
101
102                 if cfgPath != "" && cfgPath != "-" && cfg.AutoReloadConfig {
103                         go watchConfig(super.ctx, super.logger, cfgPath, copyConfig(cfg), func() {
104                                 if super.err == nil {
105                                         super.err = errNeedConfigReload
106                                 }
107                                 super.cancel()
108                         })
109                 }
110
111                 err := super.run(cfg)
112                 if err != nil {
113                         super.logger.WithError(err).Warn("supervisor shut down")
114                         if super.err == nil {
115                                 super.err = err
116                         }
117                 }
118         }()
119 }
120
121 func (super *Supervisor) Wait() error {
122         <-super.done
123         return super.err
124 }
125
126 func (super *Supervisor) run(cfg *arvados.Config) error {
127         defer super.cancel()
128
129         cwd, err := os.Getwd()
130         if err != nil {
131                 return err
132         }
133         if !strings.HasPrefix(super.SourcePath, "/") {
134                 super.SourcePath = filepath.Join(cwd, super.SourcePath)
135         }
136         super.SourcePath, err = filepath.EvalSymlinks(super.SourcePath)
137         if err != nil {
138                 return err
139         }
140
141         // Choose bin and temp dirs: /var/lib/arvados/... in
142         // production, transient tempdir otherwise.
143         if super.ClusterType == "production" {
144                 // These dirs have already been created by
145                 // "arvados-server install" (or by extracting a
146                 // package).
147                 super.tempdir = "/var/lib/arvados/tmp"
148                 super.wwwtempdir = "/var/lib/arvados/wwwtmp"
149                 super.bindir = "/var/lib/arvados/bin"
150         } else {
151                 super.tempdir, err = ioutil.TempDir("", "arvados-server-boot-")
152                 if err != nil {
153                         return err
154                 }
155                 defer os.RemoveAll(super.tempdir)
156                 super.wwwtempdir = super.tempdir
157                 super.bindir = filepath.Join(super.tempdir, "bin")
158                 if err := os.Mkdir(super.bindir, 0755); err != nil {
159                         return err
160                 }
161         }
162
163         // Fill in any missing config keys, and write the resulting
164         // config in the temp dir for child services to use.
165         err = super.autofillConfig(cfg)
166         if err != nil {
167                 return err
168         }
169         conffile, err := os.OpenFile(filepath.Join(super.wwwtempdir, "config.yml"), os.O_CREATE|os.O_WRONLY, 0644)
170         if err != nil {
171                 return err
172         }
173         defer conffile.Close()
174         err = json.NewEncoder(conffile).Encode(cfg)
175         if err != nil {
176                 return err
177         }
178         err = conffile.Close()
179         if err != nil {
180                 return err
181         }
182         super.configfile = conffile.Name()
183
184         super.environ = os.Environ()
185         super.cleanEnv([]string{"ARVADOS_"})
186         super.setEnv("ARVADOS_CONFIG", super.configfile)
187         super.setEnv("RAILS_ENV", super.ClusterType)
188         super.setEnv("TMPDIR", super.tempdir)
189         super.prependEnv("PATH", "/var/lib/arvados/bin:")
190         if super.ClusterType != "production" {
191                 super.prependEnv("PATH", super.tempdir+"/bin:")
192         }
193
194         super.cluster, err = cfg.GetCluster("")
195         if err != nil {
196                 return err
197         }
198         // Now that we have the config, replace the bootstrap logger
199         // with a new one according to the logging config.
200         loglevel := super.cluster.SystemLogs.LogLevel
201         if s := os.Getenv("ARVADOS_DEBUG"); s != "" && s != "0" {
202                 loglevel = "debug"
203         }
204         super.logger = ctxlog.New(super.Stderr, super.cluster.SystemLogs.Format, loglevel).WithFields(logrus.Fields{
205                 "PID": os.Getpid(),
206         })
207
208         if super.SourceVersion == "" && super.ClusterType == "production" {
209                 // don't need SourceVersion
210         } else if super.SourceVersion == "" {
211                 // Find current source tree version.
212                 var buf bytes.Buffer
213                 err = super.RunProgram(super.ctx, ".", runOptions{output: &buf}, "git", "diff", "--shortstat")
214                 if err != nil {
215                         return err
216                 }
217                 dirty := buf.Len() > 0
218                 buf.Reset()
219                 err = super.RunProgram(super.ctx, ".", runOptions{output: &buf}, "git", "log", "-n1", "--format=%H")
220                 if err != nil {
221                         return err
222                 }
223                 super.SourceVersion = strings.TrimSpace(buf.String())
224                 if dirty {
225                         super.SourceVersion += "+uncommitted"
226                 }
227         } else {
228                 return errors.New("specifying a version to run is not yet supported")
229         }
230
231         _, err = super.installGoProgram(super.ctx, "cmd/arvados-server")
232         if err != nil {
233                 return err
234         }
235         err = super.setupRubyEnv()
236         if err != nil {
237                 return err
238         }
239
240         tasks := []supervisedTask{
241                 createCertificates{},
242                 runPostgreSQL{},
243                 runNginx{},
244                 runServiceCommand{name: "controller", svc: super.cluster.Services.Controller, depends: []supervisedTask{seedDatabase{}}},
245                 runGoProgram{src: "services/arv-git-httpd", svc: super.cluster.Services.GitHTTP},
246                 runGoProgram{src: "services/health", svc: super.cluster.Services.Health},
247                 runGoProgram{src: "services/keepproxy", svc: super.cluster.Services.Keepproxy, depends: []supervisedTask{runPassenger{src: "services/api"}}},
248                 runServiceCommand{name: "keepstore", svc: super.cluster.Services.Keepstore},
249                 runGoProgram{src: "services/keep-web", svc: super.cluster.Services.WebDAV},
250                 runServiceCommand{name: "ws", svc: super.cluster.Services.Websocket, depends: []supervisedTask{seedDatabase{}}},
251                 installPassenger{src: "services/api"},
252                 runPassenger{src: "services/api", varlibdir: "railsapi", svc: super.cluster.Services.RailsAPI, depends: []supervisedTask{createCertificates{}, seedDatabase{}, installPassenger{src: "services/api"}}},
253                 seedDatabase{},
254         }
255         if !super.NoWorkbench1 {
256                 tasks = append(tasks,
257                         installPassenger{src: "apps/workbench", depends: []supervisedTask{seedDatabase{}}}, // dependency ensures workbench doesn't delay api install/startup
258                         runPassenger{src: "apps/workbench", varlibdir: "workbench1", svc: super.cluster.Services.Workbench1, depends: []supervisedTask{installPassenger{src: "apps/workbench"}}},
259                 )
260         }
261         if super.ClusterType != "test" {
262                 tasks = append(tasks,
263                         runServiceCommand{name: "dispatch-cloud", svc: super.cluster.Services.DispatchCloud},
264                         runGoProgram{src: "services/keep-balance", svc: super.cluster.Services.Keepbalance},
265                 )
266         }
267         super.tasksReady = map[string]chan bool{}
268         for _, task := range tasks {
269                 super.tasksReady[task.String()] = make(chan bool)
270         }
271         for _, task := range tasks {
272                 task := task
273                 fail := func(err error) {
274                         if super.ctx.Err() != nil {
275                                 return
276                         }
277                         super.cancel()
278                         super.logger.WithField("task", task.String()).WithError(err).Error("task failed")
279                 }
280                 go func() {
281                         super.logger.WithField("task", task.String()).Info("starting")
282                         err := task.Run(super.ctx, fail, super)
283                         if err != nil {
284                                 fail(err)
285                                 return
286                         }
287                         close(super.tasksReady[task.String()])
288                 }()
289         }
290         err = super.wait(super.ctx, tasks...)
291         if err != nil {
292                 return err
293         }
294         super.logger.Info("all startup tasks are complete; starting health checks")
295         super.healthChecker = &health.Aggregator{Cluster: super.cluster}
296         <-super.ctx.Done()
297         super.logger.Info("shutting down")
298         super.waitShutdown.Wait()
299         return super.ctx.Err()
300 }
301
302 func (super *Supervisor) wait(ctx context.Context, tasks ...supervisedTask) error {
303         for _, task := range tasks {
304                 ch, ok := super.tasksReady[task.String()]
305                 if !ok {
306                         return fmt.Errorf("no such task: %s", task)
307                 }
308                 super.logger.WithField("task", task.String()).Info("waiting")
309                 select {
310                 case <-ch:
311                         super.logger.WithField("task", task.String()).Info("ready")
312                 case <-ctx.Done():
313                         super.logger.WithField("task", task.String()).Info("task was never ready")
314                         return ctx.Err()
315                 }
316         }
317         return nil
318 }
319
320 func (super *Supervisor) Stop() {
321         super.cancel()
322         <-super.done
323 }
324
325 func (super *Supervisor) WaitReady() (*arvados.URL, bool) {
326         ticker := time.NewTicker(time.Second)
327         defer ticker.Stop()
328         for waiting := "all"; waiting != ""; {
329                 select {
330                 case <-ticker.C:
331                 case <-super.ctx.Done():
332                         return nil, false
333                 }
334                 if super.healthChecker == nil {
335                         // not set up yet
336                         continue
337                 }
338                 resp := super.healthChecker.ClusterHealth()
339                 // The overall health check (resp.Health=="OK") might
340                 // never pass due to missing components (like
341                 // arvados-dispatch-cloud in a test cluster), so
342                 // instead we wait for all configured components to
343                 // pass.
344                 waiting = ""
345                 for target, check := range resp.Checks {
346                         if check.Health != "OK" {
347                                 waiting += " " + target
348                         }
349                 }
350                 if waiting != "" {
351                         super.logger.WithField("targets", waiting[1:]).Info("waiting")
352                 }
353         }
354         u := super.cluster.Services.Controller.ExternalURL
355         return &u, true
356 }
357
358 func (super *Supervisor) prependEnv(key, prepend string) {
359         for i, s := range super.environ {
360                 if strings.HasPrefix(s, key+"=") {
361                         super.environ[i] = key + "=" + prepend + s[len(key)+1:]
362                         return
363                 }
364         }
365         super.environ = append(super.environ, key+"="+prepend)
366 }
367
368 func (super *Supervisor) cleanEnv(prefixes []string) {
369         var cleaned []string
370         for _, s := range super.environ {
371                 drop := false
372                 for _, p := range prefixes {
373                         if strings.HasPrefix(s, p) {
374                                 drop = true
375                                 break
376                         }
377                 }
378                 if !drop {
379                         cleaned = append(cleaned, s)
380                 }
381         }
382         super.environ = cleaned
383 }
384
385 func (super *Supervisor) setEnv(key, val string) {
386         for i, s := range super.environ {
387                 if strings.HasPrefix(s, key+"=") {
388                         super.environ[i] = key + "=" + val
389                         return
390                 }
391         }
392         super.environ = append(super.environ, key+"="+val)
393 }
394
// dedupEnv returns the entries of in, in their original order, with
// all but the first occurrence of each variable name removed. It
// panics if an entry contains no "=" or has an empty name.
func dedupEnv(in []string) []string {
	seen := make(map[string]bool)
	var unique []string
	for _, entry := range in {
		eq := strings.Index(entry, "=")
		if eq < 1 {
			panic("invalid environment var: " + entry)
		}
		name := entry[:eq]
		if seen[name] {
			continue
		}
		seen[name] = true
		unique = append(unique, entry)
	}
	return unique
}
411
412 func (super *Supervisor) installGoProgram(ctx context.Context, srcpath string) (string, error) {
413         _, basename := filepath.Split(srcpath)
414         binfile := filepath.Join(super.bindir, basename)
415         if super.ClusterType == "production" {
416                 return binfile, nil
417         }
418         err := super.RunProgram(ctx, filepath.Join(super.SourcePath, srcpath), runOptions{env: []string{"GOBIN=" + super.bindir}}, "go", "install", "-ldflags", "-X git.arvados.org/arvados.git/lib/cmd.version="+super.SourceVersion+" -X main.version="+super.SourceVersion)
419         return binfile, err
420 }
421
422 func (super *Supervisor) usingRVM() bool {
423         return os.Getenv("rvm_path") != ""
424 }
425
426 func (super *Supervisor) setupRubyEnv() error {
427         if !super.usingRVM() {
428                 // (If rvm is in use, assume the caller has everything
429                 // set up as desired)
430                 super.cleanEnv([]string{
431                         "GEM_HOME=",
432                         "GEM_PATH=",
433                 })
434                 gem := "gem"
435                 if _, err := os.Stat("/var/lib/arvados/bin/gem"); err == nil || super.ClusterType == "production" {
436                         gem = "/var/lib/arvados/bin/gem"
437                 }
438                 cmd := exec.Command(gem, "env", "gempath")
439                 if super.ClusterType == "production" {
440                         cmd.Args = append([]string{"sudo", "-u", "www-data", "-E", "HOME=/var/www"}, cmd.Args...)
441                         path, err := exec.LookPath("sudo")
442                         if err != nil {
443                                 return fmt.Errorf("LookPath(\"sudo\"): %w", err)
444                         }
445                         cmd.Path = path
446                 }
447                 cmd.Stderr = super.Stderr
448                 cmd.Env = super.environ
449                 buf, err := cmd.Output() // /var/lib/arvados/.gem/ruby/2.5.0/bin:...
450                 if err != nil || len(buf) == 0 {
451                         return fmt.Errorf("gem env gempath: %w", err)
452                 }
453                 gempath := string(bytes.Split(buf, []byte{':'})[0])
454                 super.prependEnv("PATH", gempath+"/bin:")
455                 super.setEnv("GEM_HOME", gempath)
456                 super.setEnv("GEM_PATH", gempath)
457         }
458         // Passenger install doesn't work unless $HOME is ~user
459         u, err := user.Current()
460         if err != nil {
461                 return err
462         }
463         super.setEnv("HOME", u.HomeDir)
464         return nil
465 }
466
467 func (super *Supervisor) lookPath(prog string) string {
468         for _, val := range super.environ {
469                 if strings.HasPrefix(val, "PATH=") {
470                         for _, dir := range filepath.SplitList(val[5:]) {
471                                 path := filepath.Join(dir, prog)
472                                 if fi, err := os.Stat(path); err == nil && fi.Mode()&0111 != 0 {
473                                         return path
474                                 }
475                         }
476                 }
477         }
478         return prog
479 }
480
// runOptions holds the optional settings accepted by RunProgram.
type runOptions struct {
	// output, if non-nil, receives the child's stdout.
	output io.Writer
	// env lists environment variables to add or replace.
	env []string
	// user, if non-empty, is the user to run the child as.
	user string
}
486
487 // RunProgram runs prog with args, using dir as working directory. If ctx is
488 // cancelled while the child is running, RunProgram terminates the child, waits
489 // for it to exit, then returns.
490 //
491 // Child's environment will have our env vars, plus any given in env.
492 //
493 // Child's stdout will be written to output if non-nil, otherwise the
494 // boot command's stderr.
495 func (super *Supervisor) RunProgram(ctx context.Context, dir string, opts runOptions, prog string, args ...string) error {
496         cmdline := fmt.Sprintf("%s", append([]string{prog}, args...))
497         super.logger.WithField("command", cmdline).WithField("dir", dir).Info("executing")
498
499         logprefix := prog
500         {
501                 innerargs := args
502                 if logprefix == "sudo" {
503                         for i := 0; i < len(args); i++ {
504                                 if args[i] == "-u" {
505                                         i++
506                                 } else if args[i] == "-E" || strings.Contains(args[i], "=") {
507                                 } else {
508                                         logprefix = args[i]
509                                         innerargs = args[i+1:]
510                                         break
511                                 }
512                         }
513                 }
514                 logprefix = strings.TrimPrefix(logprefix, "/var/lib/arvados/bin/")
515                 logprefix = strings.TrimPrefix(logprefix, super.tempdir+"/bin/")
516                 if logprefix == "bundle" && len(innerargs) > 2 && innerargs[0] == "exec" {
517                         _, dirbase := filepath.Split(dir)
518                         logprefix = innerargs[1] + "@" + dirbase
519                 } else if logprefix == "arvados-server" && len(args) > 1 {
520                         logprefix = args[0]
521                 }
522                 if !strings.HasPrefix(dir, "/") {
523                         logprefix = dir + ": " + logprefix
524                 }
525         }
526
527         cmd := exec.Command(super.lookPath(prog), args...)
528         stdout, err := cmd.StdoutPipe()
529         if err != nil {
530                 return err
531         }
532         stderr, err := cmd.StderrPipe()
533         if err != nil {
534                 return err
535         }
536         logwriter := &service.LogPrefixer{Writer: super.Stderr, Prefix: []byte("[" + logprefix + "] ")}
537         var copiers sync.WaitGroup
538         copiers.Add(1)
539         go func() {
540                 io.Copy(logwriter, stderr)
541                 copiers.Done()
542         }()
543         copiers.Add(1)
544         go func() {
545                 if opts.output == nil {
546                         io.Copy(logwriter, stdout)
547                 } else {
548                         io.Copy(opts.output, stdout)
549                 }
550                 copiers.Done()
551         }()
552
553         if strings.HasPrefix(dir, "/") {
554                 cmd.Dir = dir
555         } else {
556                 cmd.Dir = filepath.Join(super.SourcePath, dir)
557         }
558         env := append([]string(nil), opts.env...)
559         env = append(env, super.environ...)
560         cmd.Env = dedupEnv(env)
561
562         if opts.user != "" {
563                 // Note: We use this approach instead of "sudo"
564                 // because in certain circumstances (we are pid 1 in a
565                 // docker container, and our passenger child process
566                 // changes to pgid 1) the intermediate sudo process
567                 // notices we have the same pgid as our child and
568                 // refuses to propagate signals from us to our child,
569                 // so we can't signal/shutdown our passenger/rails
570                 // apps. "chpst" or "setuidgid" would work, but these
571                 // few lines avoid depending on runit/daemontools.
572                 u, err := user.Lookup(opts.user)
573                 if err != nil {
574                         return fmt.Errorf("user.Lookup(%q): %w", opts.user, err)
575                 }
576                 uid, _ := strconv.Atoi(u.Uid)
577                 gid, _ := strconv.Atoi(u.Gid)
578                 cmd.SysProcAttr = &syscall.SysProcAttr{
579                         Credential: &syscall.Credential{
580                                 Uid: uint32(uid),
581                                 Gid: uint32(gid),
582                         },
583                 }
584         }
585
586         exited := false
587         defer func() { exited = true }()
588         go func() {
589                 <-ctx.Done()
590                 log := ctxlog.FromContext(ctx).WithFields(logrus.Fields{"dir": dir, "cmdline": cmdline})
591                 for !exited {
592                         if cmd.Process == nil {
593                                 log.Debug("waiting for child process to start")
594                                 time.Sleep(time.Second / 2)
595                         } else {
596                                 log.WithField("PID", cmd.Process.Pid).Debug("sending SIGTERM")
597                                 cmd.Process.Signal(syscall.SIGTERM)
598                                 time.Sleep(5 * time.Second)
599                                 if !exited {
600                                         stdout.Close()
601                                         stderr.Close()
602                                         log.WithField("PID", cmd.Process.Pid).Warn("still waiting for child process to exit 5s after SIGTERM")
603                                 }
604                         }
605                 }
606         }()
607
608         err = cmd.Start()
609         if err != nil {
610                 return err
611         }
612         copiers.Wait()
613         err = cmd.Wait()
614         if ctx.Err() != nil {
615                 // Return "context canceled", instead of the "killed"
616                 // error that was probably caused by the context being
617                 // canceled.
618                 return ctx.Err()
619         } else if err != nil {
620                 return fmt.Errorf("%s: error: %v", cmdline, err)
621         }
622         return nil
623 }
624
625 func (super *Supervisor) autofillConfig(cfg *arvados.Config) error {
626         cluster, err := cfg.GetCluster("")
627         if err != nil {
628                 return err
629         }
630         usedPort := map[string]bool{}
631         nextPort := func(host string) string {
632                 for {
633                         port, err := availablePort(host)
634                         if err != nil {
635                                 panic(err)
636                         }
637                         if usedPort[port] {
638                                 continue
639                         }
640                         usedPort[port] = true
641                         return port
642                 }
643         }
644         if cluster.Services.Controller.ExternalURL.Host == "" {
645                 h, p, err := net.SplitHostPort(super.ControllerAddr)
646                 if err != nil {
647                         return err
648                 }
649                 if h == "" {
650                         h = super.ListenHost
651                 }
652                 if p == "0" {
653                         p = nextPort(h)
654                 }
655                 cluster.Services.Controller.ExternalURL = arvados.URL{Scheme: "https", Host: net.JoinHostPort(h, p), Path: "/"}
656         }
657         for _, svc := range []*arvados.Service{
658                 &cluster.Services.Controller,
659                 &cluster.Services.DispatchCloud,
660                 &cluster.Services.GitHTTP,
661                 &cluster.Services.Health,
662                 &cluster.Services.Keepproxy,
663                 &cluster.Services.Keepstore,
664                 &cluster.Services.RailsAPI,
665                 &cluster.Services.WebDAV,
666                 &cluster.Services.WebDAVDownload,
667                 &cluster.Services.Websocket,
668                 &cluster.Services.Workbench1,
669         } {
670                 if svc == &cluster.Services.DispatchCloud && super.ClusterType == "test" {
671                         continue
672                 }
673                 if svc.ExternalURL.Host == "" {
674                         if svc == &cluster.Services.Controller ||
675                                 svc == &cluster.Services.GitHTTP ||
676                                 svc == &cluster.Services.Health ||
677                                 svc == &cluster.Services.Keepproxy ||
678                                 svc == &cluster.Services.WebDAV ||
679                                 svc == &cluster.Services.WebDAVDownload ||
680                                 svc == &cluster.Services.Workbench1 {
681                                 svc.ExternalURL = arvados.URL{Scheme: "https", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/"}
682                         } else if svc == &cluster.Services.Websocket {
683                                 svc.ExternalURL = arvados.URL{Scheme: "wss", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/websocket"}
684                         }
685                 }
686                 if super.NoWorkbench1 && svc == &cluster.Services.Workbench1 {
687                         // When workbench1 is disabled, it gets an
688                         // ExternalURL (so we have a valid listening
689                         // port to write in our Nginx config) but no
690                         // InternalURLs (so health checker doesn't
691                         // complain).
692                         continue
693                 }
694                 if len(svc.InternalURLs) == 0 {
695                         svc.InternalURLs = map[arvados.URL]arvados.ServiceInstance{
696                                 {Scheme: "http", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/"}: {},
697                         }
698                 }
699         }
700         if super.ClusterType != "production" {
701                 if cluster.SystemRootToken == "" {
702                         cluster.SystemRootToken = randomHexString(64)
703                 }
704                 if cluster.ManagementToken == "" {
705                         cluster.ManagementToken = randomHexString(64)
706                 }
707                 if cluster.Collections.BlobSigningKey == "" {
708                         cluster.Collections.BlobSigningKey = randomHexString(64)
709                 }
710                 if cluster.Users.AnonymousUserToken == "" {
711                         cluster.Users.AnonymousUserToken = randomHexString(64)
712                 }
713                 if cluster.Containers.DispatchPrivateKey == "" {
714                         buf, err := ioutil.ReadFile(filepath.Join(super.SourcePath, "lib", "dispatchcloud", "test", "sshkey_dispatch"))
715                         if err != nil {
716                                 return err
717                         }
718                         cluster.Containers.DispatchPrivateKey = string(buf)
719                 }
720                 cluster.TLS.Insecure = true
721         }
722         if super.ClusterType == "test" {
723                 // Add a second keepstore process.
724                 cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: "http", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/"}] = arvados.ServiceInstance{}
725
726                 // Create a directory-backed volume for each keepstore
727                 // process.
728                 cluster.Volumes = map[string]arvados.Volume{}
729                 for url := range cluster.Services.Keepstore.InternalURLs {
730                         volnum := len(cluster.Volumes)
731                         datadir := fmt.Sprintf("%s/keep%d.data", super.tempdir, volnum)
732                         if _, err = os.Stat(datadir + "/."); err == nil {
733                         } else if !os.IsNotExist(err) {
734                                 return err
735                         } else if err = os.Mkdir(datadir, 0755); err != nil {
736                                 return err
737                         }
738                         cluster.Volumes[fmt.Sprintf(cluster.ClusterID+"-nyw5e-%015d", volnum)] = arvados.Volume{
739                                 Driver:           "Directory",
740                                 DriverParameters: json.RawMessage(fmt.Sprintf(`{"Root":%q}`, datadir)),
741                                 AccessViaHosts: map[arvados.URL]arvados.VolumeAccess{
742                                         url: {},
743                                 },
744                                 StorageClasses: map[string]bool{
745                                         "default": true,
746                                         "foo":     true,
747                                         "bar":     true,
748                                 },
749                         }
750                 }
751                 cluster.StorageClasses = map[string]arvados.StorageClassConfig{
752                         "default": {Default: true},
753                         "foo":     {},
754                         "bar":     {},
755                 }
756         }
757         if super.OwnTemporaryDatabase {
758                 cluster.PostgreSQL.Connection = arvados.PostgreSQLConnection{
759                         "client_encoding": "utf8",
760                         "host":            super.ListenHost,
761                         "port":            nextPort(super.ListenHost),
762                         "dbname":          "arvados_test",
763                         "user":            "arvados",
764                         "password":        "insecure_arvados_test",
765                 }
766         }
767
768         cfg.Clusters[cluster.ClusterID] = *cluster
769         return nil
770 }
771
// addrIsLocal reports whether addr (in "host:port" form) names an
// address on this host, i.e., one that a local process is able to
// bind to.
//
// The check is done by briefly opening a TCP listener on the host
// part of addr. The listener uses an OS-assigned port (port 0)
// rather than the port given in addr, so the answer does not depend
// on whether that port is already in use or requires privileges to
// bind.
//
// It returns (false, nil) when the OS reports that the address cannot
// be assigned, and a non-nil error when addr is malformed or the
// probe fails for any other reason.
func addrIsLocal(addr string) (bool, error) {
	host, _, err := net.SplitHostPort(addr)
	if err != nil {
		return false, err
	}
	listener, err := net.Listen("tcp", net.JoinHostPort(host, "0"))
	if err == nil {
		listener.Close()
		return true, nil
	}
	// EADDRNOTAVAIL is how the kernel says "that address does not
	// belong to any local interface". The string match is kept as
	// a fallback in case the errno is not exposed through the
	// error chain.
	if errors.Is(err, syscall.EADDRNOTAVAIL) || strings.Contains(err.Error(), "cannot assign requested address") {
		return false, nil
	}
	return false, err
}
784
// randomHexString returns a string of exactly chars lowercase
// hexadecimal digits generated from crypto/rand. It returns "" if
// chars is zero or negative, and panics if the system random source
// fails.
func randomHexString(chars int) string {
	if chars <= 0 {
		return ""
	}
	// Each byte yields two hex digits; round up so an odd length
	// can be satisfied, then trim the surplus digit below.
	b := make([]byte, (chars+1)/2)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", b)[:chars]
}
793
794 func internalPort(svc arvados.Service) (host, port string, err error) {
795         if len(svc.InternalURLs) > 1 {
796                 return "", "", errors.New("internalPort() doesn't work with multiple InternalURLs")
797         }
798         for u := range svc.InternalURLs {
799                 u := url.URL(u)
800                 host, port = u.Hostname(), u.Port()
801                 switch {
802                 case port != "":
803                 case u.Scheme == "https", u.Scheme == "ws":
804                         port = "443"
805                 default:
806                         port = "80"
807                 }
808                 return
809         }
810         return "", "", fmt.Errorf("service has no InternalURLs")
811 }
812
813 func externalPort(svc arvados.Service) (string, error) {
814         u := url.URL(svc.ExternalURL)
815         if p := u.Port(); p != "" {
816                 return p, nil
817         } else if u.Scheme == "https" || u.Scheme == "wss" {
818                 return "443", nil
819         } else {
820                 return "80", nil
821         }
822 }
823
// availablePort returns a TCP port number (as a decimal string) that
// was free on host at the time of the call.
//
// It finds the port by listening on port 0, letting the OS choose,
// and reading the chosen port back from the listener's address. The
// listener is closed before returning, so nothing reserves the port
// for the caller afterwards.
func availablePort(host string) (string, error) {
	wildcard := net.JoinHostPort(host, "0")
	probe, err := net.Listen("tcp", wildcard)
	if err != nil {
		return "", err
	}
	defer probe.Close()

	bound := probe.Addr().String()
	_, chosen, splitErr := net.SplitHostPort(bound)
	if splitErr != nil {
		return "", splitErr
	}
	return chosen, nil
}
836
// waitForConnect repeatedly tries to open a TCP connection to addr
// until one succeeds, then closes that connection and returns nil.
//
// Each attempt is limited to one second, and failed attempts are
// retried after a 100ms pause. If ctx is cancelled before a
// connection succeeds, waitForConnect gives up and returns ctx.Err().
func waitForConnect(ctx context.Context, addr string) error {
	const retryInterval = time.Second / 10
	dialer := net.Dialer{Timeout: time.Second}
	for ctx.Err() == nil {
		conn, err := dialer.DialContext(ctx, "tcp", addr)
		if err == nil {
			conn.Close()
			return nil
		}
		// Pause before retrying, but wake up immediately if ctx
		// is cancelled in the meantime.
		pause := time.NewTimer(retryInterval)
		select {
		case <-ctx.Done():
			pause.Stop()
			return ctx.Err()
		case <-pause.C:
		}
	}
	return ctx.Err()
}
852
853 func copyConfig(cfg *arvados.Config) *arvados.Config {
854         pr, pw := io.Pipe()
855         go func() {
856                 err := json.NewEncoder(pw).Encode(cfg)
857                 if err != nil {
858                         panic(err)
859                 }
860                 pw.Close()
861         }()
862         cfg2 := new(arvados.Config)
863         err := json.NewDecoder(pr).Decode(cfg2)
864         if err != nil {
865                 panic(err)
866         }
867         return cfg2
868 }
869
870 func watchConfig(ctx context.Context, logger logrus.FieldLogger, cfgPath string, prevcfg *arvados.Config, fn func()) {
871         watcher, err := fsnotify.NewWatcher()
872         if err != nil {
873                 logger.WithError(err).Error("fsnotify setup failed")
874                 return
875         }
876         defer watcher.Close()
877
878         err = watcher.Add(cfgPath)
879         if err != nil {
880                 logger.WithError(err).Error("fsnotify watcher failed")
881                 return
882         }
883
884         for {
885                 select {
886                 case <-ctx.Done():
887                         return
888                 case err, ok := <-watcher.Errors:
889                         if !ok {
890                                 return
891                         }
892                         logger.WithError(err).Warn("fsnotify watcher reported error")
893                 case _, ok := <-watcher.Events:
894                         if !ok {
895                                 return
896                         }
897                         for len(watcher.Events) > 0 {
898                                 <-watcher.Events
899                         }
900                         loader := config.NewLoader(&bytes.Buffer{}, &logrus.Logger{Out: ioutil.Discard})
901                         loader.Path = cfgPath
902                         loader.SkipAPICalls = true
903                         cfg, err := loader.Load()
904                         if err != nil {
905                                 logger.WithError(err).Warn("error reloading config file after change detected; ignoring new config for now")
906                         } else if reflect.DeepEqual(cfg, prevcfg) {
907                                 logger.Debug("config file changed but is still DeepEqual to the existing config")
908                         } else {
909                                 logger.Debug("config changed, notifying supervisor")
910                                 fn()
911                                 prevcfg = cfg
912                         }
913                 }
914         }
915 }