17384: Respect CrunchRunCommand and CrunchRunArgumentsList in a-d-c.
[arvados.git] / lib / dispatchcloud / worker / runner.go
index bf1632a6a245e2d7edaf4b1e67486fa657cabe9a..63561874c9c5e570187048922addbbc4e4ece502 100644 (file)
@@ -1,13 +1,18 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package worker
 
 import (
        "bytes"
        "encoding/json"
        "fmt"
+       "net"
+       "strings"
        "syscall"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/sirupsen/logrus"
 )
 
@@ -16,30 +21,54 @@ import (
 type remoteRunner struct {
        uuid          string
        executor      Executor
-       arvClient     *arvados.Client
+       envJSON       json.RawMessage // JSON-encoded env map built in newRemoteRunner; Start passes it to the runner on stdin
+       runnerCmd     string          // command prefix Start and kill use to invoke the runner
+       runnerArgs    []string        // extra arguments; Start single-quotes each one and places them before the uuid
        remoteUser    string
        timeoutTERM   time.Duration
-       timeoutKILL   time.Duration
        timeoutSignal time.Duration
-       onUnkillable  func(uuid string) // callback invoked when giving up on SIGKILL
-       onKilled      func(uuid string) // callback invoked when process exits after SIGTERM/SIGKILL
+       onUnkillable  func(uuid string) // callback invoked when giving up on SIGTERM
+       onKilled      func(uuid string) // callback invoked when process exits after SIGTERM
        logger        logrus.FieldLogger
 
        stopping bool          // true if Stop() has been called
-       sentKILL bool          // true if SIGKILL has been sent
+       givenup  bool          // true if timeoutTERM has been reached; set by the goroutine started in Kill
        closed   chan struct{} // channel is closed if Close() has been called
 }
 
 // newRemoteRunner returns a new remoteRunner. Caller should ensure
 // Close() is called to release resources.
 func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
+       // Send the instance type record as a JSON doc so crunch-run
+       // can log it.
+       var instJSON bytes.Buffer
+       enc := json.NewEncoder(&instJSON)
+       enc.SetIndent("", "    ")
+       if err := enc.Encode(wkr.instType); err != nil {
+               panic(err)
+       }
+       env := map[string]string{
+               "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
+               "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
+               "InstanceType":      instJSON.String(),
+               "GatewayAddress":    net.JoinHostPort(wkr.instance.Address(), "0"),
+               "GatewayAuthSecret": wkr.wp.gatewayAuthSecret(uuid),
+       }
+       if wkr.wp.arvClient.Insecure {
+               env["ARVADOS_API_HOST_INSECURE"] = "1"
+       }
+       envJSON, err := json.Marshal(env)
+       if err != nil {
+               panic(err)
+       }
        rr := &remoteRunner{
                uuid:          uuid,
                executor:      wkr.executor,
-               arvClient:     wkr.wp.arvClient,
+               envJSON:       envJSON,
+               runnerCmd:     wkr.wp.runnerCmd,
+               runnerArgs:    wkr.wp.runnerArgs,
                remoteUser:    wkr.instance.RemoteUser(),
                timeoutTERM:   wkr.wp.timeoutTERM,
-               timeoutKILL:   wkr.wp.timeoutKILL,
                timeoutSignal: wkr.wp.timeoutSignal,
                onUnkillable:  wkr.onUnkillable,
                onKilled:      wkr.onKilled,
@@ -55,22 +84,15 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
 // assume the remote process _might_ have started, at least until it
 // probes the worker and finds otherwise.
 func (rr *remoteRunner) Start() {
-       env := map[string]string{
-               "ARVADOS_API_HOST":  rr.arvClient.APIHost,
-               "ARVADOS_API_TOKEN": rr.arvClient.AuthToken,
-       }
-       if rr.arvClient.Insecure {
-               env["ARVADOS_API_HOST_INSECURE"] = "1"
+       cmd := rr.runnerCmd + " --detach --stdin-env"
+       for _, arg := range rr.runnerArgs {
+               cmd += " '" + strings.Replace(arg, "'", "'\\''", -1) + "'"
        }
-       envJSON, err := json.Marshal(env)
-       if err != nil {
-               panic(err)
-       }
-       stdin := bytes.NewBuffer(envJSON)
-       cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
+       cmd += " '" + rr.uuid + "'"
        if rr.remoteUser != "root" {
                cmd = "sudo " + cmd
        }
+       stdin := bytes.NewBuffer(rr.envJSON)
        stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
        if err != nil {
                rr.logger.WithField("stdout", string(stdout)).
@@ -88,9 +110,13 @@ func (rr *remoteRunner) Close() {
        close(rr.closed)
 }
 
-// Kill starts a background task to kill the remote process,
-// escalating from SIGTERM to SIGKILL to onUnkillable() according to
-// the configured timeouts.
+// Kill starts a background task to kill the remote process, first
+// trying SIGTERM until reaching timeoutTERM, then calling
+// onUnkillable().
+//
+// SIGKILL is not used. It would merely kill the crunch-run supervisor
+// and thereby make the docker container, arv-mount, etc. invisible to
+// us without actually stopping them.
 //
 // Once Kill has been called, calling it again has no effect.
 func (rr *remoteRunner) Kill(reason string) {
@@ -101,19 +127,17 @@ func (rr *remoteRunner) Kill(reason string) {
        rr.logger.WithField("Reason", reason).Info("killing crunch-run process")
        go func() {
                termDeadline := time.Now().Add(rr.timeoutTERM)
-               killDeadline := termDeadline.Add(rr.timeoutKILL)
                t := time.NewTicker(rr.timeoutSignal)
                defer t.Stop()
                for range t.C {
                        switch {
                        case rr.isClosed():
                                return
-                       case time.Now().After(killDeadline):
+                       case time.Now().After(termDeadline):
+                               rr.logger.Debug("giving up")
+                               rr.givenup = true
                                rr.onUnkillable(rr.uuid)
                                return
-                       case time.Now().After(termDeadline):
-                               rr.sentKILL = true
-                               rr.kill(syscall.SIGKILL)
                        default:
                                rr.kill(syscall.SIGTERM)
                        }
@@ -124,7 +148,7 @@ func (rr *remoteRunner) Kill(reason string) {
 func (rr *remoteRunner) kill(sig syscall.Signal) {
        logger := rr.logger.WithField("Signal", int(sig))
        logger.Info("sending signal")
-       cmd := fmt.Sprintf("crunch-run --kill %d %s", sig, rr.uuid)
+       cmd := fmt.Sprintf(rr.runnerCmd+" --kill %d %s", sig, rr.uuid)
        if rr.remoteUser != "root" {
                cmd = "sudo " + cmd
        }
@@ -134,7 +158,7 @@ func (rr *remoteRunner) kill(sig syscall.Signal) {
                        "stderr": string(stderr),
                        "stdout": string(stdout),
                        "error":  err,
-               }).Info("kill failed")
+               }).Info("kill attempt unsuccessful")
                return
        }
        rr.onKilled(rr.uuid)