X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3b63632698de9868a501191e8989f14c23e4e743..65b12213f740b117fb14822bce0dbb415257c355:/lib/lsf/dispatch.go diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go index b7032dc735..d1408d23cb 100644 --- a/lib/lsf/dispatch.go +++ b/lib/lsf/dispatch.go @@ -5,8 +5,9 @@ package lsf import ( - "bytes" "context" + "crypto/hmac" + "crypto/sha256" "errors" "fmt" "math" @@ -17,6 +18,8 @@ import ( "time" "git.arvados.org/arvados.git/lib/cmd" + "git.arvados.org/arvados.git/lib/controller/dblock" + "git.arvados.org/arvados.git/lib/ctrlctx" "git.arvados.org/arvados.git/lib/dispatchcloud" "git.arvados.org/arvados.git/lib/service" "git.arvados.org/arvados.git/sdk/go/arvados" @@ -57,6 +60,7 @@ type dispatcher struct { Registry *prometheus.Registry logger logrus.FieldLogger + dbConnector ctrlctx.DBConnector lsfcli lsfcli lsfqueue lsfqueue arvDispatcher *dispatch.Dispatcher @@ -72,7 +76,9 @@ type dispatcher struct { func (disp *dispatcher) Start() { disp.initOnce.Do(func() { disp.init() + dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB) go func() { + defer dblock.Dispatch.Unlock() disp.checkLsfQueueForOrphans() err := disp.arvDispatcher.Run(disp.Context) if err != nil { @@ -120,10 +126,11 @@ func (disp *dispatcher) init() { disp.lsfcli.logger = disp.logger disp.lsfqueue = lsfqueue{ logger: disp.logger, - period: time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval), + period: disp.Cluster.Containers.CloudVMs.PollInterval.Duration(), lsfcli: &disp.lsfcli, } disp.ArvClient.AuthToken = disp.AuthToken + disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL} disp.stop = make(chan struct{}, 1) disp.stopped = make(chan struct{}) @@ -162,61 +169,47 @@ func (disp *dispatcher) init() { } } -func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) { +func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error { ctx, cancel := context.WithCancel(disp.Context) defer cancel() if ctr.State != dispatch.Locked { // already started by prior invocation - } else if _, ok := disp.lsfqueue.JobID(ctr.UUID); !ok { + } else if _, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok { + if _, err := dispatchcloud.ChooseInstanceType(disp.Cluster, &ctr); errors.As(err, &dispatchcloud.ConstraintsNotSatisfiableError{}) { + err := disp.arvDispatcher.Arv.Update("containers", ctr.UUID, arvadosclient.Dict{ + "container": map[string]interface{}{ + "runtime_status": map[string]string{ + "error": err.Error(), + }, + }, + }, nil) + if err != nil { + return fmt.Errorf("error setting runtime_status on %s: %s", ctr.UUID, err) + } + return disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled) + } disp.logger.Printf("Submitting container %s to LSF", ctr.UUID) cmd := []string{disp.Cluster.Containers.CrunchRunCommand} cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine) cmd = append(cmd, disp.Cluster.Containers.CrunchRunArgumentsList...) - if err := disp.submit(ctr, cmd); err != nil { - var text string - switch err := err.(type) { - case dispatchcloud.ConstraintsNotSatisfiableError: - var logBuf bytes.Buffer - fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err) - if len(err.AvailableTypes) == 0 { - fmt.Fprint(&logBuf, "No instance types are configured.\n") - } else { - fmt.Fprint(&logBuf, "Available instance types:\n") - for _, t := range err.AvailableTypes { - fmt.Fprintf(&logBuf, - "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n", - t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price, - ) - } - } - text = logBuf.String() - disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled) - default: - text = fmt.Sprintf("Error submitting container %s to LSF: %s", ctr.UUID, err) - } - disp.logger.Print(text) - - lr := arvadosclient.Dict{"log": arvadosclient.Dict{ - "object_uuid": ctr.UUID, - "event_type": "dispatch", - "properties": map[string]string{"text": text}}} - disp.arvDispatcher.Arv.Create("logs", lr, nil) - - disp.arvDispatcher.Unlock(ctr.UUID) - return + err := disp.submit(ctr, cmd) + if err != nil { + return err } } disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State) defer disp.logger.Printf("Done monitoring container %s", ctr.UUID) - // If the container disappears from the lsf queue, there is - // no point in waiting for further dispatch updates: just - // clean up and return. go func(uuid string) { for ctx.Err() == nil { - if _, ok := disp.lsfqueue.JobID(uuid); !ok { + _, ok := disp.lsfqueue.Lookup(uuid) + if !ok { + // If the container disappears from + // the lsf queue, there is no point in + // waiting for further dispatch + // updates: just clean up and return. disp.logger.Printf("container %s job disappeared from LSF queue", uuid) cancel() return @@ -237,9 +230,14 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain case dispatch.Locked: disp.arvDispatcher.Unlock(ctr.UUID) } - return + return nil case updated, ok := <-status: if !ok { + // status channel is closed, which is + // how arvDispatcher tells us to stop + // touching the container record, kill + // off any remaining LSF processes, + // etc. done = true break } @@ -247,7 +245,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain disp.logger.Infof("container %s changed state from %s to %s", ctr.UUID, ctr.State, updated.State) } ctr = updated - if ctr.Priority == 0 { + if ctr.Priority < 1 { disp.logger.Printf("container %s has state %s, priority %d: cancel lsf job", ctr.UUID, ctr.State, ctr.Priority) disp.bkill(ctr) } else { @@ -259,15 +257,16 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain // Try "bkill" every few seconds until the LSF job disappears // from the queue. - ticker := time.NewTicker(5 * time.Second) + ticker := time.NewTicker(disp.Cluster.Containers.CloudVMs.PollInterval.Duration() / 2) defer ticker.Stop() - for jobid, ok := disp.lsfqueue.JobID(ctr.UUID); ok; _, ok = disp.lsfqueue.JobID(ctr.UUID) { - err := disp.lsfcli.Bkill(jobid) + for qent, ok := disp.lsfqueue.Lookup(ctr.UUID); ok; _, ok = disp.lsfqueue.Lookup(ctr.UUID) { + err := disp.lsfcli.Bkill(qent.ID) if err != nil { - disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err) + disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err) } <-ticker.C } + return nil } func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error { @@ -276,7 +275,12 @@ func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []s var crArgs []string crArgs = append(crArgs, crunchRunCommand...) crArgs = append(crArgs, container.UUID) - crScript := execScript(crArgs) + + h := hmac.New(sha256.New, []byte(disp.Cluster.SystemRootToken)) + fmt.Fprint(h, container.UUID) + authsecret := fmt.Sprintf("%x", h.Sum(nil)) + + crScript := execScript(crArgs, map[string]string{"GatewayAuthSecret": authsecret}) bsubArgs, err := disp.bsubArgs(container) if err != nil { @@ -286,34 +290,54 @@ func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []s } func (disp *dispatcher) bkill(ctr arvados.Container) { - if jobid, ok := disp.lsfqueue.JobID(ctr.UUID); !ok { + if qent, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok { disp.logger.Debugf("bkill(%s): redundant, job not in queue", ctr.UUID) - } else if err := disp.lsfcli.Bkill(jobid); err != nil { - disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err) + } else if err := disp.lsfcli.Bkill(qent.ID); err != nil { + disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err) } } func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error) { args := []string{"bsub"} - args = append(args, disp.Cluster.Containers.LSF.BsubArgumentsList...) - args = append(args, "-J", container.UUID) - args = append(args, disp.bsubConstraintArgs(container)...) - if u := disp.Cluster.Containers.LSF.BsubSudoUser; u != "" { - args = append([]string{"sudo", "-E", "-u", u}, args...) - } - return args, nil -} -func (disp *dispatcher) bsubConstraintArgs(container arvados.Container) []string { - // TODO: propagate container.SchedulingParameters.Partitions tmp := int64(math.Ceil(float64(dispatchcloud.EstimateScratchSpace(&container)) / 1048576)) vcpus := container.RuntimeConstraints.VCPUs mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+ container.RuntimeConstraints.KeepCacheRAM+ int64(disp.Cluster.Containers.ReserveExtraRAM)) / 1048576)) - return []string{ - "-R", fmt.Sprintf("rusage[mem=%dMB:tmp=%dMB] affinity[core(%d)]", mem, tmp, vcpus), + + repl := map[string]string{ + "%%": "%", + "%C": fmt.Sprintf("%d", vcpus), + "%M": fmt.Sprintf("%d", mem), + "%T": fmt.Sprintf("%d", tmp), + "%U": container.UUID, + "%G": fmt.Sprintf("%d", container.RuntimeConstraints.CUDA.DeviceCount), + } + + re := regexp.MustCompile(`%.`) + var substitutionErrors string + argumentTemplate := disp.Cluster.Containers.LSF.BsubArgumentsList + if container.RuntimeConstraints.CUDA.DeviceCount > 0 { + argumentTemplate = append(argumentTemplate, disp.Cluster.Containers.LSF.BsubCUDAArguments...) + } + for _, a := range argumentTemplate { + args = append(args, re.ReplaceAllStringFunc(a, func(s string) string { + subst := repl[s] + if len(subst) == 0 { + substitutionErrors += fmt.Sprintf("Unknown substitution parameter %s in BsubArgumentsList, ", s) + } + return subst + })) } + if len(substitutionErrors) != 0 { + return nil, fmt.Errorf("%s", substitutionErrors[:len(substitutionErrors)-2]) + } + + if u := disp.Cluster.Containers.LSF.BsubSudoUser; u != "" { + args = append([]string{"sudo", "-E", "-u", u}, args...) + } + return args, nil } // Check the next bjobs report, and invoke TrackContainer for all the @@ -335,8 +359,14 @@ func (disp *dispatcher) checkLsfQueueForOrphans() { } } -func execScript(args []string) []byte { - s := "#!/bin/sh\nexec" +func execScript(args []string, env map[string]string) []byte { + s := "#!/bin/sh\n" + for k, v := range env { + s += k + `='` + s += strings.Replace(v, `'`, `'\''`, -1) + s += `' ` + } + s += `exec` for _, w := range args { s += ` '` s += strings.Replace(w, `'`, `'\''`, -1)