10 "git.curoverse.com/arvados.git/sdk/go/arvados"
11 "github.com/sirupsen/logrus"
14 // remoteRunner handles the starting and stopping of a crunch-run
15 // process on a remote machine.
//
// The timeout fields drive the escalation performed by Kill():
// SIGTERM is sent until timeoutTERM has elapsed, then SIGKILL until a
// further timeoutKILL has elapsed, after which onUnkillable is
// invoked.
16 type remoteRunner struct {
19 arvClient *arvados.Client // source of the API host, token, and insecure flag that Start() passes to crunch-run
21 timeoutTERM time.Duration // time after Kill() is called before escalating from SIGTERM to SIGKILL
22 timeoutKILL time.Duration // additional time after the SIGTERM deadline before giving up on SIGKILL
23 timeoutSignal time.Duration // period of the ticker Kill() creates to pace its signal attempts
24 onUnkillable func(uuid string) // callback invoked when giving up on SIGKILL
25 onKilled func(uuid string) // callback invoked when process exits after SIGTERM/SIGKILL
26 logger logrus.FieldLogger // logger tagged with the container UUID (see newRemoteRunner)
28 stopping bool // true if Stop() has been called
29 sentKILL bool // true if SIGKILL has been sent
30 closed chan struct{} // channel is closed if Close() has been called
33 // newRemoteRunner returns a new remoteRunner. Caller should ensure
34 // Close() is called to release resources.
//
// The executor and the onUnkillable/onKilled callbacks are taken from
// wkr itself; the API client and the three timeouts are taken from
// wkr.wp; the remote user name is taken from wkr.instance. The
// runner's logger is wkr's logger with a "ContainerUUID" field set to
// uuid.
35 func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
38 executor: wkr.executor,
39 arvClient: wkr.wp.arvClient,
40 remoteUser: wkr.instance.RemoteUser(),
41 timeoutTERM: wkr.wp.timeoutTERM,
42 timeoutKILL: wkr.wp.timeoutKILL,
43 timeoutSignal: wkr.wp.timeoutSignal,
44 onUnkillable: wkr.onUnkillable,
45 onKilled: wkr.onKilled,
46 logger: wkr.logger.WithField("ContainerUUID", uuid),
// closed starts out open; per the field's comment it is closed when
// Close() is called.
47 closed: make(chan struct{}),
52 // Start a crunch-run process on the remote host.
//
// The Arvados API host and token (plus ARVADOS_API_HOST_INSECURE=1
// when the client is configured as insecure) are encoded as a JSON
// object and handed to the executor as the command's stdin.
//
54 // Start does not return any error encountered. The caller should
55 // assume the remote process _might_ have started, at least until it
56 // probes the worker and finds otherwise.
57 func (rr *remoteRunner) Start() {
58 env := map[string]string{
59 "ARVADOS_API_HOST": rr.arvClient.APIHost,
60 "ARVADOS_API_TOKEN": rr.arvClient.AuthToken,
// Only set the insecure flag when it applies; otherwise the variable
// is left out of the environment entirely.
62 if rr.arvClient.Insecure {
63 env["ARVADOS_API_HOST_INSECURE"] = "1"
65 envJSON, err := json.Marshal(env)
// The JSON-encoded environment becomes the remote command's stdin.
69 stdin := bytes.NewBuffer(envJSON)
70 cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
71 if rr.remoteUser != "root" {
74 stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
// NOTE(review): presumably logged only when Execute returns an
// error; the captured stdout/stderr are attached as log fields.
76 rr.logger.WithField("stdout", string(stdout)).
77 WithField("stderr", string(stderr)).
79 Error("error starting crunch-run process")
82 rr.logger.Info("crunch-run process started")
85 // Close abandons the remote process (if any) and releases
86 // resources. Close must not be called more than once.
//
// NOTE(review): presumably this closes rr.closed (that field is
// documented as being closed once Close() has been called), in which
// case a second call would panic -- confirm against the body.
87 func (rr *remoteRunner) Close() {
91 // Kill starts a background task to kill the remote process,
92 // escalating from SIGTERM to SIGKILL to onUnkillable() according to
93 // the configured timeouts.
//
// reason is recorded in the log (as the "Reason" field) and is not
// otherwise used in the code visible here.
//
95 // Once Kill has been called, calling it again has no effect.
96 func (rr *remoteRunner) Kill(reason string) {
101 rr.logger.WithField("Reason", reason).Info("killing crunch-run process")
// Both deadlines are fixed once, relative to the moment Kill is
// called: SIGTERM until termDeadline, then SIGKILL until killDeadline.
103 termDeadline := time.Now().Add(rr.timeoutTERM)
104 killDeadline := termDeadline.Add(rr.timeoutKILL)
// The ticker fires every timeoutSignal.
105 t := time.NewTicker(rr.timeoutSignal)
// The later deadline is tested first, so that once it has passed we
// give up instead of sending yet another SIGKILL.
111 case time.Now().After(killDeadline):
112 rr.onUnkillable(rr.uuid)
114 case time.Now().After(termDeadline):
116 rr.kill(syscall.SIGKILL)
// Neither deadline has passed yet: keep asking politely.
118 rr.kill(syscall.SIGTERM)
// kill makes a single attempt to deliver sig to the remote crunch-run
// process by running "crunch-run --kill <signal number> <uuid>"
// through the executor. It returns nothing; the attempt and its
// outcome are only logged.
124 func (rr *remoteRunner) kill(sig syscall.Signal) {
// Tag every log entry from this attempt with the numeric signal.
125 logger := rr.logger.WithField("Signal", int(sig))
126 logger.Info("sending signal")
127 cmd := fmt.Sprintf("crunch-run --kill %d %s", sig, rr.uuid)
128 if rr.remoteUser != "root" {
// No stdin is needed for the kill command.
131 stdout, stderr, err := rr.executor.Execute(nil, cmd, nil)
// NOTE(review): presumably reached only when Execute returns an
// error. The failure is logged at Info level rather than Error.
133 logger.WithFields(logrus.Fields{
134 "stderr": string(stderr),
135 "stdout": string(stdout),
137 }).Info("kill failed")
143 func (rr *remoteRunner) isClosed() bool {