18700: Don't start workbench2 in integration test clusters.
[arvados.git] / lib / boot / supervisor.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package boot
6
7 import (
8         "bytes"
9         "context"
10         "crypto/rand"
11         "encoding/json"
12         "errors"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "net"
17         "net/url"
18         "os"
19         "os/exec"
20         "os/signal"
21         "os/user"
22         "path/filepath"
23         "reflect"
24         "strconv"
25         "strings"
26         "sync"
27         "syscall"
28         "time"
29
30         "git.arvados.org/arvados.git/lib/config"
31         "git.arvados.org/arvados.git/lib/service"
32         "git.arvados.org/arvados.git/sdk/go/arvados"
33         "git.arvados.org/arvados.git/sdk/go/ctxlog"
34         "git.arvados.org/arvados.git/sdk/go/health"
35         "github.com/fsnotify/fsnotify"
36         "github.com/sirupsen/logrus"
37 )
38
39 type Supervisor struct {
40         SourcePath           string // e.g., /home/username/src/arvados
41         SourceVersion        string // e.g., acbd1324...
42         ClusterType          string // e.g., production
43         ListenHost           string // e.g., localhost
44         ControllerAddr       string // e.g., 127.0.0.1:8000
45         Workbench2Source     string // e.g., /home/username/src/arvados-workbench2
46         NoWorkbench1         bool
47         NoWorkbench2         bool
48         OwnTemporaryDatabase bool
49         Stderr               io.Writer
50
51         logger  logrus.FieldLogger
52         cluster *arvados.Cluster
53
54         ctx           context.Context
55         cancel        context.CancelFunc
56         done          chan struct{} // closed when child procs/services have shut down
57         err           error         // error that caused shutdown (valid when done is closed)
58         healthChecker *health.Aggregator
59         tasksReady    map[string]chan bool
60         waitShutdown  sync.WaitGroup
61
62         bindir     string
63         tempdir    string
64         wwwtempdir string
65         configfile string
66         environ    []string // for child processes
67 }
68
69 func (super *Supervisor) Cluster() *arvados.Cluster { return super.cluster }
70
71 func (super *Supervisor) Start(ctx context.Context, cfg *arvados.Config, cfgPath string) {
72         super.ctx, super.cancel = context.WithCancel(ctx)
73         super.done = make(chan struct{})
74
75         go func() {
76                 defer close(super.done)
77
78                 sigch := make(chan os.Signal)
79                 signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM)
80                 defer signal.Stop(sigch)
81                 go func() {
82                         for sig := range sigch {
83                                 super.logger.WithField("signal", sig).Info("caught signal")
84                                 if super.err == nil {
85                                         super.err = fmt.Errorf("caught signal %s", sig)
86                                 }
87                                 super.cancel()
88                         }
89                 }()
90
91                 hupch := make(chan os.Signal)
92                 signal.Notify(hupch, syscall.SIGHUP)
93                 defer signal.Stop(hupch)
94                 go func() {
95                         for sig := range hupch {
96                                 super.logger.WithField("signal", sig).Info("caught signal")
97                                 if super.err == nil {
98                                         super.err = errNeedConfigReload
99                                 }
100                                 super.cancel()
101                         }
102                 }()
103
104                 if cfgPath != "" && cfgPath != "-" && cfg.AutoReloadConfig {
105                         go watchConfig(super.ctx, super.logger, cfgPath, copyConfig(cfg), func() {
106                                 if super.err == nil {
107                                         super.err = errNeedConfigReload
108                                 }
109                                 super.cancel()
110                         })
111                 }
112
113                 err := super.run(cfg)
114                 if err != nil {
115                         super.logger.WithError(err).Warn("supervisor shut down")
116                         if super.err == nil {
117                                 super.err = err
118                         }
119                 }
120         }()
121 }
122
123 func (super *Supervisor) Wait() error {
124         <-super.done
125         return super.err
126 }
127
128 func (super *Supervisor) run(cfg *arvados.Config) error {
129         defer super.cancel()
130
131         cwd, err := os.Getwd()
132         if err != nil {
133                 return err
134         }
135         if !strings.HasPrefix(super.SourcePath, "/") {
136                 super.SourcePath = filepath.Join(cwd, super.SourcePath)
137         }
138         super.SourcePath, err = filepath.EvalSymlinks(super.SourcePath)
139         if err != nil {
140                 return err
141         }
142
143         // Choose bin and temp dirs: /var/lib/arvados/... in
144         // production, transient tempdir otherwise.
145         if super.ClusterType == "production" {
146                 // These dirs have already been created by
147                 // "arvados-server install" (or by extracting a
148                 // package).
149                 super.tempdir = "/var/lib/arvados/tmp"
150                 super.wwwtempdir = "/var/lib/arvados/wwwtmp"
151                 super.bindir = "/var/lib/arvados/bin"
152         } else {
153                 super.tempdir, err = ioutil.TempDir("", "arvados-server-boot-")
154                 if err != nil {
155                         return err
156                 }
157                 defer os.RemoveAll(super.tempdir)
158                 super.wwwtempdir = super.tempdir
159                 super.bindir = filepath.Join(super.tempdir, "bin")
160                 if err := os.Mkdir(super.bindir, 0755); err != nil {
161                         return err
162                 }
163         }
164
165         // Fill in any missing config keys, and write the resulting
166         // config in the temp dir for child services to use.
167         err = super.autofillConfig(cfg)
168         if err != nil {
169                 return err
170         }
171         conffile, err := os.OpenFile(filepath.Join(super.wwwtempdir, "config.yml"), os.O_CREATE|os.O_WRONLY, 0644)
172         if err != nil {
173                 return err
174         }
175         defer conffile.Close()
176         err = json.NewEncoder(conffile).Encode(cfg)
177         if err != nil {
178                 return err
179         }
180         err = conffile.Close()
181         if err != nil {
182                 return err
183         }
184         super.configfile = conffile.Name()
185
186         super.environ = os.Environ()
187         super.cleanEnv([]string{"ARVADOS_"})
188         super.setEnv("ARVADOS_CONFIG", super.configfile)
189         super.setEnv("RAILS_ENV", super.ClusterType)
190         super.setEnv("TMPDIR", super.tempdir)
191         super.prependEnv("PATH", "/var/lib/arvados/bin:")
192         if super.ClusterType != "production" {
193                 super.prependEnv("PATH", super.tempdir+"/bin:")
194         }
195
196         super.cluster, err = cfg.GetCluster("")
197         if err != nil {
198                 return err
199         }
200         // Now that we have the config, replace the bootstrap logger
201         // with a new one according to the logging config.
202         loglevel := super.cluster.SystemLogs.LogLevel
203         if s := os.Getenv("ARVADOS_DEBUG"); s != "" && s != "0" {
204                 loglevel = "debug"
205         }
206         super.logger = ctxlog.New(super.Stderr, super.cluster.SystemLogs.Format, loglevel).WithFields(logrus.Fields{
207                 "PID": os.Getpid(),
208         })
209
210         if super.SourceVersion == "" && super.ClusterType == "production" {
211                 // don't need SourceVersion
212         } else if super.SourceVersion == "" {
213                 // Find current source tree version.
214                 var buf bytes.Buffer
215                 err = super.RunProgram(super.ctx, ".", runOptions{output: &buf}, "git", "diff", "--shortstat")
216                 if err != nil {
217                         return err
218                 }
219                 dirty := buf.Len() > 0
220                 buf.Reset()
221                 err = super.RunProgram(super.ctx, ".", runOptions{output: &buf}, "git", "log", "-n1", "--format=%H")
222                 if err != nil {
223                         return err
224                 }
225                 super.SourceVersion = strings.TrimSpace(buf.String())
226                 if dirty {
227                         super.SourceVersion += "+uncommitted"
228                 }
229         } else {
230                 return errors.New("specifying a version to run is not yet supported")
231         }
232
233         _, err = super.installGoProgram(super.ctx, "cmd/arvados-server")
234         if err != nil {
235                 return err
236         }
237         err = super.setupRubyEnv()
238         if err != nil {
239                 return err
240         }
241
242         tasks := []supervisedTask{
243                 createCertificates{},
244                 runPostgreSQL{},
245                 runNginx{},
246                 runServiceCommand{name: "controller", svc: super.cluster.Services.Controller, depends: []supervisedTask{seedDatabase{}}},
247                 runGoProgram{src: "services/arv-git-httpd", svc: super.cluster.Services.GitHTTP},
248                 runGoProgram{src: "services/health", svc: super.cluster.Services.Health},
249                 runGoProgram{src: "services/keepproxy", svc: super.cluster.Services.Keepproxy, depends: []supervisedTask{runPassenger{src: "services/api"}}},
250                 runServiceCommand{name: "keepstore", svc: super.cluster.Services.Keepstore},
251                 runGoProgram{src: "services/keep-web", svc: super.cluster.Services.WebDAV},
252                 runServiceCommand{name: "ws", svc: super.cluster.Services.Websocket, depends: []supervisedTask{seedDatabase{}}},
253                 installPassenger{src: "services/api"},
254                 runPassenger{src: "services/api", varlibdir: "railsapi", svc: super.cluster.Services.RailsAPI, depends: []supervisedTask{createCertificates{}, seedDatabase{}, installPassenger{src: "services/api"}}},
255                 seedDatabase{},
256         }
257         if !super.NoWorkbench1 {
258                 tasks = append(tasks,
259                         installPassenger{src: "apps/workbench", depends: []supervisedTask{seedDatabase{}}}, // dependency ensures workbench doesn't delay api install/startup
260                         runPassenger{src: "apps/workbench", varlibdir: "workbench1", svc: super.cluster.Services.Workbench1, depends: []supervisedTask{installPassenger{src: "apps/workbench"}}},
261                 )
262         }
263         if !super.NoWorkbench2 {
264                 tasks = append(tasks,
265                         runWorkbench2{svc: super.cluster.Services.Workbench2},
266                 )
267         }
268         if super.ClusterType != "test" {
269                 tasks = append(tasks,
270                         runServiceCommand{name: "dispatch-cloud", svc: super.cluster.Services.DispatchCloud},
271                         runGoProgram{src: "services/keep-balance", svc: super.cluster.Services.Keepbalance},
272                 )
273         }
274         super.tasksReady = map[string]chan bool{}
275         for _, task := range tasks {
276                 super.tasksReady[task.String()] = make(chan bool)
277         }
278         for _, task := range tasks {
279                 task := task
280                 fail := func(err error) {
281                         if super.ctx.Err() != nil {
282                                 return
283                         }
284                         super.cancel()
285                         super.logger.WithField("task", task.String()).WithError(err).Error("task failed")
286                 }
287                 go func() {
288                         super.logger.WithField("task", task.String()).Info("starting")
289                         err := task.Run(super.ctx, fail, super)
290                         if err != nil {
291                                 fail(err)
292                                 return
293                         }
294                         close(super.tasksReady[task.String()])
295                 }()
296         }
297         err = super.wait(super.ctx, tasks...)
298         if err != nil {
299                 return err
300         }
301         super.logger.Info("all startup tasks are complete; starting health checks")
302         super.healthChecker = &health.Aggregator{Cluster: super.cluster}
303         <-super.ctx.Done()
304         super.logger.Info("shutting down")
305         super.waitShutdown.Wait()
306         return super.ctx.Err()
307 }
308
309 func (super *Supervisor) wait(ctx context.Context, tasks ...supervisedTask) error {
310         for _, task := range tasks {
311                 ch, ok := super.tasksReady[task.String()]
312                 if !ok {
313                         return fmt.Errorf("no such task: %s", task)
314                 }
315                 super.logger.WithField("task", task.String()).Info("waiting")
316                 select {
317                 case <-ch:
318                         super.logger.WithField("task", task.String()).Info("ready")
319                 case <-ctx.Done():
320                         super.logger.WithField("task", task.String()).Info("task was never ready")
321                         return ctx.Err()
322                 }
323         }
324         return nil
325 }
326
327 func (super *Supervisor) Stop() {
328         super.cancel()
329         <-super.done
330 }
331
332 func (super *Supervisor) WaitReady() (*arvados.URL, bool) {
333         ticker := time.NewTicker(time.Second)
334         defer ticker.Stop()
335         for waiting := "all"; waiting != ""; {
336                 select {
337                 case <-ticker.C:
338                 case <-super.ctx.Done():
339                         return nil, false
340                 }
341                 if super.healthChecker == nil {
342                         // not set up yet
343                         continue
344                 }
345                 resp := super.healthChecker.ClusterHealth()
346                 // The overall health check (resp.Health=="OK") might
347                 // never pass due to missing components (like
348                 // arvados-dispatch-cloud in a test cluster), so
349                 // instead we wait for all configured components to
350                 // pass.
351                 waiting = ""
352                 for target, check := range resp.Checks {
353                         if check.Health != "OK" {
354                                 waiting += " " + target
355                         }
356                 }
357                 if waiting != "" {
358                         super.logger.WithField("targets", waiting[1:]).Info("waiting")
359                 }
360         }
361         u := super.cluster.Services.Controller.ExternalURL
362         return &u, true
363 }
364
365 func (super *Supervisor) prependEnv(key, prepend string) {
366         for i, s := range super.environ {
367                 if strings.HasPrefix(s, key+"=") {
368                         super.environ[i] = key + "=" + prepend + s[len(key)+1:]
369                         return
370                 }
371         }
372         super.environ = append(super.environ, key+"="+prepend)
373 }
374
375 func (super *Supervisor) cleanEnv(prefixes []string) {
376         var cleaned []string
377         for _, s := range super.environ {
378                 drop := false
379                 for _, p := range prefixes {
380                         if strings.HasPrefix(s, p) {
381                                 drop = true
382                                 break
383                         }
384                 }
385                 if !drop {
386                         cleaned = append(cleaned, s)
387                 }
388         }
389         super.environ = cleaned
390 }
391
392 func (super *Supervisor) setEnv(key, val string) {
393         for i, s := range super.environ {
394                 if strings.HasPrefix(s, key+"=") {
395                         super.environ[i] = key + "=" + val
396                         return
397                 }
398         }
399         super.environ = append(super.environ, key+"="+val)
400 }
401
// dedupEnv returns the entries of in with all but the first
// occurrence of each variable name removed, preserving order. The
// result is nil when in is empty. It panics on an entry that has no
// "=" or has an empty name.
func dedupEnv(in []string) []string {
	seen := make(map[string]bool)
	var out []string
	for _, kv := range in {
		eq := strings.Index(kv, "=")
		if eq < 1 {
			panic("invalid environment var: " + kv)
		}
		name := kv[:eq]
		if seen[name] {
			continue
		}
		seen[name] = true
		out = append(out, kv)
	}
	return out
}
418
419 func (super *Supervisor) installGoProgram(ctx context.Context, srcpath string) (string, error) {
420         _, basename := filepath.Split(srcpath)
421         binfile := filepath.Join(super.bindir, basename)
422         if super.ClusterType == "production" {
423                 return binfile, nil
424         }
425         err := super.RunProgram(ctx, filepath.Join(super.SourcePath, srcpath), runOptions{env: []string{"GOBIN=" + super.bindir}}, "go", "install", "-ldflags", "-X git.arvados.org/arvados.git/lib/cmd.version="+super.SourceVersion+" -X main.version="+super.SourceVersion)
426         return binfile, err
427 }
428
429 func (super *Supervisor) usingRVM() bool {
430         return os.Getenv("rvm_path") != ""
431 }
432
433 func (super *Supervisor) setupRubyEnv() error {
434         if !super.usingRVM() {
435                 // (If rvm is in use, assume the caller has everything
436                 // set up as desired)
437                 super.cleanEnv([]string{
438                         "GEM_HOME=",
439                         "GEM_PATH=",
440                 })
441                 gem := "gem"
442                 if _, err := os.Stat("/var/lib/arvados/bin/gem"); err == nil || super.ClusterType == "production" {
443                         gem = "/var/lib/arvados/bin/gem"
444                 }
445                 cmd := exec.Command(gem, "env", "gempath")
446                 if super.ClusterType == "production" {
447                         cmd.Args = append([]string{"sudo", "-u", "www-data", "-E", "HOME=/var/www"}, cmd.Args...)
448                         path, err := exec.LookPath("sudo")
449                         if err != nil {
450                                 return fmt.Errorf("LookPath(\"sudo\"): %w", err)
451                         }
452                         cmd.Path = path
453                 }
454                 cmd.Stderr = super.Stderr
455                 cmd.Env = super.environ
456                 buf, err := cmd.Output() // /var/lib/arvados/.gem/ruby/2.5.0/bin:...
457                 if err != nil || len(buf) == 0 {
458                         return fmt.Errorf("gem env gempath: %w", err)
459                 }
460                 gempath := string(bytes.Split(buf, []byte{':'})[0])
461                 super.prependEnv("PATH", gempath+"/bin:")
462                 super.setEnv("GEM_HOME", gempath)
463                 super.setEnv("GEM_PATH", gempath)
464         }
465         // Passenger install doesn't work unless $HOME is ~user
466         u, err := user.Current()
467         if err != nil {
468                 return err
469         }
470         super.setEnv("HOME", u.HomeDir)
471         return nil
472 }
473
474 func (super *Supervisor) lookPath(prog string) string {
475         for _, val := range super.environ {
476                 if strings.HasPrefix(val, "PATH=") {
477                         for _, dir := range filepath.SplitList(val[5:]) {
478                                 path := filepath.Join(dir, prog)
479                                 if fi, err := os.Stat(path); err == nil && fi.Mode()&0111 != 0 {
480                                         return path
481                                 }
482                         }
483                 }
484         }
485         return prog
486 }
487
// runOptions holds the optional settings accepted by RunProgram.
type runOptions struct {
	// If non-nil, the child's stdout is written here.
	output io.Writer

	// Environment variables to add/replace, as "key=value".
	env []string

	// If non-empty, run the child as the specified user.
	user string

	// Attached to the child's stdin.
	stdin io.Reader
}
494
495 // RunProgram runs prog with args, using dir as working directory. If ctx is
496 // cancelled while the child is running, RunProgram terminates the child, waits
497 // for it to exit, then returns.
498 //
499 // Child's environment will have our env vars, plus any given in env.
500 //
501 // Child's stdout will be written to output if non-nil, otherwise the
502 // boot command's stderr.
503 func (super *Supervisor) RunProgram(ctx context.Context, dir string, opts runOptions, prog string, args ...string) error {
504         cmdline := fmt.Sprintf("%s", append([]string{prog}, args...))
505         super.logger.WithField("command", cmdline).WithField("dir", dir).Info("executing")
506
507         logprefix := prog
508         {
509                 innerargs := args
510                 if logprefix == "sudo" {
511                         for i := 0; i < len(args); i++ {
512                                 if args[i] == "-u" {
513                                         i++
514                                 } else if args[i] == "-E" || strings.Contains(args[i], "=") {
515                                 } else {
516                                         logprefix = args[i]
517                                         innerargs = args[i+1:]
518                                         break
519                                 }
520                         }
521                 }
522                 logprefix = strings.TrimPrefix(logprefix, "/var/lib/arvados/bin/")
523                 logprefix = strings.TrimPrefix(logprefix, super.tempdir+"/bin/")
524                 if logprefix == "bundle" && len(innerargs) > 2 && innerargs[0] == "exec" {
525                         _, dirbase := filepath.Split(dir)
526                         logprefix = innerargs[1] + "@" + dirbase
527                 } else if logprefix == "arvados-server" && len(args) > 1 {
528                         logprefix = args[0]
529                 }
530                 if !strings.HasPrefix(dir, "/") {
531                         logprefix = dir + ": " + logprefix
532                 }
533         }
534
535         cmd := exec.Command(super.lookPath(prog), args...)
536         cmd.Stdin = opts.stdin
537         stdout, err := cmd.StdoutPipe()
538         if err != nil {
539                 return err
540         }
541         stderr, err := cmd.StderrPipe()
542         if err != nil {
543                 return err
544         }
545         logwriter := &service.LogPrefixer{Writer: super.Stderr, Prefix: []byte("[" + logprefix + "] ")}
546         var copiers sync.WaitGroup
547         copiers.Add(1)
548         go func() {
549                 io.Copy(logwriter, stderr)
550                 copiers.Done()
551         }()
552         copiers.Add(1)
553         go func() {
554                 if opts.output == nil {
555                         io.Copy(logwriter, stdout)
556                 } else {
557                         io.Copy(opts.output, stdout)
558                 }
559                 copiers.Done()
560         }()
561
562         if strings.HasPrefix(dir, "/") {
563                 cmd.Dir = dir
564         } else {
565                 cmd.Dir = filepath.Join(super.SourcePath, dir)
566         }
567         env := append([]string(nil), opts.env...)
568         env = append(env, super.environ...)
569         cmd.Env = dedupEnv(env)
570
571         if opts.user != "" {
572                 // Note: We use this approach instead of "sudo"
573                 // because in certain circumstances (we are pid 1 in a
574                 // docker container, and our passenger child process
575                 // changes to pgid 1) the intermediate sudo process
576                 // notices we have the same pgid as our child and
577                 // refuses to propagate signals from us to our child,
578                 // so we can't signal/shutdown our passenger/rails
579                 // apps. "chpst" or "setuidgid" would work, but these
580                 // few lines avoid depending on runit/daemontools.
581                 u, err := user.Lookup(opts.user)
582                 if err != nil {
583                         return fmt.Errorf("user.Lookup(%q): %w", opts.user, err)
584                 }
585                 uid, _ := strconv.Atoi(u.Uid)
586                 gid, _ := strconv.Atoi(u.Gid)
587                 cmd.SysProcAttr = &syscall.SysProcAttr{
588                         Credential: &syscall.Credential{
589                                 Uid: uint32(uid),
590                                 Gid: uint32(gid),
591                         },
592                 }
593         }
594
595         exited := false
596         defer func() { exited = true }()
597         go func() {
598                 <-ctx.Done()
599                 log := ctxlog.FromContext(ctx).WithFields(logrus.Fields{"dir": dir, "cmdline": cmdline})
600                 for !exited {
601                         if cmd.Process == nil {
602                                 log.Debug("waiting for child process to start")
603                                 time.Sleep(time.Second / 2)
604                         } else {
605                                 log.WithField("PID", cmd.Process.Pid).Debug("sending SIGTERM")
606                                 cmd.Process.Signal(syscall.SIGTERM)
607                                 time.Sleep(5 * time.Second)
608                                 if !exited {
609                                         stdout.Close()
610                                         stderr.Close()
611                                         log.WithField("PID", cmd.Process.Pid).Warn("still waiting for child process to exit 5s after SIGTERM")
612                                 }
613                         }
614                 }
615         }()
616
617         err = cmd.Start()
618         if err != nil {
619                 return err
620         }
621         copiers.Wait()
622         err = cmd.Wait()
623         if ctx.Err() != nil {
624                 // Return "context canceled", instead of the "killed"
625                 // error that was probably caused by the context being
626                 // canceled.
627                 return ctx.Err()
628         } else if err != nil {
629                 return fmt.Errorf("%s: error: %v", cmdline, err)
630         }
631         return nil
632 }
633
634 func (super *Supervisor) autofillConfig(cfg *arvados.Config) error {
635         cluster, err := cfg.GetCluster("")
636         if err != nil {
637                 return err
638         }
639         usedPort := map[string]bool{}
640         nextPort := func(host string) (string, error) {
641                 for {
642                         port, err := availablePort(host)
643                         if err != nil {
644                                 port, err = availablePort(super.ListenHost)
645                         }
646                         if err != nil {
647                                 return "", err
648                         }
649                         if usedPort[port] {
650                                 continue
651                         }
652                         usedPort[port] = true
653                         return port, nil
654                 }
655         }
656         if cluster.Services.Controller.ExternalURL.Host == "" {
657                 h, p, err := net.SplitHostPort(super.ControllerAddr)
658                 if err != nil {
659                         return fmt.Errorf("SplitHostPort(ControllerAddr): %w", err)
660                 }
661                 if h == "" {
662                         h = super.ListenHost
663                 }
664                 if p == "0" {
665                         p, err = nextPort(h)
666                         if err != nil {
667                                 return err
668                         }
669                 }
670                 cluster.Services.Controller.ExternalURL = arvados.URL{Scheme: "https", Host: net.JoinHostPort(h, p), Path: "/"}
671         }
672         defaultExtHost, _, err := net.SplitHostPort(cluster.Services.Controller.ExternalURL.Host)
673         if err != nil {
674                 return fmt.Errorf("SplitHostPort(Controller.ExternalURL.Host): %w", err)
675         }
676         for _, svc := range []*arvados.Service{
677                 &cluster.Services.Controller,
678                 &cluster.Services.DispatchCloud,
679                 &cluster.Services.GitHTTP,
680                 &cluster.Services.Health,
681                 &cluster.Services.Keepproxy,
682                 &cluster.Services.Keepstore,
683                 &cluster.Services.RailsAPI,
684                 &cluster.Services.WebDAV,
685                 &cluster.Services.WebDAVDownload,
686                 &cluster.Services.Websocket,
687                 &cluster.Services.Workbench1,
688                 &cluster.Services.Workbench2,
689         } {
690                 if svc == &cluster.Services.DispatchCloud && super.ClusterType == "test" {
691                         continue
692                 }
693                 if svc.ExternalURL.Host == "" {
694                         port, err := nextPort(defaultExtHost)
695                         if err != nil {
696                                 return err
697                         }
698                         host := net.JoinHostPort(defaultExtHost, port)
699                         if svc == &cluster.Services.Controller ||
700                                 svc == &cluster.Services.GitHTTP ||
701                                 svc == &cluster.Services.Health ||
702                                 svc == &cluster.Services.Keepproxy ||
703                                 svc == &cluster.Services.WebDAV ||
704                                 svc == &cluster.Services.WebDAVDownload ||
705                                 svc == &cluster.Services.Workbench1 ||
706                                 svc == &cluster.Services.Workbench2 {
707                                 svc.ExternalURL = arvados.URL{Scheme: "https", Host: host, Path: "/"}
708                         } else if svc == &cluster.Services.Websocket {
709                                 svc.ExternalURL = arvados.URL{Scheme: "wss", Host: host, Path: "/websocket"}
710                         }
711                 }
712                 if super.NoWorkbench1 && svc == &cluster.Services.Workbench1 ||
713                         super.NoWorkbench2 && svc == &cluster.Services.Workbench2 {
714                         // When workbench1 is disabled, it gets an
715                         // ExternalURL (so we have a valid listening
716                         // port to write in our Nginx config) but no
717                         // InternalURLs (so health checker doesn't
718                         // complain).
719                         continue
720                 }
721                 if len(svc.InternalURLs) == 0 {
722                         port, err := nextPort(super.ListenHost)
723                         if err != nil {
724                                 return err
725                         }
726                         host := net.JoinHostPort(super.ListenHost, port)
727                         svc.InternalURLs = map[arvados.URL]arvados.ServiceInstance{
728                                 {Scheme: "http", Host: host, Path: "/"}: {},
729                         }
730                 }
731         }
732         if super.ClusterType != "production" {
733                 if cluster.SystemRootToken == "" {
734                         cluster.SystemRootToken = randomHexString(64)
735                 }
736                 if cluster.ManagementToken == "" {
737                         cluster.ManagementToken = randomHexString(64)
738                 }
739                 if cluster.Collections.BlobSigningKey == "" {
740                         cluster.Collections.BlobSigningKey = randomHexString(64)
741                 }
742                 if cluster.Users.AnonymousUserToken == "" {
743                         cluster.Users.AnonymousUserToken = randomHexString(64)
744                 }
745                 if cluster.Containers.DispatchPrivateKey == "" {
746                         buf, err := ioutil.ReadFile(filepath.Join(super.SourcePath, "lib", "dispatchcloud", "test", "sshkey_dispatch"))
747                         if err != nil {
748                                 return err
749                         }
750                         cluster.Containers.DispatchPrivateKey = string(buf)
751                 }
752                 cluster.TLS.Insecure = true
753         }
754         if super.ClusterType == "test" {
755                 // Add a second keepstore process.
756                 port, err := nextPort(super.ListenHost)
757                 if err != nil {
758                         return err
759                 }
760                 host := net.JoinHostPort(super.ListenHost, port)
761                 cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: "http", Host: host, Path: "/"}] = arvados.ServiceInstance{}
762
763                 // Create a directory-backed volume for each keepstore
764                 // process.
765                 cluster.Volumes = map[string]arvados.Volume{}
766                 for url := range cluster.Services.Keepstore.InternalURLs {
767                         volnum := len(cluster.Volumes)
768                         datadir := fmt.Sprintf("%s/keep%d.data", super.tempdir, volnum)
769                         if _, err = os.Stat(datadir + "/."); err == nil {
770                         } else if !os.IsNotExist(err) {
771                                 return err
772                         } else if err = os.Mkdir(datadir, 0755); err != nil {
773                                 return err
774                         }
775                         cluster.Volumes[fmt.Sprintf(cluster.ClusterID+"-nyw5e-%015d", volnum)] = arvados.Volume{
776                                 Driver:           "Directory",
777                                 DriverParameters: json.RawMessage(fmt.Sprintf(`{"Root":%q}`, datadir)),
778                                 AccessViaHosts: map[arvados.URL]arvados.VolumeAccess{
779                                         url: {},
780                                 },
781                                 StorageClasses: map[string]bool{
782                                         "default": true,
783                                         "foo":     true,
784                                         "bar":     true,
785                                 },
786                         }
787                 }
788                 cluster.StorageClasses = map[string]arvados.StorageClassConfig{
789                         "default": {Default: true},
790                         "foo":     {},
791                         "bar":     {},
792                 }
793         }
794         if super.OwnTemporaryDatabase {
795                 port, err := nextPort("localhost")
796                 if err != nil {
797                         return err
798                 }
799                 cluster.PostgreSQL.Connection = arvados.PostgreSQLConnection{
800                         "client_encoding": "utf8",
801                         "host":            "localhost",
802                         "port":            port,
803                         "dbname":          "arvados_test",
804                         "user":            "arvados",
805                         "password":        "insecure_arvados_test",
806                 }
807         }
808
809         cfg.Clusters[cluster.ClusterID] = *cluster
810         return nil
811 }
812
// addrIsLocal reports whether the host part of addr (a "host:port"
// string) is an address this machine can listen on.
//
// It returns (true, nil) if a TCP listener can be opened on the
// host, (false, nil) if the OS reports "cannot assign requested
// address", and (false, err) if addr cannot be parsed or the probe
// fails for any other reason.
func addrIsLocal(addr string) (bool, error) {
	host, _, err := net.SplitHostPort(addr)
	if err != nil {
		return false, err
	}
	// Probe with port 0 (any free port) instead of the port named
	// in addr, so the result depends only on whether the host
	// address is local, not on whether that port is already in use
	// or requires privileges to bind.
	listener, err := net.Listen("tcp", net.JoinHostPort(host, "0"))
	if err == nil {
		listener.Close()
		return true, nil
	}
	if strings.Contains(err.Error(), "cannot assign requested address") {
		return false, nil
	}
	return false, err
}
825
// randomHexString returns a string of exactly chars lowercase
// hexadecimal digits read from crypto/rand. It returns "" if chars
// is not positive, and panics if the random source fails.
func randomHexString(chars int) string {
	if chars <= 0 {
		return ""
	}
	// Each random byte yields two hex digits. Round the byte count
	// up so an odd request is not silently shortened, then trim the
	// surplus digit.
	b := make([]byte, (chars+1)/2)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", b)[:chars]
}
834
835 func internalPort(svc arvados.Service) (host, port string, err error) {
836         if len(svc.InternalURLs) > 1 {
837                 return "", "", errors.New("internalPort() doesn't work with multiple InternalURLs")
838         }
839         for u := range svc.InternalURLs {
840                 u := url.URL(u)
841                 host, port = u.Hostname(), u.Port()
842                 switch {
843                 case port != "":
844                 case u.Scheme == "https", u.Scheme == "ws":
845                         port = "443"
846                 default:
847                         port = "80"
848                 }
849                 return
850         }
851         return "", "", fmt.Errorf("service has no InternalURLs")
852 }
853
854 func externalPort(svc arvados.Service) (string, error) {
855         u := url.URL(svc.ExternalURL)
856         if p := u.Port(); p != "" {
857                 return p, nil
858         } else if u.Scheme == "https" || u.Scheme == "wss" {
859                 return "443", nil
860         } else {
861                 return "80", nil
862         }
863 }
864
// availablePort finds a TCP port that is currently free on host by
// listening on port 0 and reading back the port that was assigned.
// The listener is closed before returning, so the port is reported
// but not reserved.
func availablePort(host string) (string, error) {
	listener, err := net.Listen("tcp", net.JoinHostPort(host, "0"))
	if err != nil {
		return "", err
	}
	defer listener.Close()

	assigned := listener.Addr().String()
	_, port, splitErr := net.SplitHostPort(assigned)
	if splitErr != nil {
		return "", splitErr
	}
	return port, nil
}
877
// waitForConnect repeatedly tries to open a TCP connection to addr,
// pausing 100ms after each failed attempt. It returns nil as soon
// as an attempt succeeds (the connection is closed immediately), or
// ctx.Err() once ctx is done.
func waitForConnect(ctx context.Context, addr string) error {
	const retryDelay = time.Second / 10
	dialer := net.Dialer{Timeout: time.Second}
	for ctx.Err() == nil {
		conn, err := dialer.DialContext(ctx, "tcp", addr)
		if err == nil {
			conn.Close()
			return nil
		}
		// Wait before retrying, but stop waiting as soon as ctx
		// is done so cancellation is not delayed by the pause.
		timer := time.NewTimer(retryDelay)
		select {
		case <-ctx.Done():
			timer.Stop()
		case <-timer.C:
		}
	}
	return ctx.Err()
}
893
894 func copyConfig(cfg *arvados.Config) *arvados.Config {
895         pr, pw := io.Pipe()
896         go func() {
897                 err := json.NewEncoder(pw).Encode(cfg)
898                 if err != nil {
899                         panic(err)
900                 }
901                 pw.Close()
902         }()
903         cfg2 := new(arvados.Config)
904         err := json.NewDecoder(pr).Decode(cfg2)
905         if err != nil {
906                 panic(err)
907         }
908         return cfg2
909 }
910
911 func watchConfig(ctx context.Context, logger logrus.FieldLogger, cfgPath string, prevcfg *arvados.Config, fn func()) {
912         watcher, err := fsnotify.NewWatcher()
913         if err != nil {
914                 logger.WithError(err).Error("fsnotify setup failed")
915                 return
916         }
917         defer watcher.Close()
918
919         err = watcher.Add(cfgPath)
920         if err != nil {
921                 logger.WithError(err).Error("fsnotify watcher failed")
922                 return
923         }
924
925         for {
926                 select {
927                 case <-ctx.Done():
928                         return
929                 case err, ok := <-watcher.Errors:
930                         if !ok {
931                                 return
932                         }
933                         logger.WithError(err).Warn("fsnotify watcher reported error")
934                 case _, ok := <-watcher.Events:
935                         if !ok {
936                                 return
937                         }
938                         for len(watcher.Events) > 0 {
939                                 <-watcher.Events
940                         }
941                         loader := config.NewLoader(&bytes.Buffer{}, &logrus.Logger{Out: ioutil.Discard})
942                         loader.Path = cfgPath
943                         loader.SkipAPICalls = true
944                         cfg, err := loader.Load()
945                         if err != nil {
946                                 logger.WithError(err).Warn("error reloading config file after change detected; ignoring new config for now")
947                         } else if reflect.DeepEqual(cfg, prevcfg) {
948                                 logger.Debug("config file changed but is still DeepEqual to the existing config")
949                         } else {
950                                 logger.Debug("config changed, notifying supervisor")
951                                 fn()
952                                 prevcfg = cfg
953                         }
954                 }
955         }
956 }