"syscall"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/dispatch"
+ "git.arvados.org/arvados.git/lib/config"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/dispatch"
"github.com/sirupsen/logrus"
)
return nil
}
+ loader := config.NewLoader(nil, logger)
+ cfg, err := loader.Load()
+ cluster, err := cfg.GetCluster("")
+ if err != nil {
+ return fmt.Errorf("config error: %s", err)
+ }
+
logger.Printf("crunch-dispatch-local %s started", version)
runningCmds = make(map[string]*exec.Cmd)
+ var client arvados.Client
+ client.APIHost = cluster.Services.Controller.ExternalURL.Host
+ client.AuthToken = cluster.SystemRootToken
+ client.Insecure = cluster.TLS.Insecure
+
+ if client.APIHost != "" || client.AuthToken != "" {
+ // Copy real configs into env vars so [a]
+ // MakeArvadosClient() uses them, and [b] they get
+ // propagated to crunch-run via SLURM.
+ os.Setenv("ARVADOS_API_HOST", client.APIHost)
+ os.Setenv("ARVADOS_API_TOKEN", client.AuthToken)
+ os.Setenv("ARVADOS_API_HOST_INSECURE", "")
+ if client.Insecure {
+ os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
+ }
+ os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
+ } else {
+ logger.Warnf("Client credentials missing from config, so falling back on environment variables (deprecated).")
+ }
+
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
logger.Errorf("error making Arvados client: %v", err)
dispatcher := dispatch.Dispatcher{
Logger: logger,
Arv: arv,
- RunContainer: (&LocalRun{startFunc, make(chan bool, 8), ctx}).run,
+ RunContainer: (&LocalRun{startFunc, make(chan bool, 8), ctx, cluster}).run,
PollPeriod: time.Duration(*pollInterval) * time.Second,
}
startCmd func(container arvados.Container, cmd *exec.Cmd) error
concurrencyLimit chan bool
ctx context.Context
+ cluster *arvados.Cluster
}
// Run a container.
// crunch-run terminates, mark the container as Cancelled.
func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
container arvados.Container,
- status <-chan arvados.Container) {
+ status <-chan arvados.Container) error {
uuid := container.UUID
case lr.concurrencyLimit <- true:
break
case <-lr.ctx.Done():
- return
+ return lr.ctx.Err()
}
defer func() { <-lr.concurrencyLimit }()
waitGroup.Add(1)
defer waitGroup.Done()
- cmd := exec.Command(*crunchRunCommand, uuid)
+ cmd := exec.Command(*crunchRunCommand, "--runtime-engine="+lr.cluster.Containers.RuntimeEngine, uuid)
cmd.Stdin = nil
cmd.Stderr = os.Stderr
cmd.Stdout = os.Stderr
}
dispatcher.Logger.Printf("finalized container %v", uuid)
+ return nil
}