refactor as procedural
authorTom Clegg <tom@curoverse.com>
Sat, 28 Jan 2017 06:56:01 +0000 (01:56 -0500)
committerTom Clegg <tom@curoverse.com>
Sat, 28 Jan 2017 06:56:01 +0000 (01:56 -0500)
14 files changed:
services/boot/.gitignore
services/boot/booter.go [new file with mode: 0644]
services/boot/config.go
services/boot/consul.go [new file with mode: 0644]
services/boot/consul_task.go [deleted file]
services/boot/controller.go [new file with mode: 0644]
services/boot/ctl_tasks.go [deleted file]
services/boot/download.go [moved from services/boot/download_task.go with 71% similarity]
services/boot/feedback.go [new file with mode: 0644]
services/boot/js/index.js
services/boot/server.go
services/boot/shell.go [new file with mode: 0644]
services/boot/systemd.go
services/boot/task.go [deleted file]

index d08895dd5aa128f78e1717d6bd2c7274525641a1..8c8b8e3bd976b1d57f521a760d2699dc86e0be60 100644 (file)
@@ -2,3 +2,4 @@
 bindata.tmp
 node_modules
 bindata_assetfs.go
+npm-debug.log
diff --git a/services/boot/booter.go b/services/boot/booter.go
new file mode 100644 (file)
index 0000000..cb672ab
--- /dev/null
@@ -0,0 +1,81 @@
+package main
+
+import (
+       "context"
+       "fmt"
+       "sync"
+)
+
+// A Booter ensures some piece of the system ("target") is correctly
+// installed, configured, running, or working.
+type Booter interface {
+       // Inspect, repair, and report the current state of the target.
+       Boot(context.Context) error
+}
+
+var cfgKey = &struct{}{}
+
+func cfg(ctx context.Context) *Config {
+       return ctx.Value(cfgKey).(*Config)
+}
+
+func withCfg(ctx context.Context, cfg *Config) context.Context {
+       return context.WithValue(ctx, cfgKey, cfg)
+}
+
+type Series []Booter
+
+func (sb Series) Boot(ctx context.Context) error {
+       for _, b := range sb {
+               err := b.Boot(ctx)
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+type Concurrent []Booter
+
+func (cb Concurrent) Boot(ctx context.Context) error {
+       errs := make([]error, len(cb))
+       var wg sync.WaitGroup
+       wg.Add(len(cb))
+       for i, b := range cb {
+               i, b := i, b
+               go func() {
+                       defer wg.Done()
+                       errs[i] = b.Boot(ctx)
+               }()
+       }
+       wg.Wait()
+       return NewMultipleError(errs)
+}
+
+type MultipleError struct {
+       error
+       errors []error
+}
+
+func NewMultipleError(errs []error) error {
+       var errors []error
+       for _, err := range errs {
+               switch err := err.(type) {
+               case *MultipleError:
+                       errors = append(errors, err.errors...)
+               case nil:
+               default:
+                       errors = append(errors, err)
+               }
+       }
+       if len(errors) == 0 {
+               return nil
+       }
+       if len(errors) == 1 {
+               return errors[0]
+       }
+       return &MultipleError{
+               error:  fmt.Errorf("%d errors", len(errors)),
+               errors: errors,
+       }
+}
index 0eb1c3016dc0b3666275f0bfc8769b56c2040062..84cb853464e5f55d6a71287674a737304bcec889 100644 (file)
@@ -10,8 +10,21 @@ type Config struct {
        // alive.
        ControlHosts []string
 
-       // addr:port to serve web-based setup/monitoring application
-       WebListen string
+       ConsulPorts struct {
+               DNS     int
+               HTTP    int
+               HTTPS   int
+               RPC     int
+               SerfLAN int `json:"Serf_LAN"`
+               SerfWAN int `json:"Serf_WAN"`
+               Server  int
+       }
+
+       WebGUI struct {
+               // addr:port to serve web-based setup/monitoring
+               // application
+               Listen string
+       }
 
        UsrDir  string
        DataDir string
@@ -21,13 +34,27 @@ func (c *Config) SetDefaults() {
        if len(c.ControlHosts) == 0 {
                c.ControlHosts = []string{"127.0.0.1"}
        }
+       defaultPort := []int{18600, 18500, -1, 18400, 18301, 18302, 18300}
+       for i, port := range []*int{
+               &c.ConsulPorts.DNS,
+               &c.ConsulPorts.HTTP,
+               &c.ConsulPorts.HTTPS,
+               &c.ConsulPorts.RPC,
+               &c.ConsulPorts.SerfLAN,
+               &c.ConsulPorts.SerfWAN,
+               &c.ConsulPorts.Server,
+       } {
+               if *port == 0 {
+                       *port = defaultPort[i]
+               }
+       }
        if c.DataDir == "" {
                c.DataDir = "/var/lib/arvados"
        }
        if c.UsrDir == "" {
-               c.DataDir = "/usr/local"
+               c.DataDir = "/usr/local/arvados"
        }
-       if c.WebListen == "" {
-               c.WebListen = "localhost:8000"
+       if c.WebGUI.Listen == "" {
+               c.WebGUI.Listen = "localhost:18000"
        }
 }
diff --git a/services/boot/consul.go b/services/boot/consul.go
new file mode 100644 (file)
index 0000000..095ea89
--- /dev/null
@@ -0,0 +1,81 @@
+package main
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "os/exec"
+       "sync"
+
+       "github.com/hashicorp/consul/api"
+)
+
+var consul = &consulBooter{}
+
+type consulBooter struct {
+       sync.Mutex
+}
+
+func (cb *consulBooter) Boot(ctx context.Context) error {
+       cb.Lock()
+       defer cb.Unlock()
+
+       cfg := cfg(ctx)
+       bin := cfg.UsrDir + "/bin/consul"
+       err := (&download{
+               URL:  "https://releases.hashicorp.com/consul/0.7.2/consul_0.7.2_linux_amd64.zip",
+               Dest: bin,
+               Size: 29079005,
+               Mode: 0755,
+       }).Boot(ctx)
+       if err != nil {
+               return err
+       }
+       if cb.check(ctx) == nil {
+               return nil
+       }
+       args := []string{
+               "agent",
+               "-server",
+               "-advertise=127.0.0.1",
+               "-data-dir", cfg.DataDir + "/consul",
+               "-bootstrap-expect", fmt.Sprintf("%d", len(cfg.ControlHosts))}
+       supervisor := newSupervisor("consul", bin, args...)
+       running, err := supervisor.Running()
+       if err != nil {
+               return err
+       }
+       if !running {
+               defer feedbackf(ctx, "starting consul service")()
+               err = supervisor.Start()
+               if err != nil {
+                       return fmt.Errorf("starting consul: %s", err)
+               }
+               if len(cfg.ControlHosts) > 1 {
+                       cmd := exec.Command(bin, append([]string{"join"}, cfg.ControlHosts...)...)
+                       cmd.Stdout = os.Stderr
+                       cmd.Stderr = os.Stderr
+                       err := cmd.Run()
+                       if err != nil {
+                               return fmt.Errorf("consul join: %s", err)
+                       }
+               }
+       }
+       return cb.check(ctx)
+}
+
+var consulCfg = api.DefaultConfig()
+
+func (cb *consulBooter) check(ctx context.Context) error {
+       cfg := cfg(ctx)
+       consulCfg.Datacenter = cfg.SiteID
+       consul, err := api.NewClient(consulCfg)
+       if err != nil {
+               return err
+       }
+       _, err = consul.Catalog().Datacenters()
+       if err != nil {
+               return err
+       }
+       return nil
+}
diff --git a/services/boot/consul_task.go b/services/boot/consul_task.go
deleted file mode 100644 (file)
index 4de5512..0000000
+++ /dev/null
@@ -1,81 +0,0 @@
-package main
-
-import (
-       "fmt"
-       "os"
-       "os/exec"
-       "time"
-
-       "github.com/hashicorp/consul/api"
-)
-
-type consulService struct {
-       supervisor
-}
-
-func (cs *consulService) Init(cfg *Config) {
-       args := []string{
-               "agent",
-               "-server",
-               "-advertise=127.0.0.1",
-               "-data-dir", cfg.DataDir + "/consul",
-               "-bootstrap-expect", fmt.Sprintf("%d", len(cfg.ControlHosts))}
-       cs.supervisor = newSupervisor("consul", "/usr/local/bin/consul", args...)
-}
-
-func (cs *consulService) Children() []task {
-       return nil
-}
-
-func (cs *consulService) ShortName() string {
-       return "consul running"
-}
-
-func (cs *consulService) String() string {
-       return "Ensure consul daemon is supervised & running"
-}
-
-func (cs *consulService) Check() error {
-       consul, err := api.NewClient(api.DefaultConfig())
-       if err != nil {
-               return err
-       }
-       _, err = consul.Catalog().Datacenters()
-       if err != nil {
-               return err
-       }
-       return nil
-}
-
-func (cs *consulService) CanFix() bool {
-       return true
-}
-
-func (cs *consulService) Fix() error {
-       err := cs.supervisor.Start()
-       if err != nil {
-               return err
-       }
-
-       if len(cfg.ControlHosts) > 1 {
-               cmd := exec.Command("/usr/local/bin/consul", append([]string{"join"}, cfg.ControlHosts...)...)
-               cmd.Stdout = os.Stderr
-               cmd.Stderr = os.Stderr
-               err := cmd.Run()
-               if err != nil {
-                       return err
-               }
-       }
-
-       timeout := time.After(10 * time.Second)
-       ticker := time.NewTicker(50 * time.Millisecond)
-       defer ticker.Stop()
-       for cs.Check() != nil {
-               select {
-               case <-ticker.C:
-               case <-timeout:
-                       return cs.Check()
-               }
-       }
-       return nil
-}
diff --git a/services/boot/controller.go b/services/boot/controller.go
new file mode 100644 (file)
index 0000000..396e37a
--- /dev/null
@@ -0,0 +1,13 @@
+package main
+
+import (
+       "context"
+)
+
+type controller struct{}
+
+func (c *controller) Boot(ctx context.Context) error {
+       return Concurrent{
+               consul,
+       }.Boot(ctx)
+}
diff --git a/services/boot/ctl_tasks.go b/services/boot/ctl_tasks.go
deleted file mode 100644 (file)
index 8737316..0000000
+++ /dev/null
@@ -1,12 +0,0 @@
-package main
-
-// tasks to run on a controller node
-var ctlTasks = []task{
-       &download{
-               URL:  "https://releases.hashicorp.com/consul/0.7.2/consul_0.7.2_linux_amd64.zip",
-               Dest: "/usr/local/bin/consul",
-               Size: 29079005,
-               Mode: 0755,
-       },
-       &consulService{},
-}
similarity index 71%
rename from services/boot/download_task.go
rename to services/boot/download.go
index da70678c6ebd18ddc5c9a694de957a6b1cfd56f3..e047aab9c108d64bb81b065f5d20b93aea19ea16 100644 (file)
@@ -2,6 +2,7 @@ package main
 
 import (
        "archive/zip"
+       "context"
        "fmt"
        "io"
        "io/ioutil"
@@ -20,39 +21,22 @@ type download struct {
        Hash string
 }
 
-func (d *download) Init(cfg *Config) {}
-
-func (d *download) Children() []task {
-       return nil
-}
-
-func (d *download) ShortName() string {
-       return d.Dest
-}
-
-func (d *download) String() string {
-       return fmt.Sprintf("Download %q from %q", d.Dest, d.URL)
-}
-
-func (d *download) Check() error {
+func (d *download) Boot(ctx context.Context) error {
        fi, err := os.Stat(d.Dest)
-       if err != nil {
+       if os.IsNotExist(err) {
+               // fall through to fix
+       } else if err != nil {
                return err
+       } else if d.Size > 0 && fi.Size() != d.Size {
+               err = fmt.Errorf("Size mismatch: %q is %d bytes, expected %d", d.Dest, fi.Size(), d.Size)
+       } else if d.Mode > 0 && fi.Mode() != d.Mode {
+               err = fmt.Errorf("Mode mismatch: %q is %s, expected %s", d.Dest, fi.Mode(), d.Mode)
+       } else {
+               return nil
        }
-       if d.Size > 0 && fi.Size() != d.Size {
-               return fmt.Errorf("Size mismatch: %q is %d bytes, expected %d", d.Dest, fi.Size(), d.Size)
-       }
-       if d.Mode > 0 && fi.Mode() != d.Mode {
-               return fmt.Errorf("Mode mismatch: %q is %s, expected %s", d.Dest, fi.Mode(), d.Mode)
-       }
-       return nil
-}
 
-func (d *download) CanFix() bool {
-       return true
-}
+       defer feedbackf(ctx, "downloading %s", d.URL)()
 
-func (d *download) Fix() error {
        out, err := ioutil.TempFile(path.Dir(d.Dest), path.Base(d.Dest))
        if err != nil {
                return err
diff --git a/services/boot/feedback.go b/services/boot/feedback.go
new file mode 100644 (file)
index 0000000..6cc509f
--- /dev/null
@@ -0,0 +1,15 @@
+package main
+
+import (
+       "context"
+       "fmt"
+       "log"
+)
+
+func feedbackf(ctx context.Context, f string, args ...interface{}) func() {
+       msg := fmt.Sprintf(f, args...)
+       log.Print("start: ", msg)
+       return func() {
+               log.Print(" done: ", msg)
+       }
+}
index b4a87442a8e16e2537eee3bb05de19e0e4efe13a..9ed5c047325f43839424a021c3f651088bc45546 100644 (file)
@@ -50,7 +50,7 @@ var Home = {
             m('table.table', {style: {width: '350px'}},
               m('tbody', {style: {opacity: ctl().Outdated ? .5 : 1}}, ctl().Tasks.map(function(task) {
                   return m('tr', [
-                      m('td', task.ShortName),
+                      m('td', task.Name),
                       m('td',
                         m('span.badge',
                           {class: task.State == 'OK' ? 'badge-success' : 'badge-danger'},
index f9aad5727a5baac67e0598f954a31bda6ace7c24..2c477e4dfcd6ee6dacc5a87739522d93c37097d1 100644 (file)
@@ -1,6 +1,7 @@
 package main
 
 import (
+       "context"
        "encoding/json"
        "flag"
        "log"
@@ -14,12 +15,11 @@ import (
 
 const defaultCfgPath = "/etc/arvados/boot/boot.yml"
 
-var cfg Config
-
 func main() {
        cfgPath := flag.String("config", defaultCfgPath, "`path` to config file")
        flag.Parse()
 
+       var cfg Config
        if err := config.LoadFile(&cfg, *cfgPath); os.IsNotExist(err) && *cfgPath == defaultCfgPath {
                log.Printf("WARNING: No config file specified or found, starting fresh!")
        } else if err != nil {
@@ -27,13 +27,15 @@ func main() {
        }
        cfg.SetDefaults()
        go func() {
-               log.Printf("starting server at %s", cfg.WebListen)
-               log.Fatal(http.ListenAndServe(cfg.WebListen, stack(logger, apiOrAssets)))
+               log.Printf("starting server at %s", cfg.WebGUI.Listen)
+               log.Fatal(http.ListenAndServe(cfg.WebGUI.Listen, stack(cfg.logger, cfg.apiOrAssets)))
        }()
        go func() {
+               var ctl Booter = &controller{}
                ticker := time.NewTicker(5 * time.Second)
                for {
-                       runTasks(&cfg, ctlTasks)
+                       err := ctl.Boot(withCfg(context.Background(), &cfg))
+                       log.Printf("ctl.Boot: %v", err)
                        <-ticker.C
                }
        }()
@@ -53,7 +55,7 @@ func stack(m ...middleware) http.Handler {
 }
 
 // logs each request.
-func logger(next http.Handler) http.Handler {
+func (cfg *Config) logger(next http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                t := time.Now()
                next.ServeHTTP(w, r)
@@ -63,15 +65,15 @@ func logger(next http.Handler) http.Handler {
 
 // dispatches /api/ to the API stack, everything else to the static
 // assets stack.
-func apiOrAssets(next http.Handler) http.Handler {
+func (cfg *Config) apiOrAssets(next http.Handler) http.Handler {
        mux := http.NewServeMux()
-       mux.Handle("/api/", stack(apiHeaders, apiRoutes))
+       mux.Handle("/api/", stack(cfg.apiHeaders, cfg.apiRoutes))
        mux.Handle("/", http.FileServer(assetFS()))
        return mux
 }
 
 // adds response headers suitable for API responses
-func apiHeaders(next http.Handler) http.Handler {
+func (cfg *Config) apiHeaders(next http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                w.Header().Set("Content-Type", "application/json")
                next.ServeHTTP(w, r)
@@ -79,23 +81,29 @@ func apiHeaders(next http.Handler) http.Handler {
 }
 
 // dispatches API routes
-func apiRoutes(http.Handler) http.Handler {
+func (cfg *Config) apiRoutes(http.Handler) http.Handler {
        mux := http.NewServeMux()
        mux.HandleFunc("/api/ping", func(w http.ResponseWriter, r *http.Request) {
                json.NewEncoder(w).Encode(map[string]interface{}{"time": time.Now().UTC()})
        })
-       mux.HandleFunc("/api/tasks/ctl", func(w http.ResponseWriter, r *http.Request) {
+       mux.HandleFunc("/api/status/controller", func(w http.ResponseWriter, r *http.Request) {
                timeout := time.Minute
                if v, err := strconv.ParseInt(r.FormValue("timeout"), 10, 64); err == nil {
                        timeout = time.Duration(v) * time.Second
                }
                if v, err := strconv.ParseInt(r.FormValue("newerThan"), 10, 64); err == nil {
-                       TaskState.Wait(version(v), timeout, r.Context())
+                       log.Println(v, timeout)
+                       // TODO: wait
+                       // TaskState.Wait(version(v), timeout, r.Context())
                }
-               rep, v := report(ctlTasks)
+               // TODO:
+               // rep, v := report(ctlTasks)
                json.NewEncoder(w).Encode(map[string]interface{}{
-                       "Version": v,
-                       "Tasks":   rep,
+                       // "Version": v,
+                       // "Tasks":   rep,
+                       // TODO:
+                       "Version": 1,
+                       "Tasks": []int{},
                })
        })
        mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
diff --git a/services/boot/shell.go b/services/boot/shell.go
new file mode 100644 (file)
index 0000000..8dcfbe2
--- /dev/null
@@ -0,0 +1,17 @@
+package main
+
+import (
+       "bytes"
+       "os/exec"
+       "strings"
+)
+
+func BashScript(script string) ([]byte, []byte, error) {
+       cmd := exec.Command("bash", "-e", "-x")
+       cmd.Stdin = strings.NewReader(script)
+       var stdout, stderr bytes.Buffer
+       cmd.Stdout = &stdout
+       cmd.Stderr = &stderr
+       err := cmd.Run()
+       return stdout.Bytes(), stderr.Bytes(), err
+}
index e88ecc39dfd30197ddd5fa66af89677fc0ea95fa..30154334802c7b04564d02239c0f1a84f157d99a 100644 (file)
@@ -7,26 +7,26 @@ import (
 )
 
 type supervisor interface {
-       Check() (bool, error)
+       Running() (bool, error)
        Start() error
 }
 
 func newSupervisor(name, cmd string, args ...string) supervisor {
        return &systemdUnit{
                name: name,
-               cmd: cmd,
+               cmd:  cmd,
                args: args,
        }
 }
 
 type systemdUnit struct {
        name string
-       cmd string
+       cmd  string
        args []string
 }
 
 func (u *systemdUnit) Start() error {
-       cmd := exec.Command("systemd-run", append([]string{"--unit=arvados-"+u.name, u.cmd}, u.args...)...)
+       cmd := exec.Command("systemd-run", append([]string{"--unit=arvados-" + u.name, u.cmd}, u.args...)...)
        cmd.Stdout = os.Stderr
        cmd.Stderr = os.Stderr
        err := cmd.Run()
@@ -36,7 +36,7 @@ func (u *systemdUnit) Start() error {
        return err
 }
 
-func (u *systemdUnit) Check() (bool, error) {
+func (u *systemdUnit) Running() (bool, error) {
        cmd := exec.Command("systemctl", "status", "arvados-"+u.name)
        cmd.Stdout = os.Stderr
        cmd.Stderr = os.Stderr
diff --git a/services/boot/task.go b/services/boot/task.go
deleted file mode 100644 (file)
index 75c49bd..0000000
+++ /dev/null
@@ -1,151 +0,0 @@
-package main
-
-import (
-       "context"
-       "log"
-       "sync"
-       "time"
-)
-
-type taskState string
-
-const (
-       StateUnchecked taskState = "Unchecked"
-       StateChecking            = "Checking"
-       StateFixing              = "Fixing"
-       StateFailed              = "Failed"
-       StateOK                  = "OK"
-)
-
-type version int64
-
-type taskStateMap struct {
-       s       map[task]taskState
-       cond    *sync.Cond
-       version version
-}
-
-var TaskState = taskStateMap{
-       s:    make(map[task]taskState),
-       cond: sync.NewCond(&sync.Mutex{}),
-}
-
-func (m *taskStateMap) Set(t task, s taskState) {
-       m.cond.L.Lock()
-       defer m.cond.L.Unlock()
-       if old, ok := m.s[t]; ok && old == s {
-               return
-       }
-       m.s[t] = s
-       m.version++
-       m.cond.Broadcast()
-}
-
-func (m *taskStateMap) Version() version {
-       m.cond.L.Lock()
-       defer m.cond.L.Unlock()
-       return m.version
-}
-
-func (m *taskStateMap) Get(t task) taskState {
-       m.cond.L.Lock()
-       defer m.cond.L.Unlock()
-       if s, ok := m.s[t]; ok {
-               return s
-       } else {
-               return StateUnchecked
-       }
-}
-
-type repEnt struct {
-       ShortName   string
-       Description string
-       State       taskState
-       Children    []repEnt
-}
-
-func (m *taskStateMap) Wait(v version, t time.Duration, ctx context.Context) bool {
-       ready := make(chan struct{})
-       var done bool
-       go func() {
-               m.cond.L.Lock()
-               defer m.cond.L.Unlock()
-               for v == m.version && !done {
-                       m.cond.Wait()
-               }
-               close(ready)
-       }()
-       select {
-       case <-ready:
-               return true
-       case <-ctx.Done():
-       case <-time.After(t):
-       }
-       done = true
-       m.cond.Broadcast()
-       return false
-}
-
-func report(tasks []task) ([]repEnt, version) {
-       v := TaskState.Version()
-       if len(tasks) == 0 {
-               return nil, v
-       }
-       var rep []repEnt
-       for _, t := range tasks {
-               crep, _ := report(t.Children())
-               rep = append(rep, repEnt{
-                       ShortName:   t.ShortName(),
-                       Description: t.String(),
-                       State:       TaskState.Get(t),
-                       Children:    crep,
-               })
-       }
-       return rep, v
-}
-
-func runTasks(cfg *Config, tasks []task) {
-       for _, t := range tasks {
-               t.Init(cfg)
-       }
-       for _, t := range tasks {
-               if TaskState.Get(t) == taskState("") {
-                       TaskState.Set(t, StateChecking)
-               }
-               err := t.Check()
-               if err == nil {
-                       log.Printf("%s: OK", t)
-                       TaskState.Set(t, StateOK)
-                       continue
-               }
-               log.Printf("%s: %s", t, err)
-               if !t.CanFix() {
-                       log.Printf("%s: can't fix")
-                       TaskState.Set(t, StateFailed)
-                       continue
-               }
-               TaskState.Set(t, StateFixing)
-               if err = t.Fix(); err != nil {
-                       log.Printf("%s: can't fix: %s", t, err)
-                       TaskState.Set(t, StateFailed)
-                       continue
-               }
-               if err = t.Check(); err != nil {
-                       log.Printf("%s: fixed, but still broken?!: %s", t, err)
-                       TaskState.Set(t, StateFailed)
-                       continue
-               }
-               log.Printf("%s: OK", t)
-               TaskState.Set(t, StateOK)
-       }
-}
-
-type task interface {
-       Init(*Config)
-       ShortName() string
-       String() string
-       Check() error
-       CanFix() bool
-       Fix() error
-       Children() []task
-}