projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Merge branch '20680-rolling-deploy' refs #20680
[arvados.git]
/
services
/
crunch-dispatch-local
/
crunch-dispatch-local.go
diff --git
a/services/crunch-dispatch-local/crunch-dispatch-local.go
b/services/crunch-dispatch-local/crunch-dispatch-local.go
index a3cb1341a4677e7ecdc7c03976da7483e47c1aa5..e45598189107261401930126cc1e04235211b798 100644
(file)
--- a/
services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/
services/crunch-dispatch-local/crunch-dispatch-local.go
@@
-17,6
+17,7
@@
import (
"syscall"
"time"
"syscall"
"time"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
@@
-26,26
+27,19
@@
import (
var version = "dev"
var version = "dev"
-func main() {
- err := doMain()
- if err != nil {
- logrus.Fatalf("%q", err)
- }
-}
-
var (
runningCmds map[string]*exec.Cmd
runningCmdsMutex sync.Mutex
waitGroup sync.WaitGroup
var (
runningCmds map[string]*exec.Cmd
runningCmdsMutex sync.Mutex
waitGroup sync.WaitGroup
- crunchRunCommand
*
string
+ crunchRunCommand string
)
)
-func
doMain() error
{
-
l
ogger := logrus.StandardLogger()
+func
main()
{
+
baseL
ogger := logrus.StandardLogger()
if os.Getenv("DEBUG") != "" {
if os.Getenv("DEBUG") != "" {
-
l
ogger.SetLevel(logrus.DebugLevel)
+
baseL
ogger.SetLevel(logrus.DebugLevel)
}
}
-
l
ogger.Formatter = &logrus.JSONFormatter{
+
baseL
ogger.Formatter = &logrus.JSONFormatter{
TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
}
TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
}
@@
-56,7
+50,7
@@
func doMain() error {
10,
"Interval in seconds to poll for queued containers")
10,
"Interval in seconds to poll for queued containers")
- crunchRunCommand = flags.String(
+ flags.StringVar(&crunchRunCommand,
"crunch-run-command",
"/usr/bin/crunch-run",
"Crunch command to run container")
"crunch-run-command",
"/usr/bin/crunch-run",
"Crunch command to run container")
@@
-66,22
+60,29
@@
func doMain() error {
false,
"Print version information and exit.")
false,
"Print version information and exit.")
- // Parse args; omit the first arg which is the command name
- flags.Parse(os.Args[1:])
+ if ok, code := cmd.ParseFlags(flags, os.Args[0], os.Args[1:], "", os.Stderr); !ok {
+ os.Exit(code)
+ }
// Print version information if requested
if *getVersion {
fmt.Printf("crunch-dispatch-local %s\n", version)
// Print version information if requested
if *getVersion {
fmt.Printf("crunch-dispatch-local %s\n", version)
- return
nil
+ return
}
}
- loader := config.NewLoader(nil,
l
ogger)
+ loader := config.NewLoader(nil,
baseL
ogger)
cfg, err := loader.Load()
cfg, err := loader.Load()
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "error loading config: %s\n", err)
+ os.Exit(1)
+ }
cluster, err := cfg.GetCluster("")
if err != nil {
cluster, err := cfg.GetCluster("")
if err != nil {
- return fmt.Errorf("config error: %s", err)
+ fmt.Fprintf(os.Stderr, "config error: %s\n", err)
+ os.Exit(1)
}
}
+ logger := baseLogger.WithField("ClusterID", cluster.ClusterID)
logger.Printf("crunch-dispatch-local %s started", version)
runningCmds = make(map[string]*exec.Cmd)
logger.Printf("crunch-dispatch-local %s started", version)
runningCmds = make(map[string]*exec.Cmd)
@@
-101,7
+102,6
@@
func doMain() error {
if client.Insecure {
os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
}
if client.Insecure {
os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
}
- os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
} else {
logger.Warnf("Client credentials missing from config, so falling back on environment variables (deprecated).")
}
} else {
logger.Warnf("Client credentials missing from config, so falling back on environment variables (deprecated).")
}
@@
-109,7
+109,7
@@
func doMain() error {
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
logger.Errorf("error making Arvados client: %v", err)
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
logger.Errorf("error making Arvados client: %v", err)
- return err
+ os.Exit(1)
}
arv.Retries = 25
}
arv.Retries = 25
@@
-124,7
+124,8
@@
func doMain() error {
err = dispatcher.Run(ctx)
if err != nil {
err = dispatcher.Run(ctx)
if err != nil {
- return err
+ logger.Error(err)
+ return
}
c := make(chan os.Signal, 1)
}
c := make(chan os.Signal, 1)
@@
-144,8
+145,6
@@
func doMain() error {
// Wait for all running crunch jobs to complete / terminate
waitGroup.Wait()
// Wait for all running crunch jobs to complete / terminate
waitGroup.Wait()
-
- return nil
}
func startFunc(container arvados.Container, cmd *exec.Cmd) error {
}
func startFunc(container arvados.Container, cmd *exec.Cmd) error {
@@
-198,7
+197,7
@@
func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
waitGroup.Add(1)
defer waitGroup.Done()
waitGroup.Add(1)
defer waitGroup.Done()
- cmd := exec.Command(
*
crunchRunCommand, "--runtime-engine="+lr.cluster.Containers.RuntimeEngine, uuid)
+ cmd := exec.Command(crunchRunCommand, "--runtime-engine="+lr.cluster.Containers.RuntimeEngine, uuid)
cmd.Stdin = nil
cmd.Stderr = os.Stderr
cmd.Stdout = os.Stderr
cmd.Stdin = nil
cmd.Stderr = os.Stderr
cmd.Stdout = os.Stderr
@@
-211,7
+210,7
@@
func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
runningCmdsMutex.Lock()
if err := lr.startCmd(container, cmd); err != nil {
runningCmdsMutex.Unlock()
runningCmdsMutex.Lock()
if err := lr.startCmd(container, cmd); err != nil {
runningCmdsMutex.Unlock()
- dispatcher.Logger.Warnf("error starting %q for %s: %s",
*
crunchRunCommand, uuid, err)
+ dispatcher.Logger.Warnf("error starting %q for %s: %s", crunchRunCommand, uuid, err)
dispatcher.UpdateState(uuid, dispatch.Cancelled)
} else {
runningCmds[uuid] = cmd
dispatcher.UpdateState(uuid, dispatch.Cancelled)
} else {
runningCmds[uuid] = cmd
@@
-261,7
+260,7
@@
Finish:
}
if container.State == dispatch.Locked || container.State == dispatch.Running {
dispatcher.Logger.Warnf("after %q process termination, container state for %v is %q; updating it to %q",
}
if container.State == dispatch.Locked || container.State == dispatch.Running {
dispatcher.Logger.Warnf("after %q process termination, container state for %v is %q; updating it to %q",
-
*
crunchRunCommand, uuid, container.State, dispatch.Cancelled)
+ crunchRunCommand, uuid, container.State, dispatch.Cancelled)
dispatcher.UpdateState(uuid, dispatch.Cancelled)
}
dispatcher.UpdateState(uuid, dispatch.Cancelled)
}