X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/fb429aa6a8dd1d28d08038abd8de8b9206a1d51e..HEAD:/lib/lsf/dispatch.go diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go index 537d52a072..897e5803f2 100644 --- a/lib/lsf/dispatch.go +++ b/lib/lsf/dispatch.go @@ -6,6 +6,8 @@ package lsf import ( "context" + "crypto/hmac" + "crypto/sha256" "errors" "fmt" "math" @@ -16,6 +18,8 @@ import ( "time" "git.arvados.org/arvados.git/lib/cmd" + "git.arvados.org/arvados.git/lib/controller/dblock" + "git.arvados.org/arvados.git/lib/ctrlctx" "git.arvados.org/arvados.git/lib/dispatchcloud" "git.arvados.org/arvados.git/lib/service" "git.arvados.org/arvados.git/sdk/go/arvados" @@ -56,6 +60,7 @@ type dispatcher struct { Registry *prometheus.Registry logger logrus.FieldLogger + dbConnector ctrlctx.DBConnector lsfcli lsfcli lsfqueue lsfqueue arvDispatcher *dispatch.Dispatcher @@ -71,7 +76,9 @@ type dispatcher struct { func (disp *dispatcher) Start() { disp.initOnce.Do(func() { disp.init() + dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB) go func() { + defer dblock.Dispatch.Unlock() disp.checkLsfQueueForOrphans() err := disp.arvDispatcher.Run(disp.Context) if err != nil { @@ -119,10 +126,11 @@ func (disp *dispatcher) init() { disp.lsfcli.logger = disp.logger disp.lsfqueue = lsfqueue{ logger: disp.logger, - period: time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval), + period: disp.Cluster.Containers.CloudVMs.PollInterval.Duration(), lsfcli: &disp.lsfcli, } disp.ArvClient.AuthToken = disp.AuthToken + disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL} disp.stop = make(chan struct{}, 1) disp.stopped = make(chan struct{}) @@ -168,6 +176,19 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain if ctr.State != dispatch.Locked { // already started by prior invocation } else if _, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok { + if _, err := dispatchcloud.ChooseInstanceType(disp.Cluster, &ctr); errors.As(err, &dispatchcloud.ConstraintsNotSatisfiableError{}) { + err := disp.arvDispatcher.Arv.Update("containers", ctr.UUID, arvadosclient.Dict{ + "container": map[string]interface{}{ + "runtime_status": map[string]string{ + "error": err.Error(), + }, + }, + }, nil) + if err != nil { + return fmt.Errorf("error setting runtime_status on %s: %s", ctr.UUID, err) + } + return disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled) + } disp.logger.Printf("Submitting container %s to LSF", ctr.UUID) cmd := []string{disp.Cluster.Containers.CrunchRunCommand} cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine) @@ -182,9 +203,8 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain defer disp.logger.Printf("Done monitoring container %s", ctr.UUID) go func(uuid string) { - cancelled := false for ctx.Err() == nil { - qent, ok := disp.lsfqueue.Lookup(uuid) + _, ok := disp.lsfqueue.Lookup(uuid) if !ok { // If the container disappears from // the lsf queue, there is no point in @@ -194,25 +214,6 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain cancel() return } - if !cancelled && qent.Stat == "PEND" && strings.Contains(qent.PendReason, "There are no suitable hosts for the job") { - disp.logger.Printf("container %s: %s", uuid, qent.PendReason) - err := disp.arvDispatcher.Arv.Update("containers", uuid, arvadosclient.Dict{ - "container": map[string]interface{}{ - "runtime_status": map[string]string{ - "error": qent.PendReason, - }, - }, - }, nil) - if err != nil { - disp.logger.Printf("error setting runtime_status on %s: %s", uuid, err) - continue // retry - } - err = disp.arvDispatcher.UpdateState(uuid, dispatch.Cancelled) - if err != nil { - continue // retry (UpdateState() already logged the error) - } - cancelled = true - } } }(ctr.UUID) @@ -256,7 +257,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain // Try "bkill" every few seconds until the LSF job disappears // from the queue. - ticker := time.NewTicker(5 * time.Second) + ticker := time.NewTicker(disp.Cluster.Containers.CloudVMs.PollInterval.Duration() / 2) defer ticker.Stop() for qent, ok := disp.lsfqueue.Lookup(ctr.UUID); ok; _, ok = disp.lsfqueue.Lookup(ctr.UUID) { err := disp.lsfcli.Bkill(qent.ID) @@ -274,7 +275,12 @@ func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []s var crArgs []string crArgs = append(crArgs, crunchRunCommand...) crArgs = append(crArgs, container.UUID) - crScript := execScript(crArgs) + + h := hmac.New(sha256.New, []byte(disp.Cluster.SystemRootToken)) + fmt.Fprint(h, container.UUID) + authsecret := fmt.Sprintf("%x", h.Sum(nil)) + + crScript := execScript(crArgs, map[string]string{"GatewayAuthSecret": authsecret}) bsubArgs, err := disp.bsubArgs(container) if err != nil { @@ -300,17 +306,41 @@ func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error) container.RuntimeConstraints.KeepCacheRAM+ int64(disp.Cluster.Containers.ReserveExtraRAM)) / 1048576)) + maxruntime := time.Duration(container.SchedulingParameters.MaxRunTime) * time.Second + if maxruntime == 0 { + maxruntime = disp.Cluster.Containers.LSF.MaxRunTimeDefault.Duration() + } + if maxruntime > 0 { + maxruntime += disp.Cluster.Containers.LSF.MaxRunTimeOverhead.Duration() + } + maxrunminutes := int64(math.Ceil(float64(maxruntime.Seconds()) / 60)) + repl := map[string]string{ "%%": "%", "%C": fmt.Sprintf("%d", vcpus), "%M": fmt.Sprintf("%d", mem), "%T": fmt.Sprintf("%d", tmp), "%U": container.UUID, + "%G": fmt.Sprintf("%d", container.RuntimeConstraints.CUDA.DeviceCount), + "%W": fmt.Sprintf("%d", maxrunminutes), } re := regexp.MustCompile(`%.`) var substitutionErrors string - for _, a := range disp.Cluster.Containers.LSF.BsubArgumentsList { + argumentTemplate := disp.Cluster.Containers.LSF.BsubArgumentsList + if container.RuntimeConstraints.CUDA.DeviceCount > 0 { + argumentTemplate = append(argumentTemplate, disp.Cluster.Containers.LSF.BsubCUDAArguments...) + } + for idx, a := range argumentTemplate { + if idx > 0 && (argumentTemplate[idx-1] == "-W" || argumentTemplate[idx-1] == "-We") && a == "%W" && maxrunminutes == 0 { + // LSF docs don't specify an argument to "-W" + // or "-We" that indicates "unknown", so + // instead we drop the "-W %W" part of the + // command line entirely when max runtime is + // unknown. + args = args[:len(args)-1] + continue + } args = append(args, re.ReplaceAllStringFunc(a, func(s string) string { subst := repl[s] if len(subst) == 0 { @@ -348,8 +378,14 @@ func (disp *dispatcher) checkLsfQueueForOrphans() { } } -func execScript(args []string) []byte { - s := "#!/bin/sh\nexec" +func execScript(args []string, env map[string]string) []byte { + s := "#!/bin/sh\n" + for k, v := range env { + s += k + `='` + s += strings.Replace(v, `'`, `'\''`, -1) + s += `' ` + } + s += `exec` for _, w := range args { s += ` '` s += strings.Replace(w, `'`, `'\''`, -1)