add gateway (part)
[arvados.git] / services / boot / consul.go
1 package main
2
3 import (
4         "context"
5         "fmt"
6         "os"
7         "os/exec"
8         "path"
9         "strings"
10         "sync"
11         "time"
12
13         "github.com/hashicorp/consul/api"
14 )
15
// consulBooter brings up the consul agent on this host. The embedded
// mutex is taken by Boot so that only one boot attempt runs at a time.
type consulBooter struct {
	sync.Mutex
}

// consul is the single package-level consulBooter instance.
var consul = new(consulBooter)
21
22 func (cb *consulBooter) Boot(ctx context.Context) error {
23         cb.Lock()
24         defer cb.Unlock()
25
26         if cb.check(ctx) == nil {
27                 return nil
28         }
29         cfg := cfg(ctx)
30         bin := cfg.UsrDir + "/bin/consul"
31         err := (&download{
32                 URL:  "https://releases.hashicorp.com/consul/0.7.2/consul_0.7.2_linux_amd64.zip",
33                 Dest: bin,
34                 Size: 29079005,
35                 Mode: 0755,
36         }).Boot(ctx)
37         if err != nil {
38                 return err
39         }
40         dataDir := path.Join(cfg.DataDir, "consul")
41         if err := os.MkdirAll(dataDir, 0700); err != nil {
42                 return err
43         }
44         args := []string{"agent"}
45         {
46                 cf := path.Join(cfg.DataDir, "consul-encrypt.json")
47                 if _, err := os.Stat(cf); err != nil && !os.IsNotExist(err) {
48                         return err
49                 } else if err != nil {
50                         key, err := exec.Command(bin, "keygen").CombinedOutput()
51                         if err != nil {
52                                 return err
53                         }
54                         if err = atomicWriteJSON(cf, map[string]interface{}{
55                                 "encrypt": strings.TrimSpace(string(key)),
56                         }, 0400); err != nil {
57                                 return err
58                         }
59                 }
60                 args = append(args, "-config-file="+cf)
61         }
62         {
63                 cf := path.Join(cfg.DataDir, "consul-ports.json")
64                 err = atomicWriteJSON(cf, map[string]interface{}{
65                         "client_addr":      "0.0.0.0",
66                         "bootstrap_expect": len(cfg.ControlHosts),
67                         "data_dir":         dataDir,
68                         "datacenter":       cfg.SiteID,
69                         "server":           true,
70                         "ui":               true,
71                         "ports": map[string]int{
72                                 "dns":      cfg.Ports.ConsulDNS,
73                                 "http":     cfg.Ports.ConsulHTTP,
74                                 "https":    cfg.Ports.ConsulHTTPS,
75                                 "rpc":      cfg.Ports.ConsulRPC,
76                                 "serf_lan": cfg.Ports.ConsulSerfLAN,
77                                 "serf_wan": cfg.Ports.ConsulSerfWAN,
78                                 "server":   cfg.Ports.ConsulServer,
79                         },
80                 }, 0644)
81                 if err != nil {
82                         return err
83                 }
84                 args = append(args, "-config-file="+cf)
85         }
86         supervisor := newSupervisor(ctx, "arvados-consul", bin, args...)
87         running, err := supervisor.Running(ctx)
88         if err != nil {
89                 return err
90         }
91         if !running {
92                 defer feedbackf(ctx, "starting consul service")()
93                 err = supervisor.Start(ctx)
94                 if err != nil {
95                         return fmt.Errorf("starting consul: %s", err)
96                 }
97                 if len(cfg.ControlHosts) > 1 {
98                         cmd := exec.Command(bin, append([]string{"join"}, cfg.ControlHosts...)...)
99                         cmd.Stdout = os.Stderr
100                         cmd.Stderr = os.Stderr
101                         err := cmd.Run()
102                         if err != nil {
103                                 return fmt.Errorf("consul join: %s", err)
104                         }
105                 }
106         }
107         return waitCheck(ctx, 30*time.Second, cb.check)
108 }
109
110 var consulCfg = api.DefaultConfig()
111
112 func (cb *consulBooter) check(ctx context.Context) error {
113         cfg := cfg(ctx)
114         consulCfg.Address = fmt.Sprintf("127.0.0.1:%d", cfg.Ports.ConsulHTTP)
115         consulCfg.Datacenter = cfg.SiteID
116         consul, err := api.NewClient(consulCfg)
117         if err != nil {
118                 return err
119         }
120         _, err = consul.Catalog().Datacenters()
121         if err != nil {
122                 return err
123         }
124         return nil
125 }
126
127 // OnlyNode returns true if this is the only consul node.
128 func (cb *consulBooter) OnlyNode() (bool, error) {
129         c, err := api.NewClient(consulCfg)
130         if err != nil {
131                 return false, err
132         }
133         nodes, _, err := c.Catalog().Nodes(nil)
134         return len(nodes) == 1, err
135 }