17215: Merge branch 'master' into 17215-aws-roles-a-d-c
[arvados.git] / lib / boot / supervisor.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package boot
6
7 import (
8         "bytes"
9         "context"
10         "crypto/rand"
11         "encoding/json"
12         "errors"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "net"
17         "os"
18         "os/exec"
19         "os/signal"
20         "os/user"
21         "path/filepath"
22         "reflect"
23         "strings"
24         "sync"
25         "syscall"
26         "time"
27
28         "git.arvados.org/arvados.git/lib/config"
29         "git.arvados.org/arvados.git/lib/service"
30         "git.arvados.org/arvados.git/sdk/go/arvados"
31         "git.arvados.org/arvados.git/sdk/go/ctxlog"
32         "git.arvados.org/arvados.git/sdk/go/health"
33         "github.com/fsnotify/fsnotify"
34         "github.com/sirupsen/logrus"
35 )
36
// Supervisor builds and runs the services of an Arvados cluster from a
// source tree, as child processes of the current process.
//
// The exported fields configure the supervisor and should be set
// before calling Start:
//   - SourcePath is the arvados source tree. A relative path is
//     resolved against the working directory, and symlinks are
//     resolved.
//   - SourceVersion must currently be empty. It is filled in from
//     "git log" (plus "+uncommitted" if "git diff" reports changes).
//     A non-empty value is rejected as not yet supported.
//   - ClusterType selects behavior:
//       - "test" skips dispatch-cloud and keep-balance, and adds a
//         second keepstore with directory-backed volumes.
//       - Any value other than "production" sets TLS.Insecure and
//         loads a test dispatch SSH key if none is configured.
//   - ListenHost is the host used in automatically assigned URLs.
//   - ControllerAddr is used for the controller's ExternalURL when the
//     config has none. Port "0" means pick a free port.
//   - OwnTemporaryDatabase replaces the PostgreSQL connection settings
//     with a local "arvados_test" database on a freshly chosen port.
//   - Stderr receives log output and the children's stderr.
type Supervisor struct {
	SourcePath           string // e.g., /home/username/src/arvados
	SourceVersion        string // e.g., acbd1324...
	ClusterType          string // e.g., production
	ListenHost           string // e.g., localhost
	ControllerAddr       string // e.g., 127.0.0.1:8000
	OwnTemporaryDatabase bool
	Stderr               io.Writer

	// logger is used by Start's signal handlers and is replaced by
	// run once the cluster config is loaded. NOTE(review): presumably
	// set by a same-package caller before Start -- confirm.
	logger  logrus.FieldLogger
	cluster *arvados.Cluster // loaded (and autofilled) config; nil until run loads it

	ctx           context.Context
	cancel        context.CancelFunc
	done          chan struct{} // closed when child procs/services have shut down
	err           error         // error that caused shutdown (valid when done is closed)
	healthChecker *health.Aggregator // set once all startup tasks are ready; polled by WaitReady
	tasksReady    map[string]chan bool // per task name; closed when that task's Run returns nil
	waitShutdown  sync.WaitGroup       // run waits on this after ctx is cancelled

	tempdir    string   // created by run, removed when run returns
	configfile string   // config.yml in tempdir holding the autofilled config
	environ    []string // for child processes
}
61
62 func (super *Supervisor) Cluster() *arvados.Cluster { return super.cluster }
63
64 func (super *Supervisor) Start(ctx context.Context, cfg *arvados.Config, cfgPath string) {
65         super.ctx, super.cancel = context.WithCancel(ctx)
66         super.done = make(chan struct{})
67
68         go func() {
69                 defer close(super.done)
70
71                 sigch := make(chan os.Signal)
72                 signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM)
73                 defer signal.Stop(sigch)
74                 go func() {
75                         for sig := range sigch {
76                                 super.logger.WithField("signal", sig).Info("caught signal")
77                                 if super.err == nil {
78                                         super.err = fmt.Errorf("caught signal %s", sig)
79                                 }
80                                 super.cancel()
81                         }
82                 }()
83
84                 hupch := make(chan os.Signal)
85                 signal.Notify(hupch, syscall.SIGHUP)
86                 defer signal.Stop(hupch)
87                 go func() {
88                         for sig := range hupch {
89                                 super.logger.WithField("signal", sig).Info("caught signal")
90                                 if super.err == nil {
91                                         super.err = errNeedConfigReload
92                                 }
93                                 super.cancel()
94                         }
95                 }()
96
97                 if cfgPath != "" && cfgPath != "-" && cfg.AutoReloadConfig {
98                         go watchConfig(super.ctx, super.logger, cfgPath, copyConfig(cfg), func() {
99                                 if super.err == nil {
100                                         super.err = errNeedConfigReload
101                                 }
102                                 super.cancel()
103                         })
104                 }
105
106                 err := super.run(cfg)
107                 if err != nil {
108                         super.logger.WithError(err).Warn("supervisor shut down")
109                         if super.err == nil {
110                                 super.err = err
111                         }
112                 }
113         }()
114 }
115
116 func (super *Supervisor) Wait() error {
117         <-super.done
118         return super.err
119 }
120
121 func (super *Supervisor) run(cfg *arvados.Config) error {
122         defer super.cancel()
123
124         cwd, err := os.Getwd()
125         if err != nil {
126                 return err
127         }
128         if !strings.HasPrefix(super.SourcePath, "/") {
129                 super.SourcePath = filepath.Join(cwd, super.SourcePath)
130         }
131         super.SourcePath, err = filepath.EvalSymlinks(super.SourcePath)
132         if err != nil {
133                 return err
134         }
135
136         super.tempdir, err = ioutil.TempDir("", "arvados-server-boot-")
137         if err != nil {
138                 return err
139         }
140         defer os.RemoveAll(super.tempdir)
141         if err := os.Mkdir(filepath.Join(super.tempdir, "bin"), 0755); err != nil {
142                 return err
143         }
144
145         // Fill in any missing config keys, and write the resulting
146         // config in the temp dir for child services to use.
147         err = super.autofillConfig(cfg)
148         if err != nil {
149                 return err
150         }
151         conffile, err := os.OpenFile(filepath.Join(super.tempdir, "config.yml"), os.O_CREATE|os.O_WRONLY, 0644)
152         if err != nil {
153                 return err
154         }
155         defer conffile.Close()
156         err = json.NewEncoder(conffile).Encode(cfg)
157         if err != nil {
158                 return err
159         }
160         err = conffile.Close()
161         if err != nil {
162                 return err
163         }
164         super.configfile = conffile.Name()
165
166         super.environ = os.Environ()
167         super.cleanEnv([]string{"ARVADOS_"})
168         super.setEnv("ARVADOS_CONFIG", super.configfile)
169         super.setEnv("RAILS_ENV", super.ClusterType)
170         super.setEnv("TMPDIR", super.tempdir)
171         super.prependEnv("PATH", super.tempdir+"/bin:/var/lib/arvados/bin:")
172
173         super.cluster, err = cfg.GetCluster("")
174         if err != nil {
175                 return err
176         }
177         // Now that we have the config, replace the bootstrap logger
178         // with a new one according to the logging config.
179         loglevel := super.cluster.SystemLogs.LogLevel
180         if s := os.Getenv("ARVADOS_DEBUG"); s != "" && s != "0" {
181                 loglevel = "debug"
182         }
183         super.logger = ctxlog.New(super.Stderr, super.cluster.SystemLogs.Format, loglevel).WithFields(logrus.Fields{
184                 "PID": os.Getpid(),
185         })
186
187         if super.SourceVersion == "" {
188                 // Find current source tree version.
189                 var buf bytes.Buffer
190                 err = super.RunProgram(super.ctx, ".", &buf, nil, "git", "diff", "--shortstat")
191                 if err != nil {
192                         return err
193                 }
194                 dirty := buf.Len() > 0
195                 buf.Reset()
196                 err = super.RunProgram(super.ctx, ".", &buf, nil, "git", "log", "-n1", "--format=%H")
197                 if err != nil {
198                         return err
199                 }
200                 super.SourceVersion = strings.TrimSpace(buf.String())
201                 if dirty {
202                         super.SourceVersion += "+uncommitted"
203                 }
204         } else {
205                 return errors.New("specifying a version to run is not yet supported")
206         }
207
208         _, err = super.installGoProgram(super.ctx, "cmd/arvados-server")
209         if err != nil {
210                 return err
211         }
212         err = super.setupRubyEnv()
213         if err != nil {
214                 return err
215         }
216
217         tasks := []supervisedTask{
218                 createCertificates{},
219                 runPostgreSQL{},
220                 runNginx{},
221                 runServiceCommand{name: "controller", svc: super.cluster.Services.Controller, depends: []supervisedTask{seedDatabase{}}},
222                 runGoProgram{src: "services/arv-git-httpd", svc: super.cluster.Services.GitHTTP},
223                 runGoProgram{src: "services/health", svc: super.cluster.Services.Health},
224                 runGoProgram{src: "services/keepproxy", svc: super.cluster.Services.Keepproxy, depends: []supervisedTask{runPassenger{src: "services/api"}}},
225                 runGoProgram{src: "services/keepstore", svc: super.cluster.Services.Keepstore},
226                 runGoProgram{src: "services/keep-web", svc: super.cluster.Services.WebDAV},
227                 runServiceCommand{name: "ws", svc: super.cluster.Services.Websocket, depends: []supervisedTask{seedDatabase{}}},
228                 installPassenger{src: "services/api"},
229                 runPassenger{src: "services/api", svc: super.cluster.Services.RailsAPI, depends: []supervisedTask{createCertificates{}, seedDatabase{}, installPassenger{src: "services/api"}}},
230                 installPassenger{src: "apps/workbench", depends: []supervisedTask{seedDatabase{}}}, // dependency ensures workbench doesn't delay api install/startup
231                 runPassenger{src: "apps/workbench", svc: super.cluster.Services.Workbench1, depends: []supervisedTask{installPassenger{src: "apps/workbench"}}},
232                 seedDatabase{},
233         }
234         if super.ClusterType != "test" {
235                 tasks = append(tasks,
236                         runServiceCommand{name: "dispatch-cloud", svc: super.cluster.Services.Controller},
237                         runGoProgram{src: "services/keep-balance"},
238                 )
239         }
240         super.tasksReady = map[string]chan bool{}
241         for _, task := range tasks {
242                 super.tasksReady[task.String()] = make(chan bool)
243         }
244         for _, task := range tasks {
245                 task := task
246                 fail := func(err error) {
247                         if super.ctx.Err() != nil {
248                                 return
249                         }
250                         super.cancel()
251                         super.logger.WithField("task", task.String()).WithError(err).Error("task failed")
252                 }
253                 go func() {
254                         super.logger.WithField("task", task.String()).Info("starting")
255                         err := task.Run(super.ctx, fail, super)
256                         if err != nil {
257                                 fail(err)
258                                 return
259                         }
260                         close(super.tasksReady[task.String()])
261                 }()
262         }
263         err = super.wait(super.ctx, tasks...)
264         if err != nil {
265                 return err
266         }
267         super.logger.Info("all startup tasks are complete; starting health checks")
268         super.healthChecker = &health.Aggregator{Cluster: super.cluster}
269         <-super.ctx.Done()
270         super.logger.Info("shutting down")
271         super.waitShutdown.Wait()
272         return super.ctx.Err()
273 }
274
275 func (super *Supervisor) wait(ctx context.Context, tasks ...supervisedTask) error {
276         for _, task := range tasks {
277                 ch, ok := super.tasksReady[task.String()]
278                 if !ok {
279                         return fmt.Errorf("no such task: %s", task)
280                 }
281                 super.logger.WithField("task", task.String()).Info("waiting")
282                 select {
283                 case <-ch:
284                         super.logger.WithField("task", task.String()).Info("ready")
285                 case <-ctx.Done():
286                         super.logger.WithField("task", task.String()).Info("task was never ready")
287                         return ctx.Err()
288                 }
289         }
290         return nil
291 }
292
293 func (super *Supervisor) Stop() {
294         super.cancel()
295         <-super.done
296 }
297
298 func (super *Supervisor) WaitReady() (*arvados.URL, bool) {
299         ticker := time.NewTicker(time.Second)
300         defer ticker.Stop()
301         for waiting := "all"; waiting != ""; {
302                 select {
303                 case <-ticker.C:
304                 case <-super.ctx.Done():
305                         return nil, false
306                 }
307                 if super.healthChecker == nil {
308                         // not set up yet
309                         continue
310                 }
311                 resp := super.healthChecker.ClusterHealth()
312                 // The overall health check (resp.Health=="OK") might
313                 // never pass due to missing components (like
314                 // arvados-dispatch-cloud in a test cluster), so
315                 // instead we wait for all configured components to
316                 // pass.
317                 waiting = ""
318                 for target, check := range resp.Checks {
319                         if check.Health != "OK" {
320                                 waiting += " " + target
321                         }
322                 }
323                 if waiting != "" {
324                         super.logger.WithField("targets", waiting[1:]).Info("waiting")
325                 }
326         }
327         u := super.cluster.Services.Controller.ExternalURL
328         return &u, true
329 }
330
331 func (super *Supervisor) prependEnv(key, prepend string) {
332         for i, s := range super.environ {
333                 if strings.HasPrefix(s, key+"=") {
334                         super.environ[i] = key + "=" + prepend + s[len(key)+1:]
335                         return
336                 }
337         }
338         super.environ = append(super.environ, key+"="+prepend)
339 }
340
341 func (super *Supervisor) cleanEnv(prefixes []string) {
342         var cleaned []string
343         for _, s := range super.environ {
344                 drop := false
345                 for _, p := range prefixes {
346                         if strings.HasPrefix(s, p) {
347                                 drop = true
348                                 break
349                         }
350                 }
351                 if !drop {
352                         cleaned = append(cleaned, s)
353                 }
354         }
355         super.environ = cleaned
356 }
357
358 func (super *Supervisor) setEnv(key, val string) {
359         for i, s := range super.environ {
360                 if strings.HasPrefix(s, key+"=") {
361                         super.environ[i] = key + "=" + val
362                         return
363                 }
364         }
365         super.environ = append(super.environ, key+"="+val)
366 }
367
// dedupEnv returns in with all but the first occurrence of each
// environment variable name removed, preserving order. It panics on an
// entry with no "=" or an empty name.
func dedupEnv(in []string) []string {
	seen := make(map[string]bool)
	var out []string
	for _, kv := range in {
		eq := strings.Index(kv, "=")
		if eq < 1 {
			panic("invalid environment var: " + kv)
		}
		name := kv[:eq]
		if seen[name] {
			continue
		}
		seen[name] = true
		out = append(out, kv)
	}
	return out
}
384
385 func (super *Supervisor) installGoProgram(ctx context.Context, srcpath string) (string, error) {
386         _, basename := filepath.Split(srcpath)
387         bindir := filepath.Join(super.tempdir, "bin")
388         binfile := filepath.Join(bindir, basename)
389         err := super.RunProgram(ctx, filepath.Join(super.SourcePath, srcpath), nil, []string{"GOBIN=" + bindir}, "go", "install", "-ldflags", "-X git.arvados.org/arvados.git/lib/cmd.version="+super.SourceVersion+" -X main.version="+super.SourceVersion)
390         return binfile, err
391 }
392
393 func (super *Supervisor) usingRVM() bool {
394         return os.Getenv("rvm_path") != ""
395 }
396
397 func (super *Supervisor) setupRubyEnv() error {
398         if !super.usingRVM() {
399                 // (If rvm is in use, assume the caller has everything
400                 // set up as desired)
401                 super.cleanEnv([]string{
402                         "GEM_HOME=",
403                         "GEM_PATH=",
404                 })
405                 gem := "gem"
406                 if _, err := os.Stat("/var/lib/arvados/bin/gem"); err == nil {
407                         gem = "/var/lib/arvados/bin/gem"
408                 }
409                 cmd := exec.Command(gem, "env", "gempath")
410                 cmd.Env = super.environ
411                 buf, err := cmd.Output() // /var/lib/arvados/.gem/ruby/2.5.0/bin:...
412                 if err != nil || len(buf) == 0 {
413                         return fmt.Errorf("gem env gempath: %v", err)
414                 }
415                 gempath := string(bytes.Split(buf, []byte{':'})[0])
416                 super.prependEnv("PATH", gempath+"/bin:")
417                 super.setEnv("GEM_HOME", gempath)
418                 super.setEnv("GEM_PATH", gempath)
419         }
420         // Passenger install doesn't work unless $HOME is ~user
421         u, err := user.Current()
422         if err != nil {
423                 return err
424         }
425         super.setEnv("HOME", u.HomeDir)
426         return nil
427 }
428
429 func (super *Supervisor) lookPath(prog string) string {
430         for _, val := range super.environ {
431                 if strings.HasPrefix(val, "PATH=") {
432                         for _, dir := range filepath.SplitList(val[5:]) {
433                                 path := filepath.Join(dir, prog)
434                                 if fi, err := os.Stat(path); err == nil && fi.Mode()&0111 != 0 {
435                                         return path
436                                 }
437                         }
438                 }
439         }
440         return prog
441 }
442
443 // RunProgram runs prog with args, using dir as working directory. If ctx is
444 // cancelled while the child is running, RunProgram terminates the child, waits
445 // for it to exit, then returns.
446 //
447 // Child's environment will have our env vars, plus any given in env.
448 //
449 // Child's stdout will be written to output if non-nil, otherwise the
450 // boot command's stderr.
451 func (super *Supervisor) RunProgram(ctx context.Context, dir string, output io.Writer, env []string, prog string, args ...string) error {
452         cmdline := fmt.Sprintf("%s", append([]string{prog}, args...))
453         super.logger.WithField("command", cmdline).WithField("dir", dir).Info("executing")
454
455         logprefix := prog
456         if logprefix == "setuidgid" && len(args) >= 2 {
457                 logprefix = args[1]
458         }
459         logprefix = strings.TrimPrefix(logprefix, super.tempdir+"/bin/")
460         if logprefix == "bundle" && len(args) > 2 && args[0] == "exec" {
461                 logprefix = args[1]
462         } else if logprefix == "arvados-server" && len(args) > 1 {
463                 logprefix = args[0]
464         }
465         if !strings.HasPrefix(dir, "/") {
466                 logprefix = dir + ": " + logprefix
467         }
468
469         cmd := exec.Command(super.lookPath(prog), args...)
470         stdout, err := cmd.StdoutPipe()
471         if err != nil {
472                 return err
473         }
474         stderr, err := cmd.StderrPipe()
475         if err != nil {
476                 return err
477         }
478         logwriter := &service.LogPrefixer{Writer: super.Stderr, Prefix: []byte("[" + logprefix + "] ")}
479         var copiers sync.WaitGroup
480         copiers.Add(1)
481         go func() {
482                 io.Copy(logwriter, stderr)
483                 copiers.Done()
484         }()
485         copiers.Add(1)
486         go func() {
487                 if output == nil {
488                         io.Copy(logwriter, stdout)
489                 } else {
490                         io.Copy(output, stdout)
491                 }
492                 copiers.Done()
493         }()
494
495         if strings.HasPrefix(dir, "/") {
496                 cmd.Dir = dir
497         } else {
498                 cmd.Dir = filepath.Join(super.SourcePath, dir)
499         }
500         env = append([]string(nil), env...)
501         env = append(env, super.environ...)
502         cmd.Env = dedupEnv(env)
503
504         exited := false
505         defer func() { exited = true }()
506         go func() {
507                 <-ctx.Done()
508                 log := ctxlog.FromContext(ctx).WithFields(logrus.Fields{"dir": dir, "cmdline": cmdline})
509                 for !exited {
510                         if cmd.Process == nil {
511                                 log.Debug("waiting for child process to start")
512                                 time.Sleep(time.Second / 2)
513                         } else {
514                                 log.WithField("PID", cmd.Process.Pid).Debug("sending SIGTERM")
515                                 cmd.Process.Signal(syscall.SIGTERM)
516                                 time.Sleep(5 * time.Second)
517                                 if !exited {
518                                         stdout.Close()
519                                         stderr.Close()
520                                         log.WithField("PID", cmd.Process.Pid).Warn("still waiting for child process to exit 5s after SIGTERM")
521                                 }
522                         }
523                 }
524         }()
525
526         err = cmd.Start()
527         if err != nil {
528                 return err
529         }
530         copiers.Wait()
531         err = cmd.Wait()
532         if ctx.Err() != nil {
533                 // Return "context canceled", instead of the "killed"
534                 // error that was probably caused by the context being
535                 // canceled.
536                 return ctx.Err()
537         } else if err != nil {
538                 return fmt.Errorf("%s: error: %v", cmdline, err)
539         }
540         return nil
541 }
542
543 func (super *Supervisor) autofillConfig(cfg *arvados.Config) error {
544         cluster, err := cfg.GetCluster("")
545         if err != nil {
546                 return err
547         }
548         usedPort := map[string]bool{}
549         nextPort := func(host string) string {
550                 for {
551                         port, err := availablePort(host)
552                         if err != nil {
553                                 panic(err)
554                         }
555                         if usedPort[port] {
556                                 continue
557                         }
558                         usedPort[port] = true
559                         return port
560                 }
561         }
562         if cluster.Services.Controller.ExternalURL.Host == "" {
563                 h, p, err := net.SplitHostPort(super.ControllerAddr)
564                 if err != nil {
565                         return err
566                 }
567                 if h == "" {
568                         h = super.ListenHost
569                 }
570                 if p == "0" {
571                         p = nextPort(h)
572                 }
573                 cluster.Services.Controller.ExternalURL = arvados.URL{Scheme: "https", Host: net.JoinHostPort(h, p), Path: "/"}
574         }
575         for _, svc := range []*arvados.Service{
576                 &cluster.Services.Controller,
577                 &cluster.Services.DispatchCloud,
578                 &cluster.Services.GitHTTP,
579                 &cluster.Services.Health,
580                 &cluster.Services.Keepproxy,
581                 &cluster.Services.Keepstore,
582                 &cluster.Services.RailsAPI,
583                 &cluster.Services.WebDAV,
584                 &cluster.Services.WebDAVDownload,
585                 &cluster.Services.Websocket,
586                 &cluster.Services.Workbench1,
587         } {
588                 if svc == &cluster.Services.DispatchCloud && super.ClusterType == "test" {
589                         continue
590                 }
591                 if svc.ExternalURL.Host == "" {
592                         if svc == &cluster.Services.Controller ||
593                                 svc == &cluster.Services.GitHTTP ||
594                                 svc == &cluster.Services.Health ||
595                                 svc == &cluster.Services.Keepproxy ||
596                                 svc == &cluster.Services.WebDAV ||
597                                 svc == &cluster.Services.WebDAVDownload ||
598                                 svc == &cluster.Services.Workbench1 {
599                                 svc.ExternalURL = arvados.URL{Scheme: "https", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/"}
600                         } else if svc == &cluster.Services.Websocket {
601                                 svc.ExternalURL = arvados.URL{Scheme: "wss", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/websocket"}
602                         }
603                 }
604                 if len(svc.InternalURLs) == 0 {
605                         svc.InternalURLs = map[arvados.URL]arvados.ServiceInstance{
606                                 {Scheme: "http", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/"}: {},
607                         }
608                 }
609         }
610         if cluster.SystemRootToken == "" {
611                 cluster.SystemRootToken = randomHexString(64)
612         }
613         if cluster.ManagementToken == "" {
614                 cluster.ManagementToken = randomHexString(64)
615         }
616         if cluster.Collections.BlobSigningKey == "" {
617                 cluster.Collections.BlobSigningKey = randomHexString(64)
618         }
619         if cluster.Users.AnonymousUserToken == "" {
620                 cluster.Users.AnonymousUserToken = randomHexString(64)
621         }
622
623         if super.ClusterType != "production" && cluster.Containers.DispatchPrivateKey == "" {
624                 buf, err := ioutil.ReadFile(filepath.Join(super.SourcePath, "lib", "dispatchcloud", "test", "sshkey_dispatch"))
625                 if err != nil {
626                         return err
627                 }
628                 cluster.Containers.DispatchPrivateKey = string(buf)
629         }
630         if super.ClusterType != "production" {
631                 cluster.TLS.Insecure = true
632         }
633         if super.ClusterType == "test" {
634                 // Add a second keepstore process.
635                 cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: "http", Host: fmt.Sprintf("%s:%s", super.ListenHost, nextPort(super.ListenHost)), Path: "/"}] = arvados.ServiceInstance{}
636
637                 // Create a directory-backed volume for each keepstore
638                 // process.
639                 cluster.Volumes = map[string]arvados.Volume{}
640                 for url := range cluster.Services.Keepstore.InternalURLs {
641                         volnum := len(cluster.Volumes)
642                         datadir := fmt.Sprintf("%s/keep%d.data", super.tempdir, volnum)
643                         if _, err = os.Stat(datadir + "/."); err == nil {
644                         } else if !os.IsNotExist(err) {
645                                 return err
646                         } else if err = os.Mkdir(datadir, 0755); err != nil {
647                                 return err
648                         }
649                         cluster.Volumes[fmt.Sprintf(cluster.ClusterID+"-nyw5e-%015d", volnum)] = arvados.Volume{
650                                 Driver:           "Directory",
651                                 DriverParameters: json.RawMessage(fmt.Sprintf(`{"Root":%q}`, datadir)),
652                                 AccessViaHosts: map[arvados.URL]arvados.VolumeAccess{
653                                         url: {},
654                                 },
655                         }
656                 }
657         }
658         if super.OwnTemporaryDatabase {
659                 cluster.PostgreSQL.Connection = arvados.PostgreSQLConnection{
660                         "client_encoding": "utf8",
661                         "host":            "localhost",
662                         "port":            nextPort(super.ListenHost),
663                         "dbname":          "arvados_test",
664                         "user":            "arvados",
665                         "password":        "insecure_arvados_test",
666                 }
667         }
668
669         cfg.Clusters[cluster.ClusterID] = *cluster
670         return nil
671 }
672
// addrIsLocal is meant to report whether addr ("host:port") belongs to
// this host. It currently reports true, with a nil error, for every
// address, so callers treat all configured addresses as local.
//
// NOTE(review): a net.Listen-based probe used to follow an
// unconditional early "return true, nil". It could never run, so it
// was removed as dead code. Reinstate a real probe here if non-local
// addresses ever need to be skipped.
func addrIsLocal(addr string) (bool, error) {
	return true, nil
}
685
// randomHexString returns exactly chars random lowercase hex digits,
// drawn from crypto/rand. It returns "" when chars <= 0, and panics if
// the system random source fails.
func randomHexString(chars int) string {
	if chars <= 0 {
		return ""
	}
	// Each byte yields two hex digits. Round up so odd lengths get
	// enough digits, then trim the surplus digit.
	b := make([]byte, (chars+1)/2)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", b)[:chars]
}
694
695 func internalPort(svc arvados.Service) (string, error) {
696         if len(svc.InternalURLs) > 1 {
697                 return "", errors.New("internalPort() doesn't work with multiple InternalURLs")
698         }
699         for u := range svc.InternalURLs {
700                 if _, p, err := net.SplitHostPort(u.Host); err != nil {
701                         return "", err
702                 } else if p != "" {
703                         return p, nil
704                 } else if u.Scheme == "https" {
705                         return "443", nil
706                 } else {
707                         return "80", nil
708                 }
709         }
710         return "", fmt.Errorf("service has no InternalURLs")
711 }
712
713 func externalPort(svc arvados.Service) (string, error) {
714         if _, p, err := net.SplitHostPort(svc.ExternalURL.Host); err != nil {
715                 return "", err
716         } else if p != "" {
717                 return p, nil
718         } else if svc.ExternalURL.Scheme == "https" {
719                 return "443", nil
720         } else {
721                 return "80", nil
722         }
723 }
724
// availablePort asks the OS for a free TCP port on host by listening on
// port 0, then closes the listener and returns the assigned port
// number. The port is not reserved: another process may take it before
// the caller uses it.
func availablePort(host string) (string, error) {
	listener, err := net.Listen("tcp", net.JoinHostPort(host, "0"))
	if err != nil {
		return "", err
	}
	addr := listener.Addr().String()
	listener.Close()
	_, port, err := net.SplitHostPort(addr)
	if err != nil {
		return "", err
	}
	return port, nil
}
737
// waitForConnect repeatedly tries to open a TCP connection to addr. It
// returns nil as soon as one attempt succeeds; that connection is
// closed immediately.
//
// Each attempt times out after one second, with a 100ms pause between
// attempts. It gives up and returns ctx.Err() once ctx is done,
// including during a pause.
func waitForConnect(ctx context.Context, addr string) error {
	dialer := net.Dialer{Timeout: time.Second}
	for ctx.Err() == nil {
		conn, err := dialer.DialContext(ctx, "tcp", addr)
		if err == nil {
			conn.Close()
			return nil
		}
		select {
		case <-ctx.Done():
		case <-time.After(time.Second / 10):
		}
	}
	return ctx.Err()
}
753
754 func copyConfig(cfg *arvados.Config) *arvados.Config {
755         pr, pw := io.Pipe()
756         go func() {
757                 err := json.NewEncoder(pw).Encode(cfg)
758                 if err != nil {
759                         panic(err)
760                 }
761                 pw.Close()
762         }()
763         cfg2 := new(arvados.Config)
764         err := json.NewDecoder(pr).Decode(cfg2)
765         if err != nil {
766                 panic(err)
767         }
768         return cfg2
769 }
770
771 func watchConfig(ctx context.Context, logger logrus.FieldLogger, cfgPath string, prevcfg *arvados.Config, fn func()) {
772         watcher, err := fsnotify.NewWatcher()
773         if err != nil {
774                 logger.WithError(err).Error("fsnotify setup failed")
775                 return
776         }
777         defer watcher.Close()
778
779         err = watcher.Add(cfgPath)
780         if err != nil {
781                 logger.WithError(err).Error("fsnotify watcher failed")
782                 return
783         }
784
785         for {
786                 select {
787                 case <-ctx.Done():
788                         return
789                 case err, ok := <-watcher.Errors:
790                         if !ok {
791                                 return
792                         }
793                         logger.WithError(err).Warn("fsnotify watcher reported error")
794                 case _, ok := <-watcher.Events:
795                         if !ok {
796                                 return
797                         }
798                         for len(watcher.Events) > 0 {
799                                 <-watcher.Events
800                         }
801                         loader := config.NewLoader(&bytes.Buffer{}, &logrus.Logger{Out: ioutil.Discard})
802                         loader.Path = cfgPath
803                         loader.SkipAPICalls = true
804                         cfg, err := loader.Load()
805                         if err != nil {
806                                 logger.WithError(err).Warn("error reloading config file after change detected; ignoring new config for now")
807                         } else if reflect.DeepEqual(cfg, prevcfg) {
808                                 logger.Debug("config file changed but is still DeepEqual to the existing config")
809                         } else {
810                                 logger.Debug("config changed, notifying supervisor")
811                                 fn()
812                                 prevcfg = cfg
813                         }
814                 }
815         }
816 }