1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
15 "github.com/sirupsen/logrus"
18 // remoteRunner handles the starting and stopping of a crunch-run
19 // process on a remote machine.
20 type remoteRunner struct {
// envJSON is the JSON document that Start supplies to the remote
// command on stdin.
// NOTE(review): presumably the marshaled env map built in
// newRemoteRunner -- the assignment is not visible here; confirm.
23 envJSON json.RawMessage
// timeoutTERM is added to time.Now() in Kill to compute the deadline
// after which Kill logs "giving up" and calls onUnkillable.
26 timeoutTERM time.Duration
// timeoutSignal is the period of the ticker created in Kill.
// NOTE(review): presumably the interval between SIGTERM attempts --
// the loop consuming the ticker is not visible here; confirm.
27 timeoutSignal time.Duration
28 onUnkillable func(uuid string) // callback invoked when giving up on SIGTERM
29 onKilled func(uuid string) // callback invoked when process exits after SIGTERM
// logger is set by newRemoteRunner to the worker's logger with a
// "ContainerUUID" field attached.
30 logger logrus.FieldLogger
// NOTE(review): no Stop() method is visible in this excerpt; the
// comment below may be referring to Kill() -- confirm.
32 stopping bool // true if Stop() has been called
33 givenup bool // true if timeoutTERM has been reached
34 closed chan struct{} // channel is closed if Close() has been called
37 // newRemoteRunner returns a new remoteRunner. Caller should ensure
38 // Close() is called to release resources.
//
// The runner takes its executor, remote user, logger and kill
// callbacks from wkr, and its runner command and timeouts from wkr.wp.
// uuid is attached to the logger as "ContainerUUID" and is used to
// derive the gateway auth secret.
39 func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
40 // Send the instance type record as a JSON doc so crunch-run
42 var instJSON bytes.Buffer
43 enc := json.NewEncoder(&instJSON)
44 enc.SetIndent("", " ")
// Encode wkr.instType into instJSON as indented JSON.
45 if err := enc.Encode(wkr.instType); err != nil {
// Environment map for the remote process; it is serialized to JSON
// below. Values come from the pool's API client, the encoded
// instance type, and the instance's address.
48 env := map[string]string{
49 "ARVADOS_API_HOST": wkr.wp.arvClient.APIHost,
50 "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
51 "InstanceType": instJSON.String(),
// The gateway address is the instance address joined with port "0".
// NOTE(review): presumably port 0 lets the remote side pick a port;
// confirm against crunch-run.
52 "GatewayAddress": net.JoinHostPort(wkr.instance.Address(), "0"),
53 "GatewayAuthSecret": wkr.wp.gatewayAuthSecret(uuid),
// Only set the insecure flag when the API client is configured as
// insecure; otherwise the key is absent from the map.
55 if wkr.wp.arvClient.Insecure {
56 env["ARVADOS_API_HOST_INSECURE"] = "1"
58 envJSON, err := json.Marshal(env)
64 executor: wkr.executor,
66 runnerCmd: wkr.wp.runnerCmd,
67 remoteUser: wkr.instance.RemoteUser(),
68 timeoutTERM: wkr.wp.timeoutTERM,
69 timeoutSignal: wkr.wp.timeoutSignal,
70 onUnkillable: wkr.onUnkillable,
71 onKilled: wkr.onKilled,
// Tag every log entry from this runner with the container UUID.
72 logger: wkr.logger.WithField("ContainerUUID", uuid),
73 closed: make(chan struct{}),
78 // Start a crunch-run process on the remote host.
80 // Start does not return any error encountered. The caller should
81 // assume the remote process _might_ have started, at least until it
82 // probes the worker and finds otherwise.
83 func (rr *remoteRunner) Start() {
// Build the remote command line: the runner command with --detach and
// --stdin-env, followed by the single-quoted container UUID.
// NOTE(review): the UUID is wrapped in single quotes without
// escaping; this assumes it never contains a quote character.
84 cmd := rr.runnerCmd + " --detach --stdin-env '" + rr.uuid + "'"
// NOTE(review): the body of this branch is not visible here;
// presumably the command is adjusted (e.g. run via sudo) when the
// remote user is not root -- confirm.
85 if rr.remoteUser != "root" {
// The JSON-encoded environment is supplied to the command on stdin.
88 stdin := bytes.NewBuffer(rr.envJSON)
89 stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
// The captured stdout and stderr are attached as log fields to the
// error entry below.
91 rr.logger.WithField("stdout", string(stdout)).
92 WithField("stderr", string(stderr)).
94 Error("error starting crunch-run process")
97 rr.logger.Info("crunch-run process started")
100 // Close abandons the remote process (if any) and releases
101 // resources. Close must not be called more than once.
//
// NOTE(review): the body is not visible in this excerpt; per the
// comment on the closed field, Close is expected to close rr.closed,
// which would explain the call-once restriction -- confirm.
102 func (rr *remoteRunner) Close() {
106 // Kill starts a background task to kill the remote process, first
107 // trying SIGTERM until reaching timeoutTERM, then calling
110 // SIGKILL is not used. It would merely kill the crunch-run supervisor
111 // and thereby make the docker container, arv-mount, etc. invisible to
112 // us without actually stopping them.
114 // Once Kill has been called, calling it again has no effect.
//
// reason is recorded in the "Reason" field of the log entry that
// announces the kill.
115 func (rr *remoteRunner) Kill(reason string) {
120 rr.logger.WithField("Reason", reason).Info("killing crunch-run process")
// SIGTERM attempts stop once the current time passes termDeadline.
122 termDeadline := time.Now().Add(rr.timeoutTERM)
// NOTE(review): the loop consuming this ticker is not visible here;
// presumably one signal attempt is made per tick -- confirm.
123 t := time.NewTicker(rr.timeoutSignal)
// Deadline reached: log, then notify the owner via onUnkillable.
129 case time.Now().After(termDeadline):
130 rr.logger.Debug("giving up")
132 rr.onUnkillable(rr.uuid)
// Otherwise send (another) SIGTERM to the remote process.
135 rr.kill(syscall.SIGTERM)
// kill runs the runner command with --kill on the remote host to
// deliver sig to the process for rr.uuid, logging the attempt and, on
// the visible failure path, the command's stdout and stderr.
141 func (rr *remoteRunner) kill(sig syscall.Signal) {
// Log entries from this attempt carry the numeric signal value.
142 logger := rr.logger.WithField("Signal", int(sig))
143 logger.Info("sending signal")
// NOTE(review): rr.runnerCmd is concatenated into the Sprintf format
// string, so any '%' in the configured command would be interpreted
// as a formatting verb. The UUID is also interpolated unquoted here,
// whereas Start single-quotes it. Consider formatting only the
// arguments and concatenating the command separately.
144 cmd := fmt.Sprintf(rr.runnerCmd+" --kill %d %s", sig, rr.uuid)
// NOTE(review): the body of this branch is not visible here;
// presumably mirrors the non-root handling in Start -- confirm.
145 if rr.remoteUser != "root" {
148 stdout, stderr, err := rr.executor.Execute(nil, cmd, nil)
// The unsuccessful attempt is logged at Info level, not Error.
150 logger.WithFields(logrus.Fields{
151 "stderr": string(stderr),
152 "stdout": string(stdout),
154 }).Info("kill attempt unsuccessful")
160 func (rr *remoteRunner) isClosed() bool {