import (
"context"
+ "crypto/hmac"
+ "crypto/sha256"
"errors"
"fmt"
"math"
"time"
"git.arvados.org/arvados.git/lib/cmd"
+ "git.arvados.org/arvados.git/lib/controller/dblock"
+ "git.arvados.org/arvados.git/lib/ctrlctx"
"git.arvados.org/arvados.git/lib/dispatchcloud"
"git.arvados.org/arvados.git/lib/service"
"git.arvados.org/arvados.git/sdk/go/arvados"
Registry *prometheus.Registry
logger logrus.FieldLogger
+ dbConnector ctrlctx.DBConnector
lsfcli lsfcli
lsfqueue lsfqueue
arvDispatcher *dispatch.Dispatcher
func (disp *dispatcher) Start() {
disp.initOnce.Do(func() {
disp.init()
+ dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB)
go func() {
+ defer dblock.Dispatch.Unlock()
disp.checkLsfQueueForOrphans()
err := disp.arvDispatcher.Run(disp.Context)
if err != nil {
disp.lsfcli.logger = disp.logger
disp.lsfqueue = lsfqueue{
logger: disp.logger,
- period: time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
+ period: disp.Cluster.Containers.CloudVMs.PollInterval.Duration(),
lsfcli: &disp.lsfcli,
}
disp.ArvClient.AuthToken = disp.AuthToken
+ disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL}
disp.stop = make(chan struct{}, 1)
disp.stopped = make(chan struct{})
if ctr.State != dispatch.Locked {
// already started by prior invocation
} else if _, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
+ if _, err := dispatchcloud.ChooseInstanceType(disp.Cluster, &ctr); errors.As(err, &dispatchcloud.ConstraintsNotSatisfiableError{}) {
+ err := disp.arvDispatcher.Arv.Update("containers", ctr.UUID, arvadosclient.Dict{
+ "container": map[string]interface{}{
+ "runtime_status": map[string]string{
+ "error": err.Error(),
+ },
+ },
+ }, nil)
+ if err != nil {
+ return fmt.Errorf("error setting runtime_status on %s: %s", ctr.UUID, err)
+ }
+ return disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
+ }
disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
go func(uuid string) {
- cancelled := false
for ctx.Err() == nil {
- qent, ok := disp.lsfqueue.Lookup(uuid)
+ _, ok := disp.lsfqueue.Lookup(uuid)
if !ok {
// If the container disappears from
// the lsf queue, there is no point in
cancel()
return
}
- if !cancelled && qent.Stat == "PEND" && strings.Contains(qent.PendReason, "There are no suitable hosts for the job") {
- disp.logger.Printf("container %s: %s", uuid, qent.PendReason)
- err := disp.arvDispatcher.Arv.Update("containers", uuid, arvadosclient.Dict{
- "container": map[string]interface{}{
- "runtime_status": map[string]string{
- "error": qent.PendReason,
- },
- },
- }, nil)
- if err != nil {
- disp.logger.Printf("error setting runtime_status on %s: %s", uuid, err)
- continue // retry
- }
- err = disp.arvDispatcher.UpdateState(uuid, dispatch.Cancelled)
- if err != nil {
- continue // retry (UpdateState() already logged the error)
- }
- cancelled = true
- }
}
}(ctr.UUID)
// Try "bkill" every few seconds until the LSF job disappears
// from the queue.
- ticker := time.NewTicker(5 * time.Second)
+ ticker := time.NewTicker(disp.Cluster.Containers.CloudVMs.PollInterval.Duration() / 2)
defer ticker.Stop()
for qent, ok := disp.lsfqueue.Lookup(ctr.UUID); ok; _, ok = disp.lsfqueue.Lookup(ctr.UUID) {
err := disp.lsfcli.Bkill(qent.ID)
var crArgs []string
crArgs = append(crArgs, crunchRunCommand...)
crArgs = append(crArgs, container.UUID)
- crScript := execScript(crArgs)
+
+ h := hmac.New(sha256.New, []byte(disp.Cluster.SystemRootToken))
+ fmt.Fprint(h, container.UUID)
+ authsecret := fmt.Sprintf("%x", h.Sum(nil))
+
+ crScript := execScript(crArgs, map[string]string{"GatewayAuthSecret": authsecret})
bsubArgs, err := disp.bsubArgs(container)
if err != nil {
}
}
-func execScript(args []string) []byte {
- s := "#!/bin/sh\nexec"
+func execScript(args []string, env map[string]string) []byte {
+ s := "#!/bin/sh\n"
+ for k, v := range env {
+ s += k + `='`
+ s += strings.Replace(v, `'`, `'\''`, -1)
+ s += `' `
+ }
+ s += `exec`
for _, w := range args {
s += ` '`
s += strings.Replace(w, `'`, `'\''`, -1)