16578: Few more copy script tweaks
[arvados.git] / lib / boot / supervisor.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package boot
6
7 import (
8         "bytes"
9         "context"
10         "crypto/rand"
11         "encoding/json"
12         "errors"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "net"
17         "os"
18         "os/exec"
19         "os/signal"
20         "os/user"
21         "path/filepath"
22         "reflect"
23         "strings"
24         "sync"
25         "syscall"
26         "time"
27
28         "git.arvados.org/arvados.git/lib/config"
29         "git.arvados.org/arvados.git/lib/service"
30         "git.arvados.org/arvados.git/sdk/go/arvados"
31         "git.arvados.org/arvados.git/sdk/go/ctxlog"
32         "git.arvados.org/arvados.git/sdk/go/health"
33         "github.com/fsnotify/fsnotify"
34         "github.com/sirupsen/logrus"
35 )
36
// Supervisor builds, starts, and monitors the processes that make up
// an Arvados cluster, using the source tree at SourcePath. Typical use:
// set the exported fields, call Start, then Wait (or Stop); WaitReady
// blocks until all configured health checks pass.
//
// Exported fields are read when the supervisor runs:
//   - SourcePath: a relative path is resolved against the working
//     directory, and symlinks are evaluated.
//   - SourceVersion: must currently be empty; it is filled in from git
//     in SourcePath, with "+uncommitted" appended if the tree is dirty.
//   - ClusterType: exported to children as RAILS_ENV; the values "test"
//     and "production" change which tasks run and which config values
//     are autofilled.
//   - ListenHost: host used in autofilled service URLs.
//   - ControllerAddr: host:port used for the controller's ExternalURL
//     when the config has none; port "0" means pick a free port.
//   - OwnTemporaryDatabase: if true, the PostgreSQL connection config is
//     replaced with one for a local "arvados_test" database on a newly
//     chosen port.
//   - Stderr: destination for the supervisor's log and children's output.
type Supervisor struct {
	SourcePath           string // e.g., /home/username/src/arvados
	SourceVersion        string // e.g., acbd1324...
	ClusterType          string // e.g., production
	ListenHost           string // e.g., localhost
	ControllerAddr       string // e.g., 127.0.0.1:8000
	OwnTemporaryDatabase bool
	Stderr               io.Writer

	// Set up in run: logger is replaced according to the cluster's
	// logging config, and cluster is the autofilled cluster config.
	logger  logrus.FieldLogger
	cluster *arvados.Cluster

	// ctx/cancel are created by Start; cancelling ctx begins shutdown.
	// tasksReady maps each task's String() to a channel that is closed
	// when that task's Run returns nil. run waits on waitShutdown
	// during shutdown (NOTE(review): whoever calls Add on it is not
	// visible in this file section).
	ctx           context.Context
	cancel        context.CancelFunc
	done          chan struct{} // closed when child procs/services have shut down
	err           error         // error that caused shutdown (valid when done is closed)
	healthChecker *health.Aggregator
	tasksReady    map[string]chan bool
	waitShutdown  sync.WaitGroup

	// tempdir is created by run and removed when run returns;
	// configfile is the autofilled config written inside it.
	tempdir    string
	configfile string
	environ    []string // for child processes
}
61
62 func (super *Supervisor) Start(ctx context.Context, cfg *arvados.Config, cfgPath string) {
63         super.ctx, super.cancel = context.WithCancel(ctx)
64         super.done = make(chan struct{})
65
66         go func() {
67                 defer close(super.done)
68
69                 sigch := make(chan os.Signal)
70                 signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM)
71                 defer signal.Stop(sigch)
72                 go func() {
73                         for sig := range sigch {
74                                 super.logger.WithField("signal", sig).Info("caught signal")
75                                 if super.err == nil {
76                                         super.err = fmt.Errorf("caught signal %s", sig)
77                                 }
78                                 super.cancel()
79                         }
80                 }()
81
82                 hupch := make(chan os.Signal)
83                 signal.Notify(hupch, syscall.SIGHUP)
84                 defer signal.Stop(hupch)
85                 go func() {
86                         for sig := range hupch {
87                                 super.logger.WithField("signal", sig).Info("caught signal")
88                                 if super.err == nil {
89                                         super.err = errNeedConfigReload
90                                 }
91                                 super.cancel()
92                         }
93                 }()
94
95                 if cfgPath != "" && cfgPath != "-" && cfg.AutoReloadConfig {
96                         go watchConfig(super.ctx, super.logger, cfgPath, copyConfig(cfg), func() {
97                                 if super.err == nil {
98                                         super.err = errNeedConfigReload
99                                 }
100                                 super.cancel()
101                         })
102                 }
103
104                 err := super.run(cfg)
105                 if err != nil {
106                         super.logger.WithError(err).Warn("supervisor shut down")
107                         if super.err == nil {
108                                 super.err = err
109                         }
110                 }
111         }()
112 }
113
114 func (super *Supervisor) Wait() error {
115         <-super.done
116         return super.err
117 }
118
119 func (super *Supervisor) run(cfg *arvados.Config) error {
120         defer super.cancel()
121
122         cwd, err := os.Getwd()
123         if err != nil {
124                 return err
125         }
126         if !strings.HasPrefix(super.SourcePath, "/") {
127                 super.SourcePath = filepath.Join(cwd, super.SourcePath)
128         }
129         super.SourcePath, err = filepath.EvalSymlinks(super.SourcePath)
130         if err != nil {
131                 return err
132         }
133
134         super.tempdir, err = ioutil.TempDir("", "arvados-server-boot-")
135         if err != nil {
136                 return err
137         }
138         defer os.RemoveAll(super.tempdir)
139         if err := os.Mkdir(filepath.Join(super.tempdir, "bin"), 0755); err != nil {
140                 return err
141         }
142
143         // Fill in any missing config keys, and write the resulting
144         // config in the temp dir for child services to use.
145         err = super.autofillConfig(cfg)
146         if err != nil {
147                 return err
148         }
149         conffile, err := os.OpenFile(filepath.Join(super.tempdir, "config.yml"), os.O_CREATE|os.O_WRONLY, 0644)
150         if err != nil {
151                 return err
152         }
153         defer conffile.Close()
154         err = json.NewEncoder(conffile).Encode(cfg)
155         if err != nil {
156                 return err
157         }
158         err = conffile.Close()
159         if err != nil {
160                 return err
161         }
162         super.configfile = conffile.Name()
163
164         super.environ = os.Environ()
165         super.cleanEnv([]string{"ARVADOS_"})
166         super.setEnv("ARVADOS_CONFIG", super.configfile)
167         super.setEnv("RAILS_ENV", super.ClusterType)
168         super.setEnv("TMPDIR", super.tempdir)
169         super.prependEnv("PATH", super.tempdir+"/bin:/var/lib/arvados/bin:")
170
171         super.cluster, err = cfg.GetCluster("")
172         if err != nil {
173                 return err
174         }
175         // Now that we have the config, replace the bootstrap logger
176         // with a new one according to the logging config.
177         loglevel := super.cluster.SystemLogs.LogLevel
178         if s := os.Getenv("ARVADOS_DEBUG"); s != "" && s != "0" {
179                 loglevel = "debug"
180         }
181         super.logger = ctxlog.New(super.Stderr, super.cluster.SystemLogs.Format, loglevel).WithFields(logrus.Fields{
182                 "PID": os.Getpid(),
183         })
184
185         if super.SourceVersion == "" {
186                 // Find current source tree version.
187                 var buf bytes.Buffer
188                 err = super.RunProgram(super.ctx, ".", &buf, nil, "git", "diff", "--shortstat")
189                 if err != nil {
190                         return err
191                 }
192                 dirty := buf.Len() > 0
193                 buf.Reset()
194                 err = super.RunProgram(super.ctx, ".", &buf, nil, "git", "log", "-n1", "--format=%H")
195                 if err != nil {
196                         return err
197                 }
198                 super.SourceVersion = strings.TrimSpace(buf.String())
199                 if dirty {
200                         super.SourceVersion += "+uncommitted"
201                 }
202         } else {
203                 return errors.New("specifying a version to run is not yet supported")
204         }
205
206         _, err = super.installGoProgram(super.ctx, "cmd/arvados-server")
207         if err != nil {
208                 return err
209         }
210         err = super.setupRubyEnv()
211         if err != nil {
212                 return err
213         }
214
215         tasks := []supervisedTask{
216                 createCertificates{},
217                 runPostgreSQL{},
218                 runNginx{},
219                 runServiceCommand{name: "controller", svc: super.cluster.Services.Controller, depends: []supervisedTask{runPostgreSQL{}}},
220                 runGoProgram{src: "services/arv-git-httpd", svc: super.cluster.Services.GitHTTP},
221                 runGoProgram{src: "services/health", svc: super.cluster.Services.Health},
222                 runGoProgram{src: "services/keepproxy", svc: super.cluster.Services.Keepproxy, depends: []supervisedTask{runPassenger{src: "services/api"}}},
223                 runGoProgram{src: "services/keepstore", svc: super.cluster.Services.Keepstore},
224                 runGoProgram{src: "services/keep-web", svc: super.cluster.Services.WebDAV},
225                 runServiceCommand{name: "ws", svc: super.cluster.Services.Websocket, depends: []supervisedTask{runPostgreSQL{}}},
226                 installPassenger{src: "services/api"},
227                 runPassenger{src: "services/api", svc: super.cluster.Services.RailsAPI, depends: []supervisedTask{createCertificates{}, runPostgreSQL{}, installPassenger{src: "services/api"}}},
228                 installPassenger{src: "apps/workbench", depends: []supervisedTask{installPassenger{src: "services/api"}}}, // dependency ensures workbench doesn't delay api startup
229                 runPassenger{src: "apps/workbench", svc: super.cluster.Services.Workbench1, depends: []supervisedTask{installPassenger{src: "apps/workbench"}}},
230                 seedDatabase{},
231         }
232         if super.ClusterType != "test" {
233                 tasks = append(tasks,
234                         runServiceCommand{name: "dispatch-cloud", svc: super.cluster.Services.Controller},
235                         runGoProgram{src: "services/keep-balance"},
236                 )
237         }
238         super.tasksReady = map[string]chan bool{}
239         for _, task := range tasks {
240                 super.tasksReady[task.String()] = make(chan bool)
241         }
242         for _, task := range tasks {
243                 task := task
244                 fail := func(err error) {
245                         if super.ctx.Err() != nil {
246                                 return
247                         }
248                         super.cancel()
249                         super.logger.WithField("task", task.String()).WithError(err).Error("task failed")
250                 }
251                 go func() {
252                         super.logger.WithField("task", task.String()).Info("starting")
253                         err := task.Run(super.ctx, fail, super)
254                         if err != nil {
255                                 fail(err)
256                                 return
257                         }
258                         close(super.tasksReady[task.String()])
259                 }()
260         }
261         err = super.wait(super.ctx, tasks...)
262         if err != nil {
263                 return err
264         }
265         super.logger.Info("all startup tasks are complete; starting health checks")
266         super.healthChecker = &health.Aggregator{Cluster: super.cluster}
267         <-super.ctx.Done()
268         super.logger.Info("shutting down")
269         super.waitShutdown.Wait()
270         return super.ctx.Err()
271 }
272
273 func (super *Supervisor) wait(ctx context.Context, tasks ...supervisedTask) error {
274         for _, task := range tasks {
275                 ch, ok := super.tasksReady[task.String()]
276                 if !ok {
277                         return fmt.Errorf("no such task: %s", task)
278                 }
279                 super.logger.WithField("task", task.String()).Info("waiting")
280                 select {
281                 case <-ch:
282                         super.logger.WithField("task", task.String()).Info("ready")
283                 case <-ctx.Done():
284                         super.logger.WithField("task", task.String()).Info("task was never ready")
285                         return ctx.Err()
286                 }
287         }
288         return nil
289 }
290
291 func (super *Supervisor) Stop() {
292         super.cancel()
293         <-super.done
294 }
295
296 func (super *Supervisor) WaitReady() (*arvados.URL, bool) {
297         ticker := time.NewTicker(time.Second)
298         defer ticker.Stop()
299         for waiting := "all"; waiting != ""; {
300                 select {
301                 case <-ticker.C:
302                 case <-super.ctx.Done():
303                         return nil, false
304                 }
305                 if super.healthChecker == nil {
306                         // not set up yet
307                         continue
308                 }
309                 resp := super.healthChecker.ClusterHealth()
310                 // The overall health check (resp.Health=="OK") might
311                 // never pass due to missing components (like
312                 // arvados-dispatch-cloud in a test cluster), so
313                 // instead we wait for all configured components to
314                 // pass.
315                 waiting = ""
316                 for target, check := range resp.Checks {
317                         if check.Health != "OK" {
318                                 waiting += " " + target
319                         }
320                 }
321                 if waiting != "" {
322                         super.logger.WithField("targets", waiting[1:]).Info("waiting")
323                 }
324         }
325         u := super.cluster.Services.Controller.ExternalURL
326         return &u, true
327 }
328
329 func (super *Supervisor) prependEnv(key, prepend string) {
330         for i, s := range super.environ {
331                 if strings.HasPrefix(s, key+"=") {
332                         super.environ[i] = key + "=" + prepend + s[len(key)+1:]
333                         return
334                 }
335         }
336         super.environ = append(super.environ, key+"="+prepend)
337 }
338
339 func (super *Supervisor) cleanEnv(prefixes []string) {
340         var cleaned []string
341         for _, s := range super.environ {
342                 drop := false
343                 for _, p := range prefixes {
344                         if strings.HasPrefix(s, p) {
345                                 drop = true
346                                 break
347                         }
348                 }
349                 if !drop {
350                         cleaned = append(cleaned, s)
351                 }
352         }
353         super.environ = cleaned
354 }
355
356 func (super *Supervisor) setEnv(key, val string) {
357         for i, s := range super.environ {
358                 if strings.HasPrefix(s, key+"=") {
359                         super.environ[i] = key + "=" + val
360                         return
361                 }
362         }
363         super.environ = append(super.environ, key+"="+val)
364 }
365
// dedupEnv returns the "KEY=value" entries of in with only the first
// entry for each KEY kept, preserving order. It panics if an entry has
// no "=" or has an empty KEY.
func dedupEnv(in []string) []string {
	seen := make(map[string]struct{})
	var out []string
	for _, kv := range in {
		eq := strings.Index(kv, "=")
		if eq < 1 {
			panic("invalid environment var: " + kv)
		}
		name := kv[:eq]
		if _, dup := seen[name]; dup {
			continue
		}
		seen[name] = struct{}{}
		out = append(out, kv)
	}
	return out
}
382
383 func (super *Supervisor) installGoProgram(ctx context.Context, srcpath string) (string, error) {
384         _, basename := filepath.Split(srcpath)
385         bindir := filepath.Join(super.tempdir, "bin")
386         binfile := filepath.Join(bindir, basename)
387         err := super.RunProgram(ctx, filepath.Join(super.SourcePath, srcpath), nil, []string{"GOBIN=" + bindir}, "go", "install", "-ldflags", "-X git.arvados.org/arvados.git/lib/cmd.version="+super.SourceVersion+" -X main.version="+super.SourceVersion)
388         return binfile, err
389 }
390
391 func (super *Supervisor) usingRVM() bool {
392         return os.Getenv("rvm_path") != ""
393 }
394
395 func (super *Supervisor) setupRubyEnv() error {
396         if !super.usingRVM() {
397                 // (If rvm is in use, assume the caller has everything
398                 // set up as desired)
399                 super.cleanEnv([]string{
400                         "GEM_HOME=",
401                         "GEM_PATH=",
402                 })
403                 gem := "gem"
404                 if _, err := os.Stat("/var/lib/arvados/bin/gem"); err == nil {
405                         gem = "/var/lib/arvados/bin/gem"
406                 }
407                 cmd := exec.Command(gem, "env", "gempath")
408                 cmd.Env = super.environ
409                 buf, err := cmd.Output() // /var/lib/arvados/.gem/ruby/2.5.0/bin:...
410                 if err != nil || len(buf) == 0 {
411                         return fmt.Errorf("gem env gempath: %v", err)
412                 }
413                 gempath := string(bytes.Split(buf, []byte{':'})[0])
414                 super.prependEnv("PATH", gempath+"/bin:")
415                 super.setEnv("GEM_HOME", gempath)
416                 super.setEnv("GEM_PATH", gempath)
417         }
418         // Passenger install doesn't work unless $HOME is ~user
419         u, err := user.Current()
420         if err != nil {
421                 return err
422         }
423         super.setEnv("HOME", u.HomeDir)
424         return nil
425 }
426
427 func (super *Supervisor) lookPath(prog string) string {
428         for _, val := range super.environ {
429                 if strings.HasPrefix(val, "PATH=") {
430                         for _, dir := range filepath.SplitList(val[5:]) {
431                                 path := filepath.Join(dir, prog)
432                                 if fi, err := os.Stat(path); err == nil && fi.Mode()&0111 != 0 {
433                                         return path
434                                 }
435                         }
436                 }
437         }
438         return prog
439 }
440
441 // Run prog with args, using dir as working directory. If ctx is
442 // cancelled while the child is running, RunProgram terminates the
443 // child, waits for it to exit, then returns.
444 //
445 // Child's environment will have our env vars, plus any given in env.
446 //
447 // Child's stdout will be written to output if non-nil, otherwise the
448 // boot command's stderr.
449 func (super *Supervisor) RunProgram(ctx context.Context, dir string, output io.Writer, env []string, prog string, args ...string) error {
450         cmdline := fmt.Sprintf("%s", append([]string{prog}, args...))
451         super.logger.WithField("command", cmdline).WithField("dir", dir).Info("executing")
452
453         logprefix := prog
454         if logprefix == "setuidgid" && len(args) >= 3 {
455                 logprefix = args[2]
456         }
457         logprefix = strings.TrimPrefix(logprefix, super.tempdir+"/bin/")
458         if logprefix == "bundle" && len(args) > 2 && args[0] == "exec" {
459                 logprefix = args[1]
460         } else if logprefix == "arvados-server" && len(args) > 1 {
461                 logprefix = args[0]
462         }
463         if !strings.HasPrefix(dir, "/") {
464                 logprefix = dir + ": " + logprefix
465         }
466
467         cmd := exec.Command(super.lookPath(prog), args...)
468         stdout, err := cmd.StdoutPipe()
469         if err != nil {
470                 return err
471         }
472         stderr, err := cmd.StderrPipe()
473         if err != nil {
474                 return err
475         }
476         logwriter := &service.LogPrefixer{Writer: super.Stderr, Prefix: []byte("[" + logprefix + "] ")}
477         var copiers sync.WaitGroup
478         copiers.Add(1)
479         go func() {
480                 io.Copy(logwriter, stderr)
481                 copiers.Done()
482         }()
483         copiers.Add(1)
484         go func() {
485                 if output == nil {
486                         io.Copy(logwriter, stdout)
487                 } else {
488                         io.Copy(output, stdout)
489                 }
490                 copiers.Done()
491         }()
492
493         if strings.HasPrefix(dir, "/") {
494                 cmd.Dir = dir
495         } else {
496                 cmd.Dir = filepath.Join(super.SourcePath, dir)
497         }
498         env = append([]string(nil), env...)
499         env = append(env, super.environ...)
500         cmd.Env = dedupEnv(env)
501
502         exited := false
503         defer func() { exited = true }()
504         go func() {
505                 <-ctx.Done()
506                 log := ctxlog.FromContext(ctx).WithFields(logrus.Fields{"dir": dir, "cmdline": cmdline})
507                 for !exited {
508                         if cmd.Process == nil {
509                                 log.Debug("waiting for child process to start")
510                                 time.Sleep(time.Second / 2)
511                         } else {
512                                 log.WithField("PID", cmd.Process.Pid).Debug("sending SIGTERM")
513                                 cmd.Process.Signal(syscall.SIGTERM)
514                                 time.Sleep(5 * time.Second)
515                                 if !exited {
516                                         stdout.Close()
517                                         stderr.Close()
518                                         log.WithField("PID", cmd.Process.Pid).Warn("still waiting for child process to exit 5s after SIGTERM")
519                                 }
520                         }
521                 }
522         }()
523
524         err = cmd.Start()
525         if err != nil {
526                 return err
527         }
528         copiers.Wait()
529         err = cmd.Wait()
530         if ctx.Err() != nil {
531                 // Return "context canceled", instead of the "killed"
532                 // error that was probably caused by the context being
533                 // canceled.
534                 return ctx.Err()
535         } else if err != nil {
536                 return fmt.Errorf("%s: error: %v", cmdline, err)
537         }
538         return nil
539 }
540
541 func (super *Supervisor) autofillConfig(cfg *arvados.Config) error {
542         cluster, err := cfg.GetCluster("")
543         if err != nil {
544                 return err
545         }
546         usedPort := map[string]bool{}
547         nextPort := func(host string) string {
548                 for {
549                         port, err := availablePort(host)
550                         if err != nil {
551                                 panic(err)
552                         }
553                         if usedPort[port] {
554                                 continue
555                         }
556                         usedPort[port] = true
557                         return port
558                 }
559         }
560         if cluster.Services.Controller.ExternalURL.Host == "" {
561                 h, p, err := net.SplitHostPort(super.ControllerAddr)
562                 if err != nil {
563                         return err
564                 }
565                 if h == "" {
566                         h = super.ListenHost
567                 }
568                 if p == "0" {
569                         p = nextPort(h)
570                 }
571                 cluster.Services.Controller.ExternalURL = arvados.URL{Scheme: "https", Host: net.JoinHostPort(h, p), Path: "/"}
572         }
573         for _, svc := range []*arvados.Service{
574                 &cluster.Services.Controller,
575                 &cluster.Services.DispatchCloud,
576                 &cluster.Services.GitHTTP,
577                 &cluster.Services.Health,
578                 &cluster.Services.Keepproxy,
579                 &cluster.Services.Keepstore,
580                 &cluster.Services.RailsAPI,
581                 &cluster.Services.WebDAV,
582                 &cluster.Services.WebDAVDownload,
583                 &cluster.Services.Websocket,
584                 &cluster.Services.Workbench1,
585         } {
586                 if svc == &cluster.Services.DispatchCloud && super.ClusterType == "test" {
587                         continue
588                 }
589                 if svc.ExternalURL.Host == "" {
590                         if svc == &cluster.Services.Controller ||
591                                 svc == &cluster.Services.GitHTTP ||
592                                 svc == &cluster.Services.Health ||
593                                 svc == &cluster.Services.Keepproxy ||
594                                 svc == &cluster.Services.WebDAV ||
595                                 svc == &cluster.Services.WebDAVDownload ||
596                                 svc == &cluster.Services.Workbench1 {
597                                 svc.ExternalURL = arvados.URL{Scheme: "https", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/"}
598                         } else if svc == &cluster.Services.Websocket {
599                                 svc.ExternalURL = arvados.URL{Scheme: "wss", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/websocket"}
600                         }
601                 }
602                 if len(svc.InternalURLs) == 0 {
603                         svc.InternalURLs = map[arvados.URL]arvados.ServiceInstance{
604                                 {Scheme: "http", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/"}: {},
605                         }
606                 }
607         }
608         if cluster.SystemRootToken == "" {
609                 cluster.SystemRootToken = randomHexString(64)
610         }
611         if cluster.ManagementToken == "" {
612                 cluster.ManagementToken = randomHexString(64)
613         }
614         if cluster.API.RailsSessionSecretToken == "" {
615                 cluster.API.RailsSessionSecretToken = randomHexString(64)
616         }
617         if cluster.Collections.BlobSigningKey == "" {
618                 cluster.Collections.BlobSigningKey = randomHexString(64)
619         }
620         if cluster.Users.AnonymousUserToken == "" {
621                 cluster.Users.AnonymousUserToken = randomHexString(64)
622         }
623
624         if super.ClusterType != "production" && cluster.Containers.DispatchPrivateKey == "" {
625                 buf, err := ioutil.ReadFile(filepath.Join(super.SourcePath, "lib", "dispatchcloud", "test", "sshkey_dispatch"))
626                 if err != nil {
627                         return err
628                 }
629                 cluster.Containers.DispatchPrivateKey = string(buf)
630         }
631         if super.ClusterType != "production" {
632                 cluster.TLS.Insecure = true
633         }
634         if super.ClusterType == "test" {
635                 // Add a second keepstore process.
636                 cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: "http", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/"}] = arvados.ServiceInstance{}
637
638                 // Create a directory-backed volume for each keepstore
639                 // process.
640                 cluster.Volumes = map[string]arvados.Volume{}
641                 for url := range cluster.Services.Keepstore.InternalURLs {
642                         volnum := len(cluster.Volumes)
643                         datadir := fmt.Sprintf("%s/keep%d.data", super.tempdir, volnum)
644                         if _, err = os.Stat(datadir + "/."); err == nil {
645                         } else if !os.IsNotExist(err) {
646                                 return err
647                         } else if err = os.Mkdir(datadir, 0755); err != nil {
648                                 return err
649                         }
650                         cluster.Volumes[fmt.Sprintf(cluster.ClusterID+"-nyw5e-%015d", volnum)] = arvados.Volume{
651                                 Driver:           "Directory",
652                                 DriverParameters: json.RawMessage(fmt.Sprintf(`{"Root":%q}`, datadir)),
653                                 AccessViaHosts: map[arvados.URL]arvados.VolumeAccess{
654                                         url: {},
655                                 },
656                         }
657                 }
658         }
659         if super.OwnTemporaryDatabase {
660                 cluster.PostgreSQL.Connection = arvados.PostgreSQLConnection{
661                         "client_encoding": "utf8",
662                         "host":            "localhost",
663                         "port":            nextPort(super.ListenHost),
664                         "dbname":          "arvados_test",
665                         "user":            "arvados",
666                         "password":        "insecure_arvados_test",
667                 }
668         }
669
670         cfg.Clusters[cluster.ClusterID] = *cluster
671         return nil
672 }
673
// addrIsLocal is meant to report whether addr is an address this host
// can listen on, by trying net.Listen on it.
//
// NOTE(review): the unconditional "return true, nil" on the first line
// short-circuits the function. Every address is currently reported as
// local, and the listener probe below is unreachable (go vet reports
// unreachable code). Confirm callers cope with a false/error result
// before removing the early return to re-enable the probe.
func addrIsLocal(addr string) (bool, error) {
	return true, nil
	listener, err := net.Listen("tcp", addr)
	if err == nil {
		listener.Close()
		return true, nil
	} else if strings.Contains(err.Error(), "cannot assign requested address") {
		// The address doesn't belong to any local interface.
		return false, nil
	} else {
		return false, err
	}
}
686
// randomHexString returns exactly chars random lowercase hexadecimal
// digits read from crypto/rand. It returns "" when chars is zero or
// negative, and panics if the system random source fails.
func randomHexString(chars int) string {
	if chars <= 0 {
		return ""
	}
	// Each byte gives two hex digits: round up so an odd length
	// is honored, then drop the surplus digit.
	b := make([]byte, (chars+1)/2)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", b)[:chars]
}
695
696 func internalPort(svc arvados.Service) (string, error) {
697         if len(svc.InternalURLs) > 1 {
698                 return "", errors.New("internalPort() doesn't work with multiple InternalURLs")
699         }
700         for u := range svc.InternalURLs {
701                 if _, p, err := net.SplitHostPort(u.Host); err != nil {
702                         return "", err
703                 } else if p != "" {
704                         return p, nil
705                 } else if u.Scheme == "https" {
706                         return "443", nil
707                 } else {
708                         return "80", nil
709                 }
710         }
711         return "", fmt.Errorf("service has no InternalURLs")
712 }
713
714 func externalPort(svc arvados.Service) (string, error) {
715         if _, p, err := net.SplitHostPort(svc.ExternalURL.Host); err != nil {
716                 return "", err
717         } else if p != "" {
718                 return p, nil
719         } else if svc.ExternalURL.Scheme == "https" {
720                 return "443", nil
721         } else {
722                 return "80", nil
723         }
724 }
725
// availablePort asks the OS for a free TCP port on host by listening on
// port 0, then releases it and returns the port number as a string. The
// port is not reserved: another process may take it before the caller
// uses it.
func availablePort(host string) (string, error) {
	ln, err := net.Listen("tcp", net.JoinHostPort(host, "0"))
	if err != nil {
		return "", err
	}
	_, port, err := net.SplitHostPort(ln.Addr().String())
	ln.Close()
	if err != nil {
		return "", err
	}
	return port, nil
}
738
// waitForConnect tries repeatedly to open a TCP connection to addr. It
// returns nil as soon as one succeeds (that connection is closed
// immediately). If ctx is cancelled first, it returns ctx.Err(). Each
// dial attempt times out after 1 second, and failed attempts are
// retried every 100ms.
func waitForConnect(ctx context.Context, addr string) error {
	dialer := net.Dialer{Timeout: time.Second}
	for ctx.Err() == nil {
		conn, err := dialer.DialContext(ctx, "tcp", addr)
		if err == nil {
			conn.Close()
			return nil
		}
		// Pause before retrying, but don't oversleep a
		// cancellation.
		select {
		case <-ctx.Done():
		case <-time.After(time.Second / 10):
		}
	}
	return ctx.Err()
}
754
755 func copyConfig(cfg *arvados.Config) *arvados.Config {
756         pr, pw := io.Pipe()
757         go func() {
758                 err := json.NewEncoder(pw).Encode(cfg)
759                 if err != nil {
760                         panic(err)
761                 }
762                 pw.Close()
763         }()
764         cfg2 := new(arvados.Config)
765         err := json.NewDecoder(pr).Decode(cfg2)
766         if err != nil {
767                 panic(err)
768         }
769         return cfg2
770 }
771
// watchConfig watches the config file at cfgPath using fsnotify. Each
// time it changes, the file is reloaded (with API calls skipped). If
// the result loads without error and differs (per reflect.DeepEqual)
// from the last config seen, starting with prevcfg, fn is called.
// watchConfig returns when ctx is cancelled, when the watcher can't be
// set up (logged as an error), or when the watcher's channels are
// closed.
//
// NOTE(review): the watch is on the file itself. If an editor saves by
// writing a new file and renaming it over cfgPath, fsnotify may stop
// reporting events for it; consider watching the parent directory.
// Confirm against fsnotify's documentation.
func watchConfig(ctx context.Context, logger logrus.FieldLogger, cfgPath string, prevcfg *arvados.Config, fn func()) {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		logger.WithError(err).Error("fsnotify setup failed")
		return
	}
	defer watcher.Close()

	err = watcher.Add(cfgPath)
	if err != nil {
		logger.WithError(err).Error("fsnotify watcher failed")
		return
	}

	for {
		select {
		case <-ctx.Done():
			return
		case err, ok := <-watcher.Errors:
			if !ok {
				return
			}
			logger.WithError(err).Warn("fsnotify watcher reported error")
		case _, ok := <-watcher.Events:
			if !ok {
				return
			}
			// Discard any further events already queued, so a
			// burst of writes triggers a single reload.
			for len(watcher.Events) > 0 {
				<-watcher.Events
			}
			// The loader gets an empty input buffer and a logger
			// that discards output. NOTE(review): presumably the
			// buffer stands in for stdin, which is unused because
			// loader.Path is set; confirm against config.NewLoader.
			loader := config.NewLoader(&bytes.Buffer{}, &logrus.Logger{Out: ioutil.Discard})
			loader.Path = cfgPath
			loader.SkipAPICalls = true
			cfg, err := loader.Load()
			if err != nil {
				logger.WithError(err).Warn("error reloading config file after change detected; ignoring new config for now")
			} else if reflect.DeepEqual(cfg, prevcfg) {
				logger.Debug("config file changed but is still DeepEqual to the existing config")
			} else {
				logger.Debug("config changed, notifying supervisor")
				fn()
				// Compare future changes against the config
				// that triggered this notification.
				prevcfg = cfg
			}
		}
	}
}