X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/cd020c016106fbe844501c5f434c16f4def4e08d..4257184a0fd276af7e1741dda8a7468a30b4a9c6:/lib/dispatchcloud/worker/runner.go diff --git a/lib/dispatchcloud/worker/runner.go b/lib/dispatchcloud/worker/runner.go index bf1632a6a2..e819a6036b 100644 --- a/lib/dispatchcloud/worker/runner.go +++ b/lib/dispatchcloud/worker/runner.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package worker import ( @@ -7,7 +11,6 @@ import ( "syscall" "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" "github.com/sirupsen/logrus" ) @@ -16,30 +19,48 @@ import ( type remoteRunner struct { uuid string executor Executor - arvClient *arvados.Client + envJSON json.RawMessage remoteUser string timeoutTERM time.Duration - timeoutKILL time.Duration timeoutSignal time.Duration - onUnkillable func(uuid string) // callback invoked when giving up on SIGKILL - onKilled func(uuid string) // callback invoked when process exits after SIGTERM/SIGKILL + onUnkillable func(uuid string) // callback invoked when giving up on SIGTERM + onKilled func(uuid string) // callback invoked when process exits after SIGTERM logger logrus.FieldLogger stopping bool // true if Stop() has been called - sentKILL bool // true if SIGKILL has been sent + givenup bool // true if timeoutTERM has been reached closed chan struct{} // channel is closed if Close() has been called } // newRemoteRunner returns a new remoteRunner. Caller should ensure // Close() is called to release resources. func newRemoteRunner(uuid string, wkr *worker) *remoteRunner { + // Send the instance type record as a JSON doc so crunch-run + // can log it. + var instJSON bytes.Buffer + enc := json.NewEncoder(&instJSON) + enc.SetIndent("", " ") + if err := enc.Encode(wkr.instType); err != nil { + panic(err) + } + env := map[string]string{ + "ARVADOS_API_HOST": wkr.wp.arvClient.APIHost, + "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken, + "InstanceType": instJSON.String(), + } + if wkr.wp.arvClient.Insecure { + env["ARVADOS_API_HOST_INSECURE"] = "1" + } + envJSON, err := json.Marshal(env) + if err != nil { + panic(err) + } rr := &remoteRunner{ uuid: uuid, executor: wkr.executor, - arvClient: wkr.wp.arvClient, + envJSON: envJSON, remoteUser: wkr.instance.RemoteUser(), timeoutTERM: wkr.wp.timeoutTERM, - timeoutKILL: wkr.wp.timeoutKILL, timeoutSignal: wkr.wp.timeoutSignal, onUnkillable: wkr.onUnkillable, onKilled: wkr.onKilled, @@ -55,22 +76,11 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner { // assume the remote process _might_ have started, at least until it // probes the worker and finds otherwise. func (rr *remoteRunner) Start() { - env := map[string]string{ - "ARVADOS_API_HOST": rr.arvClient.APIHost, - "ARVADOS_API_TOKEN": rr.arvClient.AuthToken, - } - if rr.arvClient.Insecure { - env["ARVADOS_API_HOST_INSECURE"] = "1" - } - envJSON, err := json.Marshal(env) - if err != nil { - panic(err) - } - stdin := bytes.NewBuffer(envJSON) cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'" if rr.remoteUser != "root" { cmd = "sudo " + cmd } + stdin := bytes.NewBuffer(rr.envJSON) stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin) if err != nil { rr.logger.WithField("stdout", string(stdout)). @@ -88,9 +98,13 @@ func (rr *remoteRunner) Close() { close(rr.closed) } -// Kill starts a background task to kill the remote process, -// escalating from SIGTERM to SIGKILL to onUnkillable() according to -// the configured timeouts. +// Kill starts a background task to kill the remote process, first +// trying SIGTERM until reaching timeoutTERM, then calling +// onUnkillable(). +// +// SIGKILL is not used. It would merely kill the crunch-run supervisor +// and thereby make the docker container, arv-mount, etc. invisible to +// us without actually stopping them. // // Once Kill has been called, calling it again has no effect. func (rr *remoteRunner) Kill(reason string) { @@ -101,19 +115,17 @@ func (rr *remoteRunner) Kill(reason string) { rr.logger.WithField("Reason", reason).Info("killing crunch-run process") go func() { termDeadline := time.Now().Add(rr.timeoutTERM) - killDeadline := termDeadline.Add(rr.timeoutKILL) t := time.NewTicker(rr.timeoutSignal) defer t.Stop() for range t.C { switch { case rr.isClosed(): return - case time.Now().After(killDeadline): + case time.Now().After(termDeadline): + rr.logger.Debug("giving up") + rr.givenup = true rr.onUnkillable(rr.uuid) return - case time.Now().After(termDeadline): - rr.sentKILL = true - rr.kill(syscall.SIGKILL) default: rr.kill(syscall.SIGTERM) } @@ -134,7 +146,7 @@ func (rr *remoteRunner) kill(sig syscall.Signal) { "stderr": string(stderr), "stdout": string(stdout), "error": err, - }).Info("kill failed") + }).Info("kill attempt unsuccessful") return } rr.onKilled(rr.uuid)