Merge branch 'master' into 14669-java-sdk-v2
[arvados.git] / lib / dispatchcloud / worker / runner.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "bytes"
9         "encoding/json"
10         "fmt"
11         "syscall"
12         "time"
13
14         "git.curoverse.com/arvados.git/sdk/go/arvados"
15         "github.com/sirupsen/logrus"
16 )
17
18 // remoteRunner handles the starting and stopping of a crunch-run
19 // process on a remote machine.
20 type remoteRunner struct {
21         uuid          string
22         executor      Executor
23         arvClient     *arvados.Client
24         remoteUser    string
25         timeoutTERM   time.Duration
26         timeoutSignal time.Duration
27         onUnkillable  func(uuid string) // callback invoked when giving up on SIGTERM
28         onKilled      func(uuid string) // callback invoked when process exits after SIGTERM
29         logger        logrus.FieldLogger
30
31         stopping bool          // true if Stop() has been called
32         givenup  bool          // true if timeoutTERM has been reached
33         closed   chan struct{} // channel is closed if Close() has been called
34 }
35
36 // newRemoteRunner returns a new remoteRunner. Caller should ensure
37 // Close() is called to release resources.
38 func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
39         rr := &remoteRunner{
40                 uuid:          uuid,
41                 executor:      wkr.executor,
42                 arvClient:     wkr.wp.arvClient,
43                 remoteUser:    wkr.instance.RemoteUser(),
44                 timeoutTERM:   wkr.wp.timeoutTERM,
45                 timeoutSignal: wkr.wp.timeoutSignal,
46                 onUnkillable:  wkr.onUnkillable,
47                 onKilled:      wkr.onKilled,
48                 logger:        wkr.logger.WithField("ContainerUUID", uuid),
49                 closed:        make(chan struct{}),
50         }
51         return rr
52 }
53
54 // Start a crunch-run process on the remote host.
55 //
56 // Start does not return any error encountered. The caller should
57 // assume the remote process _might_ have started, at least until it
58 // probes the worker and finds otherwise.
59 func (rr *remoteRunner) Start() {
60         env := map[string]string{
61                 "ARVADOS_API_HOST":  rr.arvClient.APIHost,
62                 "ARVADOS_API_TOKEN": rr.arvClient.AuthToken,
63         }
64         if rr.arvClient.Insecure {
65                 env["ARVADOS_API_HOST_INSECURE"] = "1"
66         }
67         envJSON, err := json.Marshal(env)
68         if err != nil {
69                 panic(err)
70         }
71         stdin := bytes.NewBuffer(envJSON)
72         cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
73         if rr.remoteUser != "root" {
74                 cmd = "sudo " + cmd
75         }
76         stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
77         if err != nil {
78                 rr.logger.WithField("stdout", string(stdout)).
79                         WithField("stderr", string(stderr)).
80                         WithError(err).
81                         Error("error starting crunch-run process")
82                 return
83         }
84         rr.logger.Info("crunch-run process started")
85 }
86
87 // Close abandons the remote process (if any) and releases
88 // resources. Close must not be called more than once.
89 func (rr *remoteRunner) Close() {
90         close(rr.closed)
91 }
92
93 // Kill starts a background task to kill the remote process, first
94 // trying SIGTERM until reaching timeoutTERM, then calling
95 // onUnkillable().
96 //
97 // SIGKILL is not used. It would merely kill the crunch-run supervisor
98 // and thereby make the docker container, arv-mount, etc. invisible to
99 // us without actually stopping them.
100 //
101 // Once Kill has been called, calling it again has no effect.
102 func (rr *remoteRunner) Kill(reason string) {
103         if rr.stopping {
104                 return
105         }
106         rr.stopping = true
107         rr.logger.WithField("Reason", reason).Info("killing crunch-run process")
108         go func() {
109                 termDeadline := time.Now().Add(rr.timeoutTERM)
110                 t := time.NewTicker(rr.timeoutSignal)
111                 defer t.Stop()
112                 for range t.C {
113                         switch {
114                         case rr.isClosed():
115                                 return
116                         case time.Now().After(termDeadline):
117                                 rr.logger.Debug("giving up")
118                                 rr.givenup = true
119                                 rr.onUnkillable(rr.uuid)
120                                 return
121                         default:
122                                 rr.kill(syscall.SIGTERM)
123                         }
124                 }
125         }()
126 }
127
128 func (rr *remoteRunner) kill(sig syscall.Signal) {
129         logger := rr.logger.WithField("Signal", int(sig))
130         logger.Info("sending signal")
131         cmd := fmt.Sprintf("crunch-run --kill %d %s", sig, rr.uuid)
132         if rr.remoteUser != "root" {
133                 cmd = "sudo " + cmd
134         }
135         stdout, stderr, err := rr.executor.Execute(nil, cmd, nil)
136         if err != nil {
137                 logger.WithFields(logrus.Fields{
138                         "stderr": string(stderr),
139                         "stdout": string(stdout),
140                         "error":  err,
141                 }).Info("kill attempt unsuccessful")
142                 return
143         }
144         rr.onKilled(rr.uuid)
145 }
146
147 func (rr *remoteRunner) isClosed() bool {
148         select {
149         case <-rr.closed:
150                 return true
151         default:
152                 return false
153         }
154 }