14807: Merge branch 'master'
[arvados.git] / lib / dispatchcloud / worker / worker.go
index a24747267615b9b0d0d0c8851271e92bdb85087c..9be9f41f43b7ef51cbb1d1257e4ac39f642472aa 100644 (file)
@@ -6,6 +6,7 @@ package worker
 
 import (
        "bytes"
+       "encoding/json"
        "fmt"
        "strings"
        "sync"
@@ -13,6 +14,7 @@ import (
 
        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/stats"
        "github.com/sirupsen/logrus"
 )
 
@@ -96,7 +98,7 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
                "ContainerUUID": ctr.UUID,
                "Priority":      ctr.Priority,
        })
-       logger = logger.WithField("Instance", wkr.instance)
+       logger = logger.WithField("Instance", wkr.instance.ID())
        logger.Debug("starting container")
        wkr.starting[ctr.UUID] = struct{}{}
        wkr.state = StateRunning
@@ -105,7 +107,19 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
                        "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
                        "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
                }
-               stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil)
+               if wkr.wp.arvClient.Insecure {
+                       env["ARVADOS_API_HOST_INSECURE"] = "1"
+               }
+               envJSON, err := json.Marshal(env)
+               if err != nil {
+                       panic(err)
+               }
+               stdin := bytes.NewBuffer(envJSON)
+               cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
+               if u := wkr.instance.RemoteUser(); u != "root" {
+                       cmd = "sudo " + cmd
+               }
+               stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
                now := time.Now()
@@ -325,6 +339,9 @@ func (wkr *worker) probeAndUpdate() {
 
 func (wkr *worker) probeRunning() (running []string, ok bool) {
        cmd := "crunch-run --list"
+       if u := wkr.instance.RemoteUser(); u != "root" {
+               cmd = "sudo " + cmd
+       }
        stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        if err != nil {
                wkr.logger.WithFields(logrus.Fields{
@@ -400,7 +417,7 @@ func (wkr *worker) shutdownIfIdle() bool {
 
        wkr.logger.WithFields(logrus.Fields{
                "State":        wkr.state,
-               "Age":          age,
+               "IdleDuration": stats.Duration(age),
                "IdleBehavior": wkr.idleBehavior,
        }).Info("shutdown idle worker")
        wkr.shutdown()
@@ -427,22 +444,24 @@ func (wkr *worker) shutdown() {
 // match. Caller must have lock.
 func (wkr *worker) saveTags() {
        instance := wkr.instance
-       have := instance.Tags()
-       want := cloud.InstanceTags{
+       tags := instance.Tags()
+       update := cloud.InstanceTags{
                tagKeyInstanceType: wkr.instType.Name,
                tagKeyIdleBehavior: string(wkr.idleBehavior),
        }
-       go func() {
-               for k, v := range want {
-                       if v == have[k] {
-                               continue
-                       }
-                       err := instance.SetTags(want)
+       save := false
+       for k, v := range update {
+               if tags[k] != v {
+                       tags[k] = v
+                       save = true
+               }
+       }
+       if save {
+               go func() {
+                       err := instance.SetTags(tags)
                        if err != nil {
-                               wkr.wp.logger.WithField("Instance", instance).WithError(err).Warnf("error updating tags")
+                               wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
                        }
-                       break
-
-               }
-       }()
+               }()
+       }
 }