47521213427610a531fa269df32b046b17ba72aa
[arvados.git] / lib / dispatchcloud / worker / runner.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "syscall"
12         "time"
13
14         "github.com/sirupsen/logrus"
15 )
16
17 // remoteRunner handles the starting and stopping of a crunch-run
18 // process on a remote machine.
19 type remoteRunner struct {
20         uuid          string
21         executor      Executor
22         envJSON       json.RawMessage
23         runnerCmd     string
24         remoteUser    string
25         timeoutTERM   time.Duration
26         timeoutSignal time.Duration
27         onUnkillable  func(uuid string) // callback invoked when giving up on SIGTERM
28         onKilled      func(uuid string) // callback invoked when process exits after SIGTERM
29         logger        logrus.FieldLogger
30
31         stopping bool          // true if Stop() has been called
32         givenup  bool          // true if timeoutTERM has been reached
33         closed   chan struct{} // channel is closed if Close() has been called
34 }
35
36 // newRemoteRunner returns a new remoteRunner. Caller should ensure
37 // Close() is called to release resources.
38 func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
39         // Send the instance type record as a JSON doc so crunch-run
40         // can log it.
41         var instJSON bytes.Buffer
42         enc := json.NewEncoder(&instJSON)
43         enc.SetIndent("", "    ")
44         if err := enc.Encode(wkr.instType); err != nil {
45                 panic(err)
46         }
47         env := map[string]string{
48                 "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
49                 "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
50                 "InstanceType":      instJSON.String(),
51         }
52         if wkr.wp.arvClient.Insecure {
53                 env["ARVADOS_API_HOST_INSECURE"] = "1"
54         }
55         envJSON, err := json.Marshal(env)
56         if err != nil {
57                 panic(err)
58         }
59         rr := &remoteRunner{
60                 uuid:          uuid,
61                 executor:      wkr.executor,
62                 envJSON:       envJSON,
63                 runnerCmd:     wkr.wp.runnerCmd,
64                 remoteUser:    wkr.instance.RemoteUser(),
65                 timeoutTERM:   wkr.wp.timeoutTERM,
66                 timeoutSignal: wkr.wp.timeoutSignal,
67                 onUnkillable:  wkr.onUnkillable,
68                 onKilled:      wkr.onKilled,
69                 logger:        wkr.logger.WithField("ContainerUUID", uuid),
70                 closed:        make(chan struct{}),
71         }
72         return rr
73 }
74
75 // Start a crunch-run process on the remote host.
76 //
77 // Start does not return any error encountered. The caller should
78 // assume the remote process _might_ have started, at least until it
79 // probes the worker and finds otherwise.
80 func (rr *remoteRunner) Start() {
81         cmd := rr.runnerCmd + " --detach --stdin-env '" + rr.uuid + "'"
82         if rr.remoteUser != "root" {
83                 cmd = "sudo " + cmd
84         }
85         stdin := bytes.NewBuffer(rr.envJSON)
86         stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
87         if err != nil {
88                 rr.logger.WithField("stdout", string(stdout)).
89                         WithField("stderr", string(stderr)).
90                         WithError(err).
91                         Error("error starting crunch-run process")
92                 return
93         }
94         rr.logger.Info("crunch-run process started")
95 }
96
97 // Close abandons the remote process (if any) and releases
98 // resources. Close must not be called more than once.
99 func (rr *remoteRunner) Close() {
100         close(rr.closed)
101 }
102
103 // Kill starts a background task to kill the remote process, first
104 // trying SIGTERM until reaching timeoutTERM, then calling
105 // onUnkillable().
106 //
107 // SIGKILL is not used. It would merely kill the crunch-run supervisor
108 // and thereby make the docker container, arv-mount, etc. invisible to
109 // us without actually stopping them.
110 //
111 // Once Kill has been called, calling it again has no effect.
112 func (rr *remoteRunner) Kill(reason string) {
113         if rr.stopping {
114                 return
115         }
116         rr.stopping = true
117         rr.logger.WithField("Reason", reason).Info("killing crunch-run process")
118         go func() {
119                 termDeadline := time.Now().Add(rr.timeoutTERM)
120                 t := time.NewTicker(rr.timeoutSignal)
121                 defer t.Stop()
122                 for range t.C {
123                         switch {
124                         case rr.isClosed():
125                                 return
126                         case time.Now().After(termDeadline):
127                                 rr.logger.Debug("giving up")
128                                 rr.givenup = true
129                                 rr.onUnkillable(rr.uuid)
130                                 return
131                         default:
132                                 rr.kill(syscall.SIGTERM)
133                         }
134                 }
135         }()
136 }
137
138 func (rr *remoteRunner) kill(sig syscall.Signal) {
139         logger := rr.logger.WithField("Signal", int(sig))
140         logger.Info("sending signal")
141         cmd := fmt.Sprintf(rr.runnerCmd+" --kill %d %s", sig, rr.uuid)
142         if rr.remoteUser != "root" {
143                 cmd = "sudo " + cmd
144         }
145         stdout, stderr, err := rr.executor.Execute(nil, cmd, nil)
146         if err != nil {
147                 logger.WithFields(logrus.Fields{
148                         "stderr": string(stderr),
149                         "stdout": string(stdout),
150                         "error":  err,
151                 }).Info("kill attempt unsuccessful")
152                 return
153         }
154         rr.onKilled(rr.uuid)
155 }
156
157 func (rr *remoteRunner) isClosed() bool {
158         select {
159         case <-rr.closed:
160                 return true
161         default:
162                 return false
163         }
164 }