X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/6d940c5e6940a1dca97989c47e67c33d20a4d050..83fed933f8d4d6000613024caec7d62dd7651209:/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go index f718fbcdce..4bfff6a5f0 100644 --- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go +++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go @@ -5,6 +5,7 @@ package main import ( "flag" "fmt" + "git.curoverse.com/arvados.git/sdk/go/arvados" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/dispatch" "io/ioutil" @@ -69,17 +70,17 @@ func doMain() error { } // sbatchCmd -func sbatchFunc(container dispatch.Container) *exec.Cmd { - memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"] * 1048576))) +func sbatchFunc(container arvados.Container) *exec.Cmd { + memPerCPU := math.Ceil(float64(container.RuntimeConstraints.RAM) / (float64(container.RuntimeConstraints.VCPUs) * 1048576)) return exec.Command("sbatch", "--share", "--parsable", fmt.Sprintf("--job-name=%s", container.UUID), fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)), - fmt.Sprintf("--cpus-per-task=%d", int(container.RuntimeConstraints["vcpus"])), + fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs), fmt.Sprintf("--priority=%d", container.Priority)) } // scancelCmd -func scancelFunc(container dispatch.Container) *exec.Cmd { +func scancelFunc(container arvados.Container) *exec.Cmd { return exec.Command("scancel", "--name="+container.UUID) } @@ -89,7 +90,7 @@ var scancelCmd = scancelFunc // Submit job to slurm using sbatch. func submit(dispatcher *dispatch.Dispatcher, - container dispatch.Container, crunchRunCommand string) (jobid string, submitErr error) { + container arvados.Container, crunchRunCommand string) (jobid string, submitErr error) { submitErr = nil defer func() { @@ -181,7 +182,7 @@ func submit(dispatcher *dispatch.Dispatcher, // // If the container is marked as Running, check if it is in the slurm queue. // If not, mark it as Cancelled. -func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container dispatch.Container, monitorDone *bool) { +func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Container, monitorDone *bool) { submitted := false for !*monitorDone { if squeueUpdater.CheckSqueue(container.UUID) { @@ -207,13 +208,13 @@ func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container dispatch.C // release it back to the Queue, if it is Running then // clean up the record. - var con dispatch.Container + var con arvados.Container err := dispatcher.Arv.Get("containers", container.UUID, nil, &con) if err != nil { log.Printf("Error getting final container state: %v", err) } - var st string + var st arvados.ContainerState switch con.State { case dispatch.Locked: st = dispatch.Queued @@ -236,8 +237,8 @@ func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container dispatch.C // Monitor status updates. If the priority changes to zero, cancel the // container using scancel. func run(dispatcher *dispatch.Dispatcher, - container dispatch.Container, - status chan dispatch.Container) { + container arvados.Container, + status chan arvados.Container) { log.Printf("Monitoring container %v started", container.UUID) defer log.Printf("Monitoring container %v finished", container.UUID)