63561874c9c5e570187048922addbbc4e4ece502
[arvados.git] / lib / dispatchcloud / worker / runner.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "net"
12         "strings"
13         "syscall"
14         "time"
15
16         "github.com/sirupsen/logrus"
17 )
18
19 // remoteRunner handles the starting and stopping of a crunch-run
20 // process on a remote machine.
21 type remoteRunner struct {
22         uuid          string
23         executor      Executor
24         envJSON       json.RawMessage
25         runnerCmd     string
26         runnerArgs    []string
27         remoteUser    string
28         timeoutTERM   time.Duration
29         timeoutSignal time.Duration
30         onUnkillable  func(uuid string) // callback invoked when giving up on SIGTERM
31         onKilled      func(uuid string) // callback invoked when process exits after SIGTERM
32         logger        logrus.FieldLogger
33
34         stopping bool          // true if Stop() has been called
35         givenup  bool          // true if timeoutTERM has been reached
36         closed   chan struct{} // channel is closed if Close() has been called
37 }
38
39 // newRemoteRunner returns a new remoteRunner. Caller should ensure
40 // Close() is called to release resources.
41 func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
42         // Send the instance type record as a JSON doc so crunch-run
43         // can log it.
44         var instJSON bytes.Buffer
45         enc := json.NewEncoder(&instJSON)
46         enc.SetIndent("", "    ")
47         if err := enc.Encode(wkr.instType); err != nil {
48                 panic(err)
49         }
50         env := map[string]string{
51                 "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
52                 "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
53                 "InstanceType":      instJSON.String(),
54                 "GatewayAddress":    net.JoinHostPort(wkr.instance.Address(), "0"),
55                 "GatewayAuthSecret": wkr.wp.gatewayAuthSecret(uuid),
56         }
57         if wkr.wp.arvClient.Insecure {
58                 env["ARVADOS_API_HOST_INSECURE"] = "1"
59         }
60         envJSON, err := json.Marshal(env)
61         if err != nil {
62                 panic(err)
63         }
64         rr := &remoteRunner{
65                 uuid:          uuid,
66                 executor:      wkr.executor,
67                 envJSON:       envJSON,
68                 runnerCmd:     wkr.wp.runnerCmd,
69                 runnerArgs:    wkr.wp.runnerArgs,
70                 remoteUser:    wkr.instance.RemoteUser(),
71                 timeoutTERM:   wkr.wp.timeoutTERM,
72                 timeoutSignal: wkr.wp.timeoutSignal,
73                 onUnkillable:  wkr.onUnkillable,
74                 onKilled:      wkr.onKilled,
75                 logger:        wkr.logger.WithField("ContainerUUID", uuid),
76                 closed:        make(chan struct{}),
77         }
78         return rr
79 }
80
81 // Start a crunch-run process on the remote host.
82 //
83 // Start does not return any error encountered. The caller should
84 // assume the remote process _might_ have started, at least until it
85 // probes the worker and finds otherwise.
86 func (rr *remoteRunner) Start() {
87         cmd := rr.runnerCmd + " --detach --stdin-env"
88         for _, arg := range rr.runnerArgs {
89                 cmd += " '" + strings.Replace(arg, "'", "'\\''", -1) + "'"
90         }
91         cmd += " '" + rr.uuid + "'"
92         if rr.remoteUser != "root" {
93                 cmd = "sudo " + cmd
94         }
95         stdin := bytes.NewBuffer(rr.envJSON)
96         stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
97         if err != nil {
98                 rr.logger.WithField("stdout", string(stdout)).
99                         WithField("stderr", string(stderr)).
100                         WithError(err).
101                         Error("error starting crunch-run process")
102                 return
103         }
104         rr.logger.Info("crunch-run process started")
105 }
106
107 // Close abandons the remote process (if any) and releases
108 // resources. Close must not be called more than once.
109 func (rr *remoteRunner) Close() {
110         close(rr.closed)
111 }
112
113 // Kill starts a background task to kill the remote process, first
114 // trying SIGTERM until reaching timeoutTERM, then calling
115 // onUnkillable().
116 //
117 // SIGKILL is not used. It would merely kill the crunch-run supervisor
118 // and thereby make the docker container, arv-mount, etc. invisible to
119 // us without actually stopping them.
120 //
121 // Once Kill has been called, calling it again has no effect.
122 func (rr *remoteRunner) Kill(reason string) {
123         if rr.stopping {
124                 return
125         }
126         rr.stopping = true
127         rr.logger.WithField("Reason", reason).Info("killing crunch-run process")
128         go func() {
129                 termDeadline := time.Now().Add(rr.timeoutTERM)
130                 t := time.NewTicker(rr.timeoutSignal)
131                 defer t.Stop()
132                 for range t.C {
133                         switch {
134                         case rr.isClosed():
135                                 return
136                         case time.Now().After(termDeadline):
137                                 rr.logger.Debug("giving up")
138                                 rr.givenup = true
139                                 rr.onUnkillable(rr.uuid)
140                                 return
141                         default:
142                                 rr.kill(syscall.SIGTERM)
143                         }
144                 }
145         }()
146 }
147
148 func (rr *remoteRunner) kill(sig syscall.Signal) {
149         logger := rr.logger.WithField("Signal", int(sig))
150         logger.Info("sending signal")
151         cmd := fmt.Sprintf(rr.runnerCmd+" --kill %d %s", sig, rr.uuid)
152         if rr.remoteUser != "root" {
153                 cmd = "sudo " + cmd
154         }
155         stdout, stderr, err := rr.executor.Execute(nil, cmd, nil)
156         if err != nil {
157                 logger.WithFields(logrus.Fields{
158                         "stderr": string(stderr),
159                         "stdout": string(stdout),
160                         "error":  err,
161                 }).Info("kill attempt unsuccessful")
162                 return
163         }
164         rr.onKilled(rr.uuid)
165 }
166
167 func (rr *remoteRunner) isClosed() bool {
168         select {
169         case <-rr.closed:
170                 return true
171         default:
172                 return false
173         }
174 }