17170: Merge branch 'master'
[arvados.git] / lib / boot / supervisor.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package boot
6
7 import (
8         "bytes"
9         "context"
10         "crypto/rand"
11         "encoding/json"
12         "errors"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "net"
17         "net/url"
18         "os"
19         "os/exec"
20         "os/signal"
21         "os/user"
22         "path/filepath"
23         "reflect"
24         "strconv"
25         "strings"
26         "sync"
27         "syscall"
28         "time"
29
30         "git.arvados.org/arvados.git/lib/config"
31         "git.arvados.org/arvados.git/lib/service"
32         "git.arvados.org/arvados.git/sdk/go/arvados"
33         "git.arvados.org/arvados.git/sdk/go/ctxlog"
34         "git.arvados.org/arvados.git/sdk/go/health"
35         "github.com/fsnotify/fsnotify"
36         "github.com/sirupsen/logrus"
37 )
38
39 type Supervisor struct {
40         SourcePath           string // e.g., /home/username/src/arvados
41         SourceVersion        string // e.g., acbd1324...
42         ClusterType          string // e.g., production
43         ListenHost           string // e.g., localhost
44         ControllerAddr       string // e.g., 127.0.0.1:8000
45         OwnTemporaryDatabase bool
46         Stderr               io.Writer
47
48         logger  logrus.FieldLogger
49         cluster *arvados.Cluster
50
51         ctx           context.Context
52         cancel        context.CancelFunc
53         done          chan struct{} // closed when child procs/services have shut down
54         err           error         // error that caused shutdown (valid when done is closed)
55         healthChecker *health.Aggregator
56         tasksReady    map[string]chan bool
57         waitShutdown  sync.WaitGroup
58
59         bindir     string
60         tempdir    string
61         wwwtempdir string
62         configfile string
63         environ    []string // for child processes
64 }
65
66 func (super *Supervisor) Cluster() *arvados.Cluster { return super.cluster }
67
68 func (super *Supervisor) Start(ctx context.Context, cfg *arvados.Config, cfgPath string) {
69         super.ctx, super.cancel = context.WithCancel(ctx)
70         super.done = make(chan struct{})
71
72         go func() {
73                 defer close(super.done)
74
75                 sigch := make(chan os.Signal)
76                 signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM)
77                 defer signal.Stop(sigch)
78                 go func() {
79                         for sig := range sigch {
80                                 super.logger.WithField("signal", sig).Info("caught signal")
81                                 if super.err == nil {
82                                         super.err = fmt.Errorf("caught signal %s", sig)
83                                 }
84                                 super.cancel()
85                         }
86                 }()
87
88                 hupch := make(chan os.Signal)
89                 signal.Notify(hupch, syscall.SIGHUP)
90                 defer signal.Stop(hupch)
91                 go func() {
92                         for sig := range hupch {
93                                 super.logger.WithField("signal", sig).Info("caught signal")
94                                 if super.err == nil {
95                                         super.err = errNeedConfigReload
96                                 }
97                                 super.cancel()
98                         }
99                 }()
100
101                 if cfgPath != "" && cfgPath != "-" && cfg.AutoReloadConfig {
102                         go watchConfig(super.ctx, super.logger, cfgPath, copyConfig(cfg), func() {
103                                 if super.err == nil {
104                                         super.err = errNeedConfigReload
105                                 }
106                                 super.cancel()
107                         })
108                 }
109
110                 err := super.run(cfg)
111                 if err != nil {
112                         super.logger.WithError(err).Warn("supervisor shut down")
113                         if super.err == nil {
114                                 super.err = err
115                         }
116                 }
117         }()
118 }
119
120 func (super *Supervisor) Wait() error {
121         <-super.done
122         return super.err
123 }
124
125 func (super *Supervisor) run(cfg *arvados.Config) error {
126         defer super.cancel()
127
128         cwd, err := os.Getwd()
129         if err != nil {
130                 return err
131         }
132         if !strings.HasPrefix(super.SourcePath, "/") {
133                 super.SourcePath = filepath.Join(cwd, super.SourcePath)
134         }
135         super.SourcePath, err = filepath.EvalSymlinks(super.SourcePath)
136         if err != nil {
137                 return err
138         }
139
140         // Choose bin and temp dirs: /var/lib/arvados/... in
141         // production, transient tempdir otherwise.
142         if super.ClusterType == "production" {
143                 // These dirs have already been created by
144                 // "arvados-server install" (or by extracting a
145                 // package).
146                 super.tempdir = "/var/lib/arvados/tmp"
147                 super.wwwtempdir = "/var/lib/arvados/wwwtmp"
148                 super.bindir = "/var/lib/arvados/bin"
149         } else {
150                 super.tempdir, err = ioutil.TempDir("", "arvados-server-boot-")
151                 if err != nil {
152                         return err
153                 }
154                 defer os.RemoveAll(super.tempdir)
155                 super.wwwtempdir = super.tempdir
156                 super.bindir = filepath.Join(super.tempdir, "bin")
157                 if err := os.Mkdir(super.bindir, 0755); err != nil {
158                         return err
159                 }
160         }
161
162         // Fill in any missing config keys, and write the resulting
163         // config in the temp dir for child services to use.
164         err = super.autofillConfig(cfg)
165         if err != nil {
166                 return err
167         }
168         conffile, err := os.OpenFile(filepath.Join(super.wwwtempdir, "config.yml"), os.O_CREATE|os.O_WRONLY, 0644)
169         if err != nil {
170                 return err
171         }
172         defer conffile.Close()
173         err = json.NewEncoder(conffile).Encode(cfg)
174         if err != nil {
175                 return err
176         }
177         err = conffile.Close()
178         if err != nil {
179                 return err
180         }
181         super.configfile = conffile.Name()
182
183         super.environ = os.Environ()
184         super.cleanEnv([]string{"ARVADOS_"})
185         super.setEnv("ARVADOS_CONFIG", super.configfile)
186         super.setEnv("RAILS_ENV", super.ClusterType)
187         super.setEnv("TMPDIR", super.tempdir)
188         super.prependEnv("PATH", "/var/lib/arvados/bin:")
189         if super.ClusterType != "production" {
190                 super.prependEnv("PATH", super.tempdir+"/bin:")
191         }
192
193         super.cluster, err = cfg.GetCluster("")
194         if err != nil {
195                 return err
196         }
197         // Now that we have the config, replace the bootstrap logger
198         // with a new one according to the logging config.
199         loglevel := super.cluster.SystemLogs.LogLevel
200         if s := os.Getenv("ARVADOS_DEBUG"); s != "" && s != "0" {
201                 loglevel = "debug"
202         }
203         super.logger = ctxlog.New(super.Stderr, super.cluster.SystemLogs.Format, loglevel).WithFields(logrus.Fields{
204                 "PID": os.Getpid(),
205         })
206
207         if super.SourceVersion == "" && super.ClusterType == "production" {
208                 // don't need SourceVersion
209         } else if super.SourceVersion == "" {
210                 // Find current source tree version.
211                 var buf bytes.Buffer
212                 err = super.RunProgram(super.ctx, ".", runOptions{output: &buf}, "git", "diff", "--shortstat")
213                 if err != nil {
214                         return err
215                 }
216                 dirty := buf.Len() > 0
217                 buf.Reset()
218                 err = super.RunProgram(super.ctx, ".", runOptions{output: &buf}, "git", "log", "-n1", "--format=%H")
219                 if err != nil {
220                         return err
221                 }
222                 super.SourceVersion = strings.TrimSpace(buf.String())
223                 if dirty {
224                         super.SourceVersion += "+uncommitted"
225                 }
226         } else {
227                 return errors.New("specifying a version to run is not yet supported")
228         }
229
230         _, err = super.installGoProgram(super.ctx, "cmd/arvados-server")
231         if err != nil {
232                 return err
233         }
234         err = super.setupRubyEnv()
235         if err != nil {
236                 return err
237         }
238
239         tasks := []supervisedTask{
240                 createCertificates{},
241                 runPostgreSQL{},
242                 runNginx{},
243                 runServiceCommand{name: "controller", svc: super.cluster.Services.Controller, depends: []supervisedTask{seedDatabase{}}},
244                 runGoProgram{src: "services/arv-git-httpd", svc: super.cluster.Services.GitHTTP},
245                 runGoProgram{src: "services/health", svc: super.cluster.Services.Health},
246                 runGoProgram{src: "services/keepproxy", svc: super.cluster.Services.Keepproxy, depends: []supervisedTask{runPassenger{src: "services/api"}}},
247                 runGoProgram{src: "services/keepstore", svc: super.cluster.Services.Keepstore},
248                 runGoProgram{src: "services/keep-web", svc: super.cluster.Services.WebDAV},
249                 runServiceCommand{name: "ws", svc: super.cluster.Services.Websocket, depends: []supervisedTask{seedDatabase{}}},
250                 installPassenger{src: "services/api"},
251                 runPassenger{src: "services/api", varlibdir: "railsapi", svc: super.cluster.Services.RailsAPI, depends: []supervisedTask{createCertificates{}, seedDatabase{}, installPassenger{src: "services/api"}}},
252                 installPassenger{src: "apps/workbench", depends: []supervisedTask{seedDatabase{}}}, // dependency ensures workbench doesn't delay api install/startup
253                 runPassenger{src: "apps/workbench", varlibdir: "workbench1", svc: super.cluster.Services.Workbench1, depends: []supervisedTask{installPassenger{src: "apps/workbench"}}},
254                 seedDatabase{},
255         }
256         if super.ClusterType != "test" {
257                 tasks = append(tasks,
258                         runServiceCommand{name: "dispatch-cloud", svc: super.cluster.Services.DispatchCloud},
259                         runGoProgram{src: "services/keep-balance", svc: super.cluster.Services.Keepbalance},
260                 )
261         }
262         super.tasksReady = map[string]chan bool{}
263         for _, task := range tasks {
264                 super.tasksReady[task.String()] = make(chan bool)
265         }
266         for _, task := range tasks {
267                 task := task
268                 fail := func(err error) {
269                         if super.ctx.Err() != nil {
270                                 return
271                         }
272                         super.cancel()
273                         super.logger.WithField("task", task.String()).WithError(err).Error("task failed")
274                 }
275                 go func() {
276                         super.logger.WithField("task", task.String()).Info("starting")
277                         err := task.Run(super.ctx, fail, super)
278                         if err != nil {
279                                 fail(err)
280                                 return
281                         }
282                         close(super.tasksReady[task.String()])
283                 }()
284         }
285         err = super.wait(super.ctx, tasks...)
286         if err != nil {
287                 return err
288         }
289         super.logger.Info("all startup tasks are complete; starting health checks")
290         super.healthChecker = &health.Aggregator{Cluster: super.cluster}
291         <-super.ctx.Done()
292         super.logger.Info("shutting down")
293         super.waitShutdown.Wait()
294         return super.ctx.Err()
295 }
296
297 func (super *Supervisor) wait(ctx context.Context, tasks ...supervisedTask) error {
298         for _, task := range tasks {
299                 ch, ok := super.tasksReady[task.String()]
300                 if !ok {
301                         return fmt.Errorf("no such task: %s", task)
302                 }
303                 super.logger.WithField("task", task.String()).Info("waiting")
304                 select {
305                 case <-ch:
306                         super.logger.WithField("task", task.String()).Info("ready")
307                 case <-ctx.Done():
308                         super.logger.WithField("task", task.String()).Info("task was never ready")
309                         return ctx.Err()
310                 }
311         }
312         return nil
313 }
314
315 func (super *Supervisor) Stop() {
316         super.cancel()
317         <-super.done
318 }
319
320 func (super *Supervisor) WaitReady() (*arvados.URL, bool) {
321         ticker := time.NewTicker(time.Second)
322         defer ticker.Stop()
323         for waiting := "all"; waiting != ""; {
324                 select {
325                 case <-ticker.C:
326                 case <-super.ctx.Done():
327                         return nil, false
328                 }
329                 if super.healthChecker == nil {
330                         // not set up yet
331                         continue
332                 }
333                 resp := super.healthChecker.ClusterHealth()
334                 // The overall health check (resp.Health=="OK") might
335                 // never pass due to missing components (like
336                 // arvados-dispatch-cloud in a test cluster), so
337                 // instead we wait for all configured components to
338                 // pass.
339                 waiting = ""
340                 for target, check := range resp.Checks {
341                         if check.Health != "OK" {
342                                 waiting += " " + target
343                         }
344                 }
345                 if waiting != "" {
346                         super.logger.WithField("targets", waiting[1:]).Info("waiting")
347                 }
348         }
349         u := super.cluster.Services.Controller.ExternalURL
350         return &u, true
351 }
352
353 func (super *Supervisor) prependEnv(key, prepend string) {
354         for i, s := range super.environ {
355                 if strings.HasPrefix(s, key+"=") {
356                         super.environ[i] = key + "=" + prepend + s[len(key)+1:]
357                         return
358                 }
359         }
360         super.environ = append(super.environ, key+"="+prepend)
361 }
362
363 func (super *Supervisor) cleanEnv(prefixes []string) {
364         var cleaned []string
365         for _, s := range super.environ {
366                 drop := false
367                 for _, p := range prefixes {
368                         if strings.HasPrefix(s, p) {
369                                 drop = true
370                                 break
371                         }
372                 }
373                 if !drop {
374                         cleaned = append(cleaned, s)
375                 }
376         }
377         super.environ = cleaned
378 }
379
380 func (super *Supervisor) setEnv(key, val string) {
381         for i, s := range super.environ {
382                 if strings.HasPrefix(s, key+"=") {
383                         super.environ[i] = key + "=" + val
384                         return
385                 }
386         }
387         super.environ = append(super.environ, key+"="+val)
388 }
389
// dedupEnv returns the entries of in, keeping only the first
// occurrence of each variable name and preserving order. It panics if
// an entry has no "=" or has an empty name.
func dedupEnv(in []string) []string {
	var uniq []string
	seen := make(map[string]bool)
	for _, entry := range in {
		eq := strings.IndexByte(entry, '=')
		if eq < 1 {
			panic("invalid environment var: " + entry)
		}
		name := entry[:eq]
		if seen[name] {
			continue
		}
		seen[name] = true
		uniq = append(uniq, entry)
	}
	return uniq
}
406
407 func (super *Supervisor) installGoProgram(ctx context.Context, srcpath string) (string, error) {
408         _, basename := filepath.Split(srcpath)
409         binfile := filepath.Join(super.bindir, basename)
410         if super.ClusterType == "production" {
411                 return binfile, nil
412         }
413         err := super.RunProgram(ctx, filepath.Join(super.SourcePath, srcpath), runOptions{env: []string{"GOBIN=" + super.bindir}}, "go", "install", "-ldflags", "-X git.arvados.org/arvados.git/lib/cmd.version="+super.SourceVersion+" -X main.version="+super.SourceVersion)
414         return binfile, err
415 }
416
417 func (super *Supervisor) usingRVM() bool {
418         return os.Getenv("rvm_path") != ""
419 }
420
421 func (super *Supervisor) setupRubyEnv() error {
422         if !super.usingRVM() {
423                 // (If rvm is in use, assume the caller has everything
424                 // set up as desired)
425                 super.cleanEnv([]string{
426                         "GEM_HOME=",
427                         "GEM_PATH=",
428                 })
429                 gem := "gem"
430                 if _, err := os.Stat("/var/lib/arvados/bin/gem"); err == nil || super.ClusterType == "production" {
431                         gem = "/var/lib/arvados/bin/gem"
432                 }
433                 cmd := exec.Command(gem, "env", "gempath")
434                 if super.ClusterType == "production" {
435                         cmd.Args = append([]string{"sudo", "-u", "www-data", "-E", "HOME=/var/www"}, cmd.Args...)
436                         path, err := exec.LookPath("sudo")
437                         if err != nil {
438                                 return fmt.Errorf("LookPath(\"sudo\"): %w", err)
439                         }
440                         cmd.Path = path
441                 }
442                 cmd.Stderr = super.Stderr
443                 cmd.Env = super.environ
444                 buf, err := cmd.Output() // /var/lib/arvados/.gem/ruby/2.5.0/bin:...
445                 if err != nil || len(buf) == 0 {
446                         return fmt.Errorf("gem env gempath: %w", err)
447                 }
448                 gempath := string(bytes.Split(buf, []byte{':'})[0])
449                 super.prependEnv("PATH", gempath+"/bin:")
450                 super.setEnv("GEM_HOME", gempath)
451                 super.setEnv("GEM_PATH", gempath)
452         }
453         // Passenger install doesn't work unless $HOME is ~user
454         u, err := user.Current()
455         if err != nil {
456                 return err
457         }
458         super.setEnv("HOME", u.HomeDir)
459         return nil
460 }
461
462 func (super *Supervisor) lookPath(prog string) string {
463         for _, val := range super.environ {
464                 if strings.HasPrefix(val, "PATH=") {
465                         for _, dir := range filepath.SplitList(val[5:]) {
466                                 path := filepath.Join(dir, prog)
467                                 if fi, err := os.Stat(path); err == nil && fi.Mode()&0111 != 0 {
468                                         return path
469                                 }
470                         }
471                 }
472         }
473         return prog
474 }
475
// runOptions holds the optional settings accepted by RunProgram.
type runOptions struct {
	// output, if non-nil, receives the child's stdout.
	output io.Writer
	// env lists "KEY=value" entries to add to the child's
	// environment, taking precedence over existing entries.
	env []string
	// user, if non-empty, is the name of the user to run the
	// child as.
	user string
}
481
482 // RunProgram runs prog with args, using dir as working directory. If ctx is
483 // cancelled while the child is running, RunProgram terminates the child, waits
484 // for it to exit, then returns.
485 //
486 // Child's environment will have our env vars, plus any given in env.
487 //
488 // Child's stdout will be written to output if non-nil, otherwise the
489 // boot command's stderr.
490 func (super *Supervisor) RunProgram(ctx context.Context, dir string, opts runOptions, prog string, args ...string) error {
491         cmdline := fmt.Sprintf("%s", append([]string{prog}, args...))
492         super.logger.WithField("command", cmdline).WithField("dir", dir).Info("executing")
493
494         logprefix := prog
495         {
496                 innerargs := args
497                 if logprefix == "sudo" {
498                         for i := 0; i < len(args); i++ {
499                                 if args[i] == "-u" {
500                                         i++
501                                 } else if args[i] == "-E" || strings.Contains(args[i], "=") {
502                                 } else {
503                                         logprefix = args[i]
504                                         innerargs = args[i+1:]
505                                         break
506                                 }
507                         }
508                 }
509                 logprefix = strings.TrimPrefix(logprefix, "/var/lib/arvados/bin/")
510                 logprefix = strings.TrimPrefix(logprefix, super.tempdir+"/bin/")
511                 if logprefix == "bundle" && len(innerargs) > 2 && innerargs[0] == "exec" {
512                         _, dirbase := filepath.Split(dir)
513                         logprefix = innerargs[1] + "@" + dirbase
514                 } else if logprefix == "arvados-server" && len(args) > 1 {
515                         logprefix = args[0]
516                 }
517                 if !strings.HasPrefix(dir, "/") {
518                         logprefix = dir + ": " + logprefix
519                 }
520         }
521
522         cmd := exec.Command(super.lookPath(prog), args...)
523         stdout, err := cmd.StdoutPipe()
524         if err != nil {
525                 return err
526         }
527         stderr, err := cmd.StderrPipe()
528         if err != nil {
529                 return err
530         }
531         logwriter := &service.LogPrefixer{Writer: super.Stderr, Prefix: []byte("[" + logprefix + "] ")}
532         var copiers sync.WaitGroup
533         copiers.Add(1)
534         go func() {
535                 io.Copy(logwriter, stderr)
536                 copiers.Done()
537         }()
538         copiers.Add(1)
539         go func() {
540                 if opts.output == nil {
541                         io.Copy(logwriter, stdout)
542                 } else {
543                         io.Copy(opts.output, stdout)
544                 }
545                 copiers.Done()
546         }()
547
548         if strings.HasPrefix(dir, "/") {
549                 cmd.Dir = dir
550         } else {
551                 cmd.Dir = filepath.Join(super.SourcePath, dir)
552         }
553         env := append([]string(nil), opts.env...)
554         env = append(env, super.environ...)
555         cmd.Env = dedupEnv(env)
556
557         if opts.user != "" {
558                 // Note: We use this approach instead of "sudo"
559                 // because in certain circumstances (we are pid 1 in a
560                 // docker container, and our passenger child process
561                 // changes to pgid 1) the intermediate sudo process
562                 // notices we have the same pgid as our child and
563                 // refuses to propagate signals from us to our child,
564                 // so we can't signal/shutdown our passenger/rails
565                 // apps. "chpst" or "setuidgid" would work, but these
566                 // few lines avoid depending on runit/daemontools.
567                 u, err := user.Lookup(opts.user)
568                 if err != nil {
569                         return fmt.Errorf("user.Lookup(%q): %w", opts.user, err)
570                 }
571                 uid, _ := strconv.Atoi(u.Uid)
572                 gid, _ := strconv.Atoi(u.Gid)
573                 cmd.SysProcAttr = &syscall.SysProcAttr{
574                         Credential: &syscall.Credential{
575                                 Uid: uint32(uid),
576                                 Gid: uint32(gid),
577                         },
578                 }
579         }
580
581         exited := false
582         defer func() { exited = true }()
583         go func() {
584                 <-ctx.Done()
585                 log := ctxlog.FromContext(ctx).WithFields(logrus.Fields{"dir": dir, "cmdline": cmdline})
586                 for !exited {
587                         if cmd.Process == nil {
588                                 log.Debug("waiting for child process to start")
589                                 time.Sleep(time.Second / 2)
590                         } else {
591                                 log.WithField("PID", cmd.Process.Pid).Debug("sending SIGTERM")
592                                 cmd.Process.Signal(syscall.SIGTERM)
593                                 time.Sleep(5 * time.Second)
594                                 if !exited {
595                                         stdout.Close()
596                                         stderr.Close()
597                                         log.WithField("PID", cmd.Process.Pid).Warn("still waiting for child process to exit 5s after SIGTERM")
598                                 }
599                         }
600                 }
601         }()
602
603         err = cmd.Start()
604         if err != nil {
605                 return err
606         }
607         copiers.Wait()
608         err = cmd.Wait()
609         if ctx.Err() != nil {
610                 // Return "context canceled", instead of the "killed"
611                 // error that was probably caused by the context being
612                 // canceled.
613                 return ctx.Err()
614         } else if err != nil {
615                 return fmt.Errorf("%s: error: %v", cmdline, err)
616         }
617         return nil
618 }
619
620 func (super *Supervisor) autofillConfig(cfg *arvados.Config) error {
621         cluster, err := cfg.GetCluster("")
622         if err != nil {
623                 return err
624         }
625         usedPort := map[string]bool{}
626         nextPort := func(host string) string {
627                 for {
628                         port, err := availablePort(host)
629                         if err != nil {
630                                 panic(err)
631                         }
632                         if usedPort[port] {
633                                 continue
634                         }
635                         usedPort[port] = true
636                         return port
637                 }
638         }
639         if cluster.Services.Controller.ExternalURL.Host == "" {
640                 h, p, err := net.SplitHostPort(super.ControllerAddr)
641                 if err != nil {
642                         return err
643                 }
644                 if h == "" {
645                         h = super.ListenHost
646                 }
647                 if p == "0" {
648                         p = nextPort(h)
649                 }
650                 cluster.Services.Controller.ExternalURL = arvados.URL{Scheme: "https", Host: net.JoinHostPort(h, p), Path: "/"}
651         }
652         for _, svc := range []*arvados.Service{
653                 &cluster.Services.Controller,
654                 &cluster.Services.DispatchCloud,
655                 &cluster.Services.GitHTTP,
656                 &cluster.Services.Health,
657                 &cluster.Services.Keepproxy,
658                 &cluster.Services.Keepstore,
659                 &cluster.Services.RailsAPI,
660                 &cluster.Services.WebDAV,
661                 &cluster.Services.WebDAVDownload,
662                 &cluster.Services.Websocket,
663                 &cluster.Services.Workbench1,
664         } {
665                 if svc == &cluster.Services.DispatchCloud && super.ClusterType == "test" {
666                         continue
667                 }
668                 if svc.ExternalURL.Host == "" {
669                         if svc == &cluster.Services.Controller ||
670                                 svc == &cluster.Services.GitHTTP ||
671                                 svc == &cluster.Services.Health ||
672                                 svc == &cluster.Services.Keepproxy ||
673                                 svc == &cluster.Services.WebDAV ||
674                                 svc == &cluster.Services.WebDAVDownload ||
675                                 svc == &cluster.Services.Workbench1 {
676                                 svc.ExternalURL = arvados.URL{Scheme: "https", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/"}
677                         } else if svc == &cluster.Services.Websocket {
678                                 svc.ExternalURL = arvados.URL{Scheme: "wss", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/websocket"}
679                         }
680                 }
681                 if len(svc.InternalURLs) == 0 {
682                         svc.InternalURLs = map[arvados.URL]arvados.ServiceInstance{
683                                 {Scheme: "http", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/"}: {},
684                         }
685                 }
686         }
687         if super.ClusterType != "production" {
688                 if cluster.SystemRootToken == "" {
689                         cluster.SystemRootToken = randomHexString(64)
690                 }
691                 if cluster.ManagementToken == "" {
692                         cluster.ManagementToken = randomHexString(64)
693                 }
694                 if cluster.Collections.BlobSigningKey == "" {
695                         cluster.Collections.BlobSigningKey = randomHexString(64)
696                 }
697                 if cluster.Users.AnonymousUserToken == "" {
698                         cluster.Users.AnonymousUserToken = randomHexString(64)
699                 }
700                 if cluster.Containers.DispatchPrivateKey == "" {
701                         buf, err := ioutil.ReadFile(filepath.Join(super.SourcePath, "lib", "dispatchcloud", "test", "sshkey_dispatch"))
702                         if err != nil {
703                                 return err
704                         }
705                         cluster.Containers.DispatchPrivateKey = string(buf)
706                 }
707                 cluster.TLS.Insecure = true
708         }
709         if super.ClusterType == "test" {
710                 // Add a second keepstore process.
711                 cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: "http", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/"}] = arvados.ServiceInstance{}
712
713                 // Create a directory-backed volume for each keepstore
714                 // process.
715                 cluster.Volumes = map[string]arvados.Volume{}
716                 for url := range cluster.Services.Keepstore.InternalURLs {
717                         volnum := len(cluster.Volumes)
718                         datadir := fmt.Sprintf("%s/keep%d.data", super.tempdir, volnum)
719                         if _, err = os.Stat(datadir + "/."); err == nil {
720                         } else if !os.IsNotExist(err) {
721                                 return err
722                         } else if err = os.Mkdir(datadir, 0755); err != nil {
723                                 return err
724                         }
725                         cluster.Volumes[fmt.Sprintf(cluster.ClusterID+"-nyw5e-%015d", volnum)] = arvados.Volume{
726                                 Driver:           "Directory",
727                                 DriverParameters: json.RawMessage(fmt.Sprintf(`{"Root":%q}`, datadir)),
728                                 AccessViaHosts: map[arvados.URL]arvados.VolumeAccess{
729                                         url: {},
730                                 },
731                         }
732                 }
733         }
734         if super.OwnTemporaryDatabase {
735                 cluster.PostgreSQL.Connection = arvados.PostgreSQLConnection{
736                         "client_encoding": "utf8",
737                         "host":            "localhost",
738                         "port":            nextPort(super.ListenHost),
739                         "dbname":          "arvados_test",
740                         "user":            "arvados",
741                         "password":        "insecure_arvados_test",
742                 }
743         }
744
745         cfg.Clusters[cluster.ClusterID] = *cluster
746         return nil
747 }
748
// addrIsLocal reports whether addr (a "host:port" string) names an
// address that this host is able to bind.
//
// Only the host part is probed: the check listens on port 0 (an
// ephemeral port) of that host, so the outcome does not depend on
// whether addr's own port is already in use.
//
// It returns (true, nil) if the listen succeeds, (false, nil) if the
// listen fails because the address is not available on this host,
// and (false, err) if addr is malformed or the listen fails for any
// other reason.
func addrIsLocal(addr string) (bool, error) {
	host, _, err := net.SplitHostPort(addr)
	if err != nil {
		return false, err
	}
	listener, err := net.Listen("tcp", net.JoinHostPort(host, "0"))
	if err == nil {
		listener.Close()
		return true, nil
	}
	// Prefer the errno check; keep the message match as a
	// fallback for errors that do not unwrap to a syscall.Errno.
	if errors.Is(err, syscall.EADDRNOTAVAIL) || strings.Contains(err.Error(), "cannot assign requested address") {
		return false, nil
	}
	return false, err
}
761
// randomHexString returns a string of exactly chars lowercase
// hexadecimal digits, generated from crypto/rand.
//
// A non-positive chars yields the empty string. It panics if the
// random source returns an error.
func randomHexString(chars int) string {
	if chars <= 0 {
		return ""
	}
	// Each byte encodes to two hex digits. Round up so odd lengths
	// get enough bytes, then trim the extra digit below.
	b := make([]byte, (chars+1)/2)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", b)[:chars]
}
770
771 func internalPort(svc arvados.Service) (string, error) {
772         if len(svc.InternalURLs) > 1 {
773                 return "", errors.New("internalPort() doesn't work with multiple InternalURLs")
774         }
775         for u := range svc.InternalURLs {
776                 u := url.URL(u)
777                 if p := u.Port(); p != "" {
778                         return p, nil
779                 } else if u.Scheme == "https" || u.Scheme == "ws" {
780                         return "443", nil
781                 } else {
782                         return "80", nil
783                 }
784         }
785         return "", fmt.Errorf("service has no InternalURLs")
786 }
787
788 func externalPort(svc arvados.Service) (string, error) {
789         u := url.URL(svc.ExternalURL)
790         if p := u.Port(); p != "" {
791                 return p, nil
792         } else if u.Scheme == "https" || u.Scheme == "wss" {
793                 return "443", nil
794         } else {
795                 return "80", nil
796         }
797 }
798
// availablePort asks the operating system for an unused TCP port on
// host by listening on port 0, and returns the port number that was
// assigned, as a decimal string. The probe listener is closed before
// returning, so the port is not reserved for the caller.
func availablePort(host string) (string, error) {
	probe, listenErr := net.Listen("tcp", net.JoinHostPort(host, "0"))
	if listenErr != nil {
		return "", listenErr
	}
	defer probe.Close()

	bound := probe.Addr().String()
	_, port, splitErr := net.SplitHostPort(bound)
	if splitErr != nil {
		return "", splitErr
	}
	return port, nil
}
811
// waitForConnect repeatedly tries to open a TCP connection to addr
// until one succeeds, then closes that connection and returns nil.
//
// Each attempt is limited to one second, and failed attempts are
// retried after a 100ms pause. If ctx is done before a connection
// succeeds, waitForConnect gives up and returns ctx.Err().
func waitForConnect(ctx context.Context, addr string) error {
	dialer := net.Dialer{Timeout: time.Second}
	for ctx.Err() == nil {
		conn, err := dialer.DialContext(ctx, "tcp", addr)
		if err == nil {
			conn.Close()
			return nil
		}
		// Pause before retrying, but wake up immediately if ctx
		// is cancelled so callers are not held up by the back-off.
		select {
		case <-ctx.Done():
		case <-time.After(time.Second / 10):
		}
	}
	return ctx.Err()
}
827
828 func copyConfig(cfg *arvados.Config) *arvados.Config {
829         pr, pw := io.Pipe()
830         go func() {
831                 err := json.NewEncoder(pw).Encode(cfg)
832                 if err != nil {
833                         panic(err)
834                 }
835                 pw.Close()
836         }()
837         cfg2 := new(arvados.Config)
838         err := json.NewDecoder(pr).Decode(cfg2)
839         if err != nil {
840                 panic(err)
841         }
842         return cfg2
843 }
844
845 func watchConfig(ctx context.Context, logger logrus.FieldLogger, cfgPath string, prevcfg *arvados.Config, fn func()) {
846         watcher, err := fsnotify.NewWatcher()
847         if err != nil {
848                 logger.WithError(err).Error("fsnotify setup failed")
849                 return
850         }
851         defer watcher.Close()
852
853         err = watcher.Add(cfgPath)
854         if err != nil {
855                 logger.WithError(err).Error("fsnotify watcher failed")
856                 return
857         }
858
859         for {
860                 select {
861                 case <-ctx.Done():
862                         return
863                 case err, ok := <-watcher.Errors:
864                         if !ok {
865                                 return
866                         }
867                         logger.WithError(err).Warn("fsnotify watcher reported error")
868                 case _, ok := <-watcher.Events:
869                         if !ok {
870                                 return
871                         }
872                         for len(watcher.Events) > 0 {
873                                 <-watcher.Events
874                         }
875                         loader := config.NewLoader(&bytes.Buffer{}, &logrus.Logger{Out: ioutil.Discard})
876                         loader.Path = cfgPath
877                         loader.SkipAPICalls = true
878                         cfg, err := loader.Load()
879                         if err != nil {
880                                 logger.WithError(err).Warn("error reloading config file after change detected; ignoring new config for now")
881                         } else if reflect.DeepEqual(cfg, prevcfg) {
882                                 logger.Debug("config file changed but is still DeepEqual to the existing config")
883                         } else {
884                                 logger.Debug("config changed, notifying supervisor")
885                                 fn()
886                                 prevcfg = cfg
887                         }
888                 }
889         }
890 }