Merge branch '20235-probe-after-upgrade'
[arvados.git] / lib / dispatchcloud / worker / runner.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "net"
12         "strings"
13         "syscall"
14         "time"
15
16         "git.arvados.org/arvados.git/lib/crunchrun"
17         "github.com/sirupsen/logrus"
18 )
19
20 // remoteRunner handles the starting and stopping of a crunch-run
21 // process on a remote machine.
22 type remoteRunner struct {
23         uuid          string
24         executor      Executor
25         configJSON    json.RawMessage
26         runnerCmd     string
27         runnerArgs    []string
28         remoteUser    string
29         timeoutTERM   time.Duration
30         timeoutSignal time.Duration
31         onUnkillable  func(uuid string) // callback invoked when giving up on SIGTERM
32         onKilled      func(uuid string) // callback invoked when process exits after SIGTERM
33         logger        logrus.FieldLogger
34
35         stopping bool          // true if Stop() has been called
36         givenup  bool          // true if timeoutTERM has been reached
37         closed   chan struct{} // channel is closed if Close() has been called
38 }
39
40 // newRemoteRunner returns a new remoteRunner. Caller should ensure
41 // Close() is called to release resources.
42 func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
43         // Send the instance type record as a JSON doc so crunch-run
44         // can log it.
45         var instJSON bytes.Buffer
46         enc := json.NewEncoder(&instJSON)
47         enc.SetIndent("", "    ")
48         if err := enc.Encode(wkr.instType); err != nil {
49                 panic(err)
50         }
51         var configData crunchrun.ConfigData
52         configData.Env = map[string]string{
53                 "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
54                 "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
55                 "InstanceType":      instJSON.String(),
56                 "GatewayAddress":    net.JoinHostPort(wkr.instance.Address(), "0"),
57                 "GatewayAuthSecret": wkr.wp.gatewayAuthSecret(uuid),
58         }
59         if wkr.wp.arvClient.Insecure {
60                 configData.Env["ARVADOS_API_HOST_INSECURE"] = "1"
61         }
62         if bufs := wkr.wp.cluster.Containers.LocalKeepBlobBuffersPerVCPU; bufs > 0 {
63                 configData.Cluster = wkr.wp.cluster
64                 configData.KeepBuffers = bufs * wkr.instType.VCPUs
65         }
66         if wkr.wp.cluster.Containers.CloudVMs.Driver == "ec2" && wkr.instType.Preemptible {
67                 configData.EC2SpotCheck = true
68         }
69         configJSON, err := json.Marshal(configData)
70         if err != nil {
71                 panic(err)
72         }
73         rr := &remoteRunner{
74                 uuid:          uuid,
75                 executor:      wkr.executor,
76                 configJSON:    configJSON,
77                 runnerCmd:     wkr.wp.runnerCmd,
78                 runnerArgs:    wkr.wp.runnerArgs,
79                 remoteUser:    wkr.instance.RemoteUser(),
80                 timeoutTERM:   wkr.wp.timeoutTERM,
81                 timeoutSignal: wkr.wp.timeoutSignal,
82                 onUnkillable:  wkr.onUnkillable,
83                 onKilled:      wkr.onKilled,
84                 logger:        wkr.logger.WithField("ContainerUUID", uuid),
85                 closed:        make(chan struct{}),
86         }
87         return rr
88 }
89
90 // Start a crunch-run process on the remote host.
91 //
92 // Start does not return any error encountered. The caller should
93 // assume the remote process _might_ have started, at least until it
94 // probes the worker and finds otherwise.
95 func (rr *remoteRunner) Start() {
96         cmd := rr.runnerCmd + " --detach --stdin-config"
97         for _, arg := range rr.runnerArgs {
98                 cmd += " '" + strings.Replace(arg, "'", "'\\''", -1) + "'"
99         }
100         cmd += " '" + rr.uuid + "'"
101         if rr.remoteUser != "root" {
102                 cmd = "sudo " + cmd
103         }
104         stdin := bytes.NewBuffer(rr.configJSON)
105         stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
106         if err != nil {
107                 rr.logger.WithField("stdout", string(stdout)).
108                         WithField("stderr", string(stderr)).
109                         WithError(err).
110                         Error("error starting crunch-run process")
111                 return
112         }
113         rr.logger.Info("crunch-run process started")
114 }
115
116 // Close abandons the remote process (if any) and releases
117 // resources. Close must not be called more than once.
118 func (rr *remoteRunner) Close() {
119         close(rr.closed)
120 }
121
122 // Kill starts a background task to kill the remote process, first
123 // trying SIGTERM until reaching timeoutTERM, then calling
124 // onUnkillable().
125 //
126 // SIGKILL is not used. It would merely kill the crunch-run supervisor
127 // and thereby make the docker container, arv-mount, etc. invisible to
128 // us without actually stopping them.
129 //
130 // Once Kill has been called, calling it again has no effect.
131 func (rr *remoteRunner) Kill(reason string) {
132         if rr.stopping {
133                 return
134         }
135         rr.stopping = true
136         rr.logger.WithField("Reason", reason).Info("killing crunch-run process")
137         go func() {
138                 termDeadline := time.Now().Add(rr.timeoutTERM)
139                 t := time.NewTicker(rr.timeoutSignal)
140                 defer t.Stop()
141                 for range t.C {
142                         switch {
143                         case rr.isClosed():
144                                 return
145                         case time.Now().After(termDeadline):
146                                 rr.logger.Debug("giving up")
147                                 rr.givenup = true
148                                 rr.onUnkillable(rr.uuid)
149                                 return
150                         default:
151                                 rr.kill(syscall.SIGTERM)
152                         }
153                 }
154         }()
155 }
156
157 func (rr *remoteRunner) kill(sig syscall.Signal) {
158         logger := rr.logger.WithField("Signal", int(sig))
159         logger.Info("sending signal")
160         cmd := fmt.Sprintf(rr.runnerCmd+" --kill %d %s", sig, rr.uuid)
161         if rr.remoteUser != "root" {
162                 cmd = "sudo " + cmd
163         }
164         stdout, stderr, err := rr.executor.Execute(nil, cmd, nil)
165         if err != nil {
166                 logger.WithFields(logrus.Fields{
167                         "stderr": string(stderr),
168                         "stdout": string(stdout),
169                         "error":  err,
170                 }).Info("kill attempt unsuccessful")
171                 return
172         }
173         rr.onKilled(rr.uuid)
174 }
175
176 func (rr *remoteRunner) isClosed() bool {
177         select {
178         case <-rr.closed:
179                 return true
180         default:
181                 return false
182         }
183 }