+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package worker
import (
"bytes"
"encoding/json"
"fmt"
+ "net"
+ "strings"
"syscall"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/sirupsen/logrus"
)
// remoteRunner manages one runner process, identified by uuid, on a
// remote worker: Start launches it through executor, and Kill
// repeatedly asks it to terminate with SIGTERM until timeoutTERM is
// reached.
//
// NOTE(review): no lock is visible among these fields, yet Kill's
// background goroutine writes givenup -- confirm how concurrent
// access to the state flags is synchronized.
type remoteRunner struct {
uuid string // identifies the process; passed as the final argument to the runner command
executor Executor // runs commands on the worker via Execute
- arvClient *arvados.Client
+ envJSON json.RawMessage // JSON-encoded environment map, supplied to the runner on stdin by Start
+ runnerCmd string // command prefix used both to launch (--detach --stdin-env) and to signal (--kill)
+ runnerArgs []string // extra arguments, single-quoted and placed before the uuid by Start
remoteUser string // when not "root", commands are prefixed with "sudo "
timeoutTERM time.Duration // how long Kill keeps sending SIGTERM before calling onUnkillable
- timeoutKILL time.Duration
timeoutSignal time.Duration // interval between successive signal attempts in Kill
- onUnkillable func(uuid string) // callback invoked when giving up on SIGKILL
- onKilled func(uuid string) // callback invoked when process exits after SIGTERM/SIGKILL
+ onUnkillable func(uuid string) // callback invoked when giving up on SIGTERM
+ onKilled func(uuid string) // callback invoked when process exits after SIGTERM
logger logrus.FieldLogger
stopping bool // true if Stop() has been called
- sentKILL bool // true if SIGKILL has been sent
+ givenup bool // true if timeoutTERM has been reached
closed chan struct{} // channel is closed if Close() has been called
}
// newRemoteRunner returns a new remoteRunner. Caller should ensure
// Close() is called to release resources.
func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
+ // Send the instance type record as a JSON doc so crunch-run
+ // can log it.
+ var instJSON bytes.Buffer
+ enc := json.NewEncoder(&instJSON)
+ enc.SetIndent("", " ")
+ if err := enc.Encode(wkr.instType); err != nil {
+ panic(err)
+ }
+ env := map[string]string{
+ "ARVADOS_API_HOST": wkr.wp.arvClient.APIHost,
+ "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
+ "InstanceType": instJSON.String(),
+ "GatewayAddress": net.JoinHostPort(wkr.instance.Address(), "0"),
+ "GatewayAuthSecret": wkr.wp.gatewayAuthSecret(uuid),
+ }
+ if wkr.wp.arvClient.Insecure {
+ env["ARVADOS_API_HOST_INSECURE"] = "1"
+ }
+ envJSON, err := json.Marshal(env)
+ if err != nil {
+ panic(err)
+ }
rr := &remoteRunner{
uuid: uuid,
executor: wkr.executor,
- arvClient: wkr.wp.arvClient,
+ envJSON: envJSON,
+ runnerCmd: wkr.wp.runnerCmd,
+ runnerArgs: wkr.wp.runnerArgs,
remoteUser: wkr.instance.RemoteUser(),
timeoutTERM: wkr.wp.timeoutTERM,
- timeoutKILL: wkr.wp.timeoutKILL,
timeoutSignal: wkr.wp.timeoutSignal,
onUnkillable: wkr.onUnkillable,
onKilled: wkr.onKilled,
// assume the remote process _might_ have started, at least until it
// probes the worker and finds otherwise.
func (rr *remoteRunner) Start() {
- env := map[string]string{
- "ARVADOS_API_HOST": rr.arvClient.APIHost,
- "ARVADOS_API_TOKEN": rr.arvClient.AuthToken,
- }
- if rr.arvClient.Insecure {
- env["ARVADOS_API_HOST_INSECURE"] = "1"
+ cmd := rr.runnerCmd + " --detach --stdin-env"
+ for _, arg := range rr.runnerArgs {
+ cmd += " '" + strings.Replace(arg, "'", "'\\''", -1) + "'"
}
- envJSON, err := json.Marshal(env)
- if err != nil {
- panic(err)
- }
- stdin := bytes.NewBuffer(envJSON)
- cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
+ cmd += " '" + rr.uuid + "'"
if rr.remoteUser != "root" {
cmd = "sudo " + cmd
}
+ stdin := bytes.NewBuffer(rr.envJSON)
stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
if err != nil {
rr.logger.WithField("stdout", string(stdout)).
close(rr.closed)
}
-// Kill starts a background task to kill the remote process,
-// escalating from SIGTERM to SIGKILL to onUnkillable() according to
-// the configured timeouts.
+// Kill starts a background task to kill the remote process, first
+// trying SIGTERM until reaching timeoutTERM, then calling
+// onUnkillable().
+//
+// SIGKILL is not used. It would merely kill the crunch-run supervisor
+// and thereby make the docker container, arv-mount, etc. invisible to
+// us without actually stopping them.
//
// Once Kill has been called, calling it again has no effect.
func (rr *remoteRunner) Kill(reason string) {
rr.logger.WithField("Reason", reason).Info("killing crunch-run process")
go func() {
termDeadline := time.Now().Add(rr.timeoutTERM)
- killDeadline := termDeadline.Add(rr.timeoutKILL)
t := time.NewTicker(rr.timeoutSignal)
defer t.Stop()
for range t.C {
switch {
case rr.isClosed():
return
- case time.Now().After(killDeadline):
+ case time.Now().After(termDeadline):
+ rr.logger.Debug("giving up")
+ rr.givenup = true
rr.onUnkillable(rr.uuid)
return
- case time.Now().After(termDeadline):
- rr.sentKILL = true
- rr.kill(syscall.SIGKILL)
default:
rr.kill(syscall.SIGTERM)
}
func (rr *remoteRunner) kill(sig syscall.Signal) {
logger := rr.logger.WithField("Signal", int(sig))
logger.Info("sending signal")
- cmd := fmt.Sprintf("crunch-run --kill %d %s", sig, rr.uuid)
+ cmd := fmt.Sprintf(rr.runnerCmd+" --kill %d %s", sig, rr.uuid)
if rr.remoteUser != "root" {
cmd = "sudo " + cmd
}
"stderr": string(stderr),
"stdout": string(stdout),
"error": err,
- }).Info("kill failed")
+ }).Info("kill attempt unsuccessful")
return
}
rr.onKilled(rr.uuid)