1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
16 "git.arvados.org/arvados.git/lib/crunchrun"
17 "github.com/sirupsen/logrus"
20 // remoteRunner handles the starting and stopping of a crunch-run
21 // process on a remote machine.
22 type remoteRunner struct {
25 configJSON json.RawMessage // config document; Start() feeds it to the remote command's stdin
29 timeoutTERM time.Duration // how long Kill() keeps trying SIGTERM before giving up
30 timeoutSignal time.Duration // period of the ticker created in Kill() (presumably the interval between signal attempts)
31 onUnkillable func(uuid string) // callback invoked when giving up on SIGTERM
32 onKilled func(uuid string) // callback invoked when process exits after SIGTERM
33 logger logrus.FieldLogger // newRemoteRunner attaches a ContainerUUID field to it
35 stopping bool // true if Stop() has been called — NOTE(review): no Stop() method is visible here; this probably means Kill()
36 givenup bool // true if timeoutTERM has been reached
37 closed chan struct{} // channel is closed if Close() has been called
40 // newRemoteRunner returns a new remoteRunner. Caller should ensure
41 // Close() is called to release resources.
42 func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
43 // Send the instance type record as a JSON doc so crunch-run
45 var instJSON bytes.Buffer
46 enc := json.NewEncoder(&instJSON)
47 enc.SetIndent("", " ") // indented, multi-line JSON output
48 if err := enc.Encode(wkr.instType); err != nil {
51 var configData crunchrun.ConfigData
52 configData.Env = map[string]string{
53 "ARVADOS_API_HOST": wkr.wp.arvClient.APIHost,
54 "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
55 "InstanceType": instJSON.String(), // the JSON doc encoded above
56 "GatewayAddress": net.JoinHostPort(wkr.instance.Address(), "0"), // instance address with literal port "0" — NOTE(review): presumably means "choose any port"; confirm against crunch-run
57 "GatewayAuthSecret": wkr.wp.gatewayAuthSecret(uuid), // obtained from the worker pool for this uuid
59 if wkr.wp.arvClient.Insecure { // propagate the API client's Insecure flag through the environment
60 configData.Env["ARVADOS_API_HOST_INSECURE"] = "1"
62 if bufs := wkr.wp.cluster.Containers.LocalKeepBlobBuffersPerVCPU; bufs > 0 { // Cluster/KeepBuffers are only filled in when per-VCPU buffers are configured
63 configData.Cluster = wkr.wp.cluster
64 configData.KeepBuffers = bufs * wkr.instType.VCPUs // total = buffers per VCPU x VCPUs of this instance type
66 if wkr.wp.cluster.Containers.CloudVMs.Driver == "ec2" && wkr.instType.Preemptible { // only for preemptible instances under the ec2 driver
67 configData.EC2SpotCheck = true
69 configJSON, err := json.Marshal(configData) // serialized once here and stored in the runner's configJSON field
75 executor: wkr.executor,
76 configJSON: configJSON,
77 runnerCmd: wkr.wp.runnerCmd,
78 runnerArgs: wkr.wp.runnerArgs,
79 remoteUser: wkr.instance.RemoteUser(),
80 timeoutTERM: wkr.wp.timeoutTERM,
81 timeoutSignal: wkr.wp.timeoutSignal,
82 onUnkillable: wkr.onUnkillable,
83 onKilled: wkr.onKilled,
84 logger: wkr.logger.WithField("ContainerUUID", uuid), // every log entry from this runner carries the container UUID
85 closed: make(chan struct{}),
90 // Start a crunch-run process on the remote host.
92 // Start does not return any error encountered. The caller should
93 // assume the remote process _might_ have started, at least until it
94 // probes the worker and finds otherwise.
95 func (rr *remoteRunner) Start() {
96 cmd := rr.runnerCmd + " --detach --stdin-config" // the config document is supplied on stdin (rr.configJSON, below)
97 for _, arg := range rr.runnerArgs {
98 cmd += " '" + strings.Replace(arg, "'", "'\\''", -1) + "'" // single-quote each arg for the shell; an embedded ' becomes '\''
100 cmd += " '" + rr.uuid + "'" // uuid is quoted but, unlike runnerArgs, not escaped
101 if rr.remoteUser != "root" {
104 stdin := bytes.NewBuffer(rr.configJSON)
105 stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
107 rr.logger.WithField("stdout", string(stdout)).
108 WithField("stderr", string(stderr)).
110 Error("error starting crunch-run process") // the remote command's output is attached to the log entry
113 rr.logger.Info("crunch-run process started")
116 // Close abandons the remote process (if any) and releases
117 // resources. Close must not be called more than once.
118 func (rr *remoteRunner) Close() { // NOTE(review): body not visible here; presumably closes rr.closed, which would panic on a second call — confirm
122 // Kill starts a background task to kill the remote process, first
123 // trying SIGTERM until reaching timeoutTERM, then calling
126 // SIGKILL is not used. It would merely kill the crunch-run supervisor
127 // and thereby make the docker container, arv-mount, etc. invisible to
128 // us without actually stopping them.
130 // Once Kill has been called, calling it again has no effect.
131 func (rr *remoteRunner) Kill(reason string) {
136 rr.logger.WithField("Reason", reason).Info("killing crunch-run process")
138 termDeadline := time.Now().Add(rr.timeoutTERM) // after this instant we stop trying
139 t := time.NewTicker(rr.timeoutSignal) // NOTE(review): presumably paces the SIGTERM attempts; confirm t.Stop() is called on every exit path
145 case time.Now().After(termDeadline):
146 rr.logger.Debug("giving up")
148 rr.onUnkillable(rr.uuid) // deadline passed: report this container's process as unkillable
151 rr.kill(syscall.SIGTERM) // only SIGTERM is ever sent (see note on SIGKILL above)
// kill runs "<runnerCmd> --kill <signal number> <uuid>" through
// rr.executor and logs the attempt.
157 func (rr *remoteRunner) kill(sig syscall.Signal) {
158 logger := rr.logger.WithField("Signal", int(sig))
159 logger.Info("sending signal")
160 cmd := fmt.Sprintf(rr.runnerCmd+" --kill %d %s", sig, rr.uuid) // NOTE(review): runnerCmd is part of the format string, so a '%' in it would be read as a verb; uuid is also unquoted here, unlike in Start()
161 if rr.remoteUser != "root" {
164 stdout, stderr, err := rr.executor.Execute(nil, cmd, nil) // no stdin is needed for --kill
166 logger.WithFields(logrus.Fields{
167 "stderr": string(stderr),
168 "stdout": string(stdout),
170 }).Info("kill attempt unsuccessful") // logged at Info level, not Error
176 func (rr *remoteRunner) isClosed() bool {