17170: Add "arvados-client shell" subcommand and backend support.
[arvados.git] / lib / dispatchcloud / worker / runner.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "net"
12         "syscall"
13         "time"
14
15         "github.com/sirupsen/logrus"
16 )
17
18 // remoteRunner handles the starting and stopping of a crunch-run
19 // process on a remote machine.
20 type remoteRunner struct {
21         uuid          string
22         executor      Executor
23         envJSON       json.RawMessage
24         runnerCmd     string
25         remoteUser    string
26         timeoutTERM   time.Duration
27         timeoutSignal time.Duration
28         onUnkillable  func(uuid string) // callback invoked when giving up on SIGTERM
29         onKilled      func(uuid string) // callback invoked when process exits after SIGTERM
30         logger        logrus.FieldLogger
31
32         stopping bool          // true if Stop() has been called
33         givenup  bool          // true if timeoutTERM has been reached
34         closed   chan struct{} // channel is closed if Close() has been called
35 }
36
37 // newRemoteRunner returns a new remoteRunner. Caller should ensure
38 // Close() is called to release resources.
39 func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
40         // Send the instance type record as a JSON doc so crunch-run
41         // can log it.
42         var instJSON bytes.Buffer
43         enc := json.NewEncoder(&instJSON)
44         enc.SetIndent("", "    ")
45         if err := enc.Encode(wkr.instType); err != nil {
46                 panic(err)
47         }
48         env := map[string]string{
49                 "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
50                 "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
51                 "InstanceType":      instJSON.String(),
52                 "GatewayAddress":    net.JoinHostPort(wkr.instance.Address(), "0"),
53                 "GatewayAuthSecret": wkr.wp.gatewayAuthSecret(uuid),
54         }
55         if wkr.wp.arvClient.Insecure {
56                 env["ARVADOS_API_HOST_INSECURE"] = "1"
57         }
58         envJSON, err := json.Marshal(env)
59         if err != nil {
60                 panic(err)
61         }
62         rr := &remoteRunner{
63                 uuid:          uuid,
64                 executor:      wkr.executor,
65                 envJSON:       envJSON,
66                 runnerCmd:     wkr.wp.runnerCmd,
67                 remoteUser:    wkr.instance.RemoteUser(),
68                 timeoutTERM:   wkr.wp.timeoutTERM,
69                 timeoutSignal: wkr.wp.timeoutSignal,
70                 onUnkillable:  wkr.onUnkillable,
71                 onKilled:      wkr.onKilled,
72                 logger:        wkr.logger.WithField("ContainerUUID", uuid),
73                 closed:        make(chan struct{}),
74         }
75         return rr
76 }
77
78 // Start a crunch-run process on the remote host.
79 //
80 // Start does not return any error encountered. The caller should
81 // assume the remote process _might_ have started, at least until it
82 // probes the worker and finds otherwise.
83 func (rr *remoteRunner) Start() {
84         cmd := rr.runnerCmd + " --detach --stdin-env '" + rr.uuid + "'"
85         if rr.remoteUser != "root" {
86                 cmd = "sudo " + cmd
87         }
88         stdin := bytes.NewBuffer(rr.envJSON)
89         stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
90         if err != nil {
91                 rr.logger.WithField("stdout", string(stdout)).
92                         WithField("stderr", string(stderr)).
93                         WithError(err).
94                         Error("error starting crunch-run process")
95                 return
96         }
97         rr.logger.Info("crunch-run process started")
98 }
99
100 // Close abandons the remote process (if any) and releases
101 // resources. Close must not be called more than once.
102 func (rr *remoteRunner) Close() {
103         close(rr.closed)
104 }
105
106 // Kill starts a background task to kill the remote process, first
107 // trying SIGTERM until reaching timeoutTERM, then calling
108 // onUnkillable().
109 //
110 // SIGKILL is not used. It would merely kill the crunch-run supervisor
111 // and thereby make the docker container, arv-mount, etc. invisible to
112 // us without actually stopping them.
113 //
114 // Once Kill has been called, calling it again has no effect.
115 func (rr *remoteRunner) Kill(reason string) {
116         if rr.stopping {
117                 return
118         }
119         rr.stopping = true
120         rr.logger.WithField("Reason", reason).Info("killing crunch-run process")
121         go func() {
122                 termDeadline := time.Now().Add(rr.timeoutTERM)
123                 t := time.NewTicker(rr.timeoutSignal)
124                 defer t.Stop()
125                 for range t.C {
126                         switch {
127                         case rr.isClosed():
128                                 return
129                         case time.Now().After(termDeadline):
130                                 rr.logger.Debug("giving up")
131                                 rr.givenup = true
132                                 rr.onUnkillable(rr.uuid)
133                                 return
134                         default:
135                                 rr.kill(syscall.SIGTERM)
136                         }
137                 }
138         }()
139 }
140
141 func (rr *remoteRunner) kill(sig syscall.Signal) {
142         logger := rr.logger.WithField("Signal", int(sig))
143         logger.Info("sending signal")
144         cmd := fmt.Sprintf(rr.runnerCmd+" --kill %d %s", sig, rr.uuid)
145         if rr.remoteUser != "root" {
146                 cmd = "sudo " + cmd
147         }
148         stdout, stderr, err := rr.executor.Execute(nil, cmd, nil)
149         if err != nil {
150                 logger.WithFields(logrus.Fields{
151                         "stderr": string(stderr),
152                         "stdout": string(stdout),
153                         "error":  err,
154                 }).Info("kill attempt unsuccessful")
155                 return
156         }
157         rr.onKilled(rr.uuid)
158 }
159
160 func (rr *remoteRunner) isClosed() bool {
161         select {
162         case <-rr.closed:
163                 return true
164         default:
165                 return false
166         }
167 }