1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
14 "github.com/sirupsen/logrus"
17 // remoteRunner handles the starting and stopping of a crunch-run
18 // process on a remote machine.
//
// Start launches the process, Kill keeps sending SIGTERM in the
// background until timeoutTERM has elapsed, and Close releases
// resources.
19 type remoteRunner struct {
22 envJSON json.RawMessage // JSON-encoded environment map; given to the crunch-run command as stdin by Start
24 timeoutTERM time.Duration // how long Kill keeps trying SIGTERM before giving up
25 timeoutSignal time.Duration // ticker interval used by Kill
26 onUnkillable func(uuid string) // callback invoked when giving up on SIGTERM
27 onKilled func(uuid string) // callback invoked when process exits after SIGTERM
28 logger logrus.FieldLogger // logger carrying the ContainerUUID field
// NOTE(review): "Stop()" below does not match any method visible in
// this part of the file; presumably it refers to Kill() -- confirm.
30 stopping bool // true if Stop() has been called
31 givenup bool // true if timeoutTERM has been reached
32 closed chan struct{} // channel is closed if Close() has been called
35 // newRemoteRunner returns a new remoteRunner. Caller should ensure
36 // Close() is called to release resources.
//
// The environment for the remote process (API host, API token,
// instance type, and the insecure flag when applicable) is built
// from wkr and serialized to JSON here.
37 func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
38 // Early (<1.5) versions of crunch-run error out if they see
39 // non-string values in the env map -- so here we send the
40 // instance type record as a JSON doc. Once worker images are
41 // updated, we can skip the extra encoding, and just include
42 // {"InstanceType": wkr.instType} in the env map.
43 var instJSON bytes.Buffer
44 enc := json.NewEncoder(&instJSON)
45 enc.SetIndent("", " ") // indent nested levels with a single space
46 if err := enc.Encode(wkr.instType); err != nil {
// Every value is a string; the instance type is embedded as the
// JSON document encoded above.
49 env := map[string]string{
50 "ARVADOS_API_HOST": wkr.wp.arvClient.APIHost,
51 "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
52 "InstanceType": instJSON.String(),
// The insecure flag is only added when the API client has Insecure set.
54 if wkr.wp.arvClient.Insecure {
55 env["ARVADOS_API_HOST_INSECURE"] = "1"
57 envJSON, err := json.Marshal(env)
63 executor: wkr.executor,
65 remoteUser: wkr.instance.RemoteUser(),
66 timeoutTERM: wkr.wp.timeoutTERM,
67 timeoutSignal: wkr.wp.timeoutSignal,
68 onUnkillable: wkr.onUnkillable,
69 onKilled: wkr.onKilled,
70 logger: wkr.logger.WithField("ContainerUUID", uuid), // tag every log entry with the container UUID
71 closed: make(chan struct{}),
76 // Start a crunch-run process on the remote host.
//
78 // Start does not return any error encountered. The caller should
79 // assume the remote process _might_ have started, at least until it
80 // probes the worker and finds otherwise.
81 func (rr *remoteRunner) Start() {
// The container UUID is passed as a single-quoted argument.
82 cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
83 if rr.remoteUser != "root" {
// The pre-encoded environment JSON becomes the command's stdin.
86 stdin := bytes.NewBuffer(rr.envJSON)
87 stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
// Captured stdout and stderr are attached to the error log entry.
89 rr.logger.WithField("stdout", string(stdout)).
90 WithField("stderr", string(stderr)).
92 Error("error starting crunch-run process")
95 rr.logger.Info("crunch-run process started")
98 // Close abandons the remote process (if any) and releases
99 // resources. Close must not be called more than once.
//
// NOTE(review): the once-only rule is presumably because Close closes
// the rr.closed channel, and closing a channel twice panics -- confirm
// against the method body.
100 func (rr *remoteRunner) Close() {
104 // Kill starts a background task to kill the remote process, first
105 // trying SIGTERM until reaching timeoutTERM, then giving up and calling onUnkillable.
//
108 // SIGKILL is not used. It would merely kill the crunch-run supervisor
109 // and thereby make the docker container, arv-mount, etc. invisible to
110 // us without actually stopping them.
//
112 // Once Kill has been called, calling it again has no effect.
//
// reason is recorded in the "Reason" field of the log entry.
113 func (rr *remoteRunner) Kill(reason string) {
118 rr.logger.WithField("Reason", reason).Info("killing crunch-run process")
// The deadline is fixed once, relative to the time Kill was called.
120 termDeadline := time.Now().Add(rr.timeoutTERM)
121 t := time.NewTicker(rr.timeoutSignal)
// Deadline passed: report the process as unkillable.
127 case time.Now().After(termDeadline):
128 rr.logger.Debug("giving up")
130 rr.onUnkillable(rr.uuid)
// Otherwise make another SIGTERM attempt.
133 rr.kill(syscall.SIGTERM)
// kill makes one attempt to deliver sig to the remote crunch-run
// process by running "crunch-run --kill" through the executor.
139 func (rr *remoteRunner) kill(sig syscall.Signal) {
140 logger := rr.logger.WithField("Signal", int(sig))
141 logger.Info("sending signal")
// The signal is passed by number (%d), followed by the container UUID.
142 cmd := fmt.Sprintf("crunch-run --kill %d %s", sig, rr.uuid)
143 if rr.remoteUser != "root" {
146 stdout, stderr, err := rr.executor.Execute(nil, cmd, nil)
// An unsuccessful attempt is logged at Info level, with the
// command's captured output attached.
148 logger.WithFields(logrus.Fields{
149 "stderr": string(stderr),
150 "stdout": string(stdout),
152 }).Info("kill attempt unsuccessful")
158 func (rr *remoteRunner) isClosed() bool {