14807: Lengthen kill delays in stub driver to test term-kill-drain.
[arvados.git] / lib / dispatchcloud / worker / runner.go
1 package worker
2
3 import (
4         "bytes"
5         "encoding/json"
6         "fmt"
7         "syscall"
8         "time"
9
10         "git.curoverse.com/arvados.git/sdk/go/arvados"
11         "github.com/sirupsen/logrus"
12 )
13
14 // remoteRunner handles the starting and stopping of a crunch-run
15 // process on a remote machine.
16 type remoteRunner struct {
17         uuid          string
18         executor      Executor
19         arvClient     *arvados.Client
20         remoteUser    string
21         timeoutTERM   time.Duration
22         timeoutKILL   time.Duration
23         timeoutSignal time.Duration
24         onUnkillable  func(uuid string) // callback invoked when giving up on SIGKILL
25         onKilled      func(uuid string) // callback invoked when process exits after SIGTERM/SIGKILL
26         logger        logrus.FieldLogger
27
28         stopping bool          // true if Stop() has been called
29         sentKILL bool          // true if SIGKILL has been sent
30         closed   chan struct{} // channel is closed if Close() has been called
31 }
32
33 // newRemoteRunner returns a new remoteRunner. Caller should ensure
34 // Close() is called to release resources.
35 func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
36         rr := &remoteRunner{
37                 uuid:          uuid,
38                 executor:      wkr.executor,
39                 arvClient:     wkr.wp.arvClient,
40                 remoteUser:    wkr.instance.RemoteUser(),
41                 timeoutTERM:   wkr.wp.timeoutTERM,
42                 timeoutKILL:   wkr.wp.timeoutKILL,
43                 timeoutSignal: wkr.wp.timeoutSignal,
44                 onUnkillable:  wkr.onUnkillable,
45                 onKilled:      wkr.onKilled,
46                 logger:        wkr.logger.WithField("ContainerUUID", uuid),
47                 closed:        make(chan struct{}),
48         }
49         return rr
50 }
51
52 // Start a crunch-run process on the remote host.
53 //
54 // Start does not return any error encountered. The caller should
55 // assume the remote process _might_ have started, at least until it
56 // probes the worker and finds otherwise.
57 func (rr *remoteRunner) Start() {
58         env := map[string]string{
59                 "ARVADOS_API_HOST":  rr.arvClient.APIHost,
60                 "ARVADOS_API_TOKEN": rr.arvClient.AuthToken,
61         }
62         if rr.arvClient.Insecure {
63                 env["ARVADOS_API_HOST_INSECURE"] = "1"
64         }
65         envJSON, err := json.Marshal(env)
66         if err != nil {
67                 panic(err)
68         }
69         stdin := bytes.NewBuffer(envJSON)
70         cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
71         if rr.remoteUser != "root" {
72                 cmd = "sudo " + cmd
73         }
74         stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
75         if err != nil {
76                 rr.logger.WithField("stdout", string(stdout)).
77                         WithField("stderr", string(stderr)).
78                         WithError(err).
79                         Error("error starting crunch-run process")
80                 return
81         }
82         rr.logger.Info("crunch-run process started")
83 }
84
85 // Close abandons the remote process (if any) and releases
86 // resources. Close must not be called more than once.
87 func (rr *remoteRunner) Close() {
88         close(rr.closed)
89 }
90
91 // Kill starts a background task to kill the remote process,
92 // escalating from SIGTERM to SIGKILL to onUnkillable() according to
93 // the configured timeouts.
94 //
95 // Once Kill has been called, calling it again has no effect.
96 func (rr *remoteRunner) Kill(reason string) {
97         if rr.stopping {
98                 return
99         }
100         rr.stopping = true
101         rr.logger.WithField("Reason", reason).Info("killing crunch-run process")
102         go func() {
103                 termDeadline := time.Now().Add(rr.timeoutTERM)
104                 killDeadline := termDeadline.Add(rr.timeoutKILL)
105                 t := time.NewTicker(rr.timeoutSignal)
106                 defer t.Stop()
107                 for range t.C {
108                         switch {
109                         case rr.isClosed():
110                                 return
111                         case time.Now().After(killDeadline):
112                                 rr.onUnkillable(rr.uuid)
113                                 return
114                         case time.Now().After(termDeadline):
115                                 rr.sentKILL = true
116                                 rr.kill(syscall.SIGKILL)
117                         default:
118                                 rr.kill(syscall.SIGTERM)
119                         }
120                 }
121         }()
122 }
123
124 func (rr *remoteRunner) kill(sig syscall.Signal) {
125         logger := rr.logger.WithField("Signal", int(sig))
126         logger.Info("sending signal")
127         cmd := fmt.Sprintf("crunch-run --kill %d %s", sig, rr.uuid)
128         if rr.remoteUser != "root" {
129                 cmd = "sudo " + cmd
130         }
131         stdout, stderr, err := rr.executor.Execute(nil, cmd, nil)
132         if err != nil {
133                 logger.WithFields(logrus.Fields{
134                         "stderr": string(stderr),
135                         "stdout": string(stdout),
136                         "error":  err,
137                 }).Info("kill failed")
138                 return
139         }
140         rr.onKilled(rr.uuid)
141 }
142
143 func (rr *remoteRunner) isClosed() bool {
144         select {
145         case <-rr.closed:
146                 return true
147         default:
148                 return false
149         }
150 }