15734: Reword crunch-run compatibility comment.
[arvados.git] / lib / dispatchcloud / worker / runner.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "syscall"
12         "time"
13
14         "github.com/sirupsen/logrus"
15 )
16
17 // remoteRunner handles the starting and stopping of a crunch-run
18 // process on a remote machine.
19 type remoteRunner struct {
20         uuid          string
21         executor      Executor
22         envJSON       json.RawMessage
23         remoteUser    string
24         timeoutTERM   time.Duration
25         timeoutSignal time.Duration
26         onUnkillable  func(uuid string) // callback invoked when giving up on SIGTERM
27         onKilled      func(uuid string) // callback invoked when process exits after SIGTERM
28         logger        logrus.FieldLogger
29
30         stopping bool          // true if Stop() has been called
31         givenup  bool          // true if timeoutTERM has been reached
32         closed   chan struct{} // channel is closed if Close() has been called
33 }
34
35 // newRemoteRunner returns a new remoteRunner. Caller should ensure
36 // Close() is called to release resources.
37 func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
38         // Early (<1.5) versions of crunch-run error out if they see
39         // non-string values in the env map -- so here we send the
40         // instance type record as a JSON doc. Once worker images are
41         // updated, we can skip the extra encoding, and just include
42         // {"InstanceType": wkr.instType} in the env map.
43         var instJSON bytes.Buffer
44         enc := json.NewEncoder(&instJSON)
45         enc.SetIndent("", "    ")
46         if err := enc.Encode(wkr.instType); err != nil {
47                 panic(err)
48         }
49         env := map[string]string{
50                 "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
51                 "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
52                 "InstanceType":      instJSON.String(),
53         }
54         if wkr.wp.arvClient.Insecure {
55                 env["ARVADOS_API_HOST_INSECURE"] = "1"
56         }
57         envJSON, err := json.Marshal(env)
58         if err != nil {
59                 panic(err)
60         }
61         rr := &remoteRunner{
62                 uuid:          uuid,
63                 executor:      wkr.executor,
64                 envJSON:       envJSON,
65                 remoteUser:    wkr.instance.RemoteUser(),
66                 timeoutTERM:   wkr.wp.timeoutTERM,
67                 timeoutSignal: wkr.wp.timeoutSignal,
68                 onUnkillable:  wkr.onUnkillable,
69                 onKilled:      wkr.onKilled,
70                 logger:        wkr.logger.WithField("ContainerUUID", uuid),
71                 closed:        make(chan struct{}),
72         }
73         return rr
74 }
75
76 // Start a crunch-run process on the remote host.
77 //
78 // Start does not return any error encountered. The caller should
79 // assume the remote process _might_ have started, at least until it
80 // probes the worker and finds otherwise.
81 func (rr *remoteRunner) Start() {
82         cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
83         if rr.remoteUser != "root" {
84                 cmd = "sudo " + cmd
85         }
86         stdin := bytes.NewBuffer(rr.envJSON)
87         stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
88         if err != nil {
89                 rr.logger.WithField("stdout", string(stdout)).
90                         WithField("stderr", string(stderr)).
91                         WithError(err).
92                         Error("error starting crunch-run process")
93                 return
94         }
95         rr.logger.Info("crunch-run process started")
96 }
97
98 // Close abandons the remote process (if any) and releases
99 // resources. Close must not be called more than once.
100 func (rr *remoteRunner) Close() {
101         close(rr.closed)
102 }
103
104 // Kill starts a background task to kill the remote process, first
105 // trying SIGTERM until reaching timeoutTERM, then calling
106 // onUnkillable().
107 //
108 // SIGKILL is not used. It would merely kill the crunch-run supervisor
109 // and thereby make the docker container, arv-mount, etc. invisible to
110 // us without actually stopping them.
111 //
112 // Once Kill has been called, calling it again has no effect.
113 func (rr *remoteRunner) Kill(reason string) {
114         if rr.stopping {
115                 return
116         }
117         rr.stopping = true
118         rr.logger.WithField("Reason", reason).Info("killing crunch-run process")
119         go func() {
120                 termDeadline := time.Now().Add(rr.timeoutTERM)
121                 t := time.NewTicker(rr.timeoutSignal)
122                 defer t.Stop()
123                 for range t.C {
124                         switch {
125                         case rr.isClosed():
126                                 return
127                         case time.Now().After(termDeadline):
128                                 rr.logger.Debug("giving up")
129                                 rr.givenup = true
130                                 rr.onUnkillable(rr.uuid)
131                                 return
132                         default:
133                                 rr.kill(syscall.SIGTERM)
134                         }
135                 }
136         }()
137 }
138
139 func (rr *remoteRunner) kill(sig syscall.Signal) {
140         logger := rr.logger.WithField("Signal", int(sig))
141         logger.Info("sending signal")
142         cmd := fmt.Sprintf("crunch-run --kill %d %s", sig, rr.uuid)
143         if rr.remoteUser != "root" {
144                 cmd = "sudo " + cmd
145         }
146         stdout, stderr, err := rr.executor.Execute(nil, cmd, nil)
147         if err != nil {
148                 logger.WithFields(logrus.Fields{
149                         "stderr": string(stderr),
150                         "stdout": string(stdout),
151                         "error":  err,
152                 }).Info("kill attempt unsuccessful")
153                 return
154         }
155         rr.onKilled(rr.uuid)
156 }
157
158 func (rr *remoteRunner) isClosed() bool {
159         select {
160         case <-rr.closed:
161                 return true
162         default:
163                 return false
164         }
165 }