Bump loofah from 2.2.3 to 2.3.1 in /apps/workbench
[arvados.git] / lib / dispatchcloud / worker / runner.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "syscall"
12         "time"
13
14         "github.com/sirupsen/logrus"
15 )
16
17 // remoteRunner handles the starting and stopping of a crunch-run
18 // process on a remote machine.
19 type remoteRunner struct {
20         uuid          string
21         executor      Executor
22         envJSON       json.RawMessage
23         remoteUser    string
24         timeoutTERM   time.Duration
25         timeoutSignal time.Duration
26         onUnkillable  func(uuid string) // callback invoked when giving up on SIGTERM
27         onKilled      func(uuid string) // callback invoked when process exits after SIGTERM
28         logger        logrus.FieldLogger
29
30         stopping bool          // true if Stop() has been called
31         givenup  bool          // true if timeoutTERM has been reached
32         closed   chan struct{} // channel is closed if Close() has been called
33 }
34
35 // newRemoteRunner returns a new remoteRunner. Caller should ensure
36 // Close() is called to release resources.
37 func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
38         // Send the instance type record as a JSON doc so crunch-run
39         // can log it.
40         var instJSON bytes.Buffer
41         enc := json.NewEncoder(&instJSON)
42         enc.SetIndent("", "    ")
43         if err := enc.Encode(wkr.instType); err != nil {
44                 panic(err)
45         }
46         env := map[string]string{
47                 "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
48                 "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
49                 "InstanceType":      instJSON.String(),
50         }
51         if wkr.wp.arvClient.Insecure {
52                 env["ARVADOS_API_HOST_INSECURE"] = "1"
53         }
54         envJSON, err := json.Marshal(env)
55         if err != nil {
56                 panic(err)
57         }
58         rr := &remoteRunner{
59                 uuid:          uuid,
60                 executor:      wkr.executor,
61                 envJSON:       envJSON,
62                 remoteUser:    wkr.instance.RemoteUser(),
63                 timeoutTERM:   wkr.wp.timeoutTERM,
64                 timeoutSignal: wkr.wp.timeoutSignal,
65                 onUnkillable:  wkr.onUnkillable,
66                 onKilled:      wkr.onKilled,
67                 logger:        wkr.logger.WithField("ContainerUUID", uuid),
68                 closed:        make(chan struct{}),
69         }
70         return rr
71 }
72
73 // Start a crunch-run process on the remote host.
74 //
75 // Start does not return any error encountered. The caller should
76 // assume the remote process _might_ have started, at least until it
77 // probes the worker and finds otherwise.
78 func (rr *remoteRunner) Start() {
79         cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
80         if rr.remoteUser != "root" {
81                 cmd = "sudo " + cmd
82         }
83         stdin := bytes.NewBuffer(rr.envJSON)
84         stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
85         if err != nil {
86                 rr.logger.WithField("stdout", string(stdout)).
87                         WithField("stderr", string(stderr)).
88                         WithError(err).
89                         Error("error starting crunch-run process")
90                 return
91         }
92         rr.logger.Info("crunch-run process started")
93 }
94
95 // Close abandons the remote process (if any) and releases
96 // resources. Close must not be called more than once.
97 func (rr *remoteRunner) Close() {
98         close(rr.closed)
99 }
100
101 // Kill starts a background task to kill the remote process, first
102 // trying SIGTERM until reaching timeoutTERM, then calling
103 // onUnkillable().
104 //
105 // SIGKILL is not used. It would merely kill the crunch-run supervisor
106 // and thereby make the docker container, arv-mount, etc. invisible to
107 // us without actually stopping them.
108 //
109 // Once Kill has been called, calling it again has no effect.
110 func (rr *remoteRunner) Kill(reason string) {
111         if rr.stopping {
112                 return
113         }
114         rr.stopping = true
115         rr.logger.WithField("Reason", reason).Info("killing crunch-run process")
116         go func() {
117                 termDeadline := time.Now().Add(rr.timeoutTERM)
118                 t := time.NewTicker(rr.timeoutSignal)
119                 defer t.Stop()
120                 for range t.C {
121                         switch {
122                         case rr.isClosed():
123                                 return
124                         case time.Now().After(termDeadline):
125                                 rr.logger.Debug("giving up")
126                                 rr.givenup = true
127                                 rr.onUnkillable(rr.uuid)
128                                 return
129                         default:
130                                 rr.kill(syscall.SIGTERM)
131                         }
132                 }
133         }()
134 }
135
136 func (rr *remoteRunner) kill(sig syscall.Signal) {
137         logger := rr.logger.WithField("Signal", int(sig))
138         logger.Info("sending signal")
139         cmd := fmt.Sprintf("crunch-run --kill %d %s", sig, rr.uuid)
140         if rr.remoteUser != "root" {
141                 cmd = "sudo " + cmd
142         }
143         stdout, stderr, err := rr.executor.Execute(nil, cmd, nil)
144         if err != nil {
145                 logger.WithFields(logrus.Fields{
146                         "stderr": string(stderr),
147                         "stdout": string(stdout),
148                         "error":  err,
149                 }).Info("kill attempt unsuccessful")
150                 return
151         }
152         rr.onKilled(rr.uuid)
153 }
154
155 func (rr *remoteRunner) isClosed() bool {
156         select {
157         case <-rr.closed:
158                 return true
159         default:
160                 return false
161         }
162 }