From cc13854bd4675d9f1de807c38dfada0315bf3291 Mon Sep 17 00:00:00 2001 From: radhika Date: Thu, 8 Sep 2016 12:53:54 -0400 Subject: [PATCH] 9898: add Lock and Unlock methods to dispatch go sdk. --- sdk/go/dispatch/dispatch.go | 32 ++++++++++++------- .../crunch-dispatch-slurm.go | 15 +++++---- 2 files changed, 29 insertions(+), 18 deletions(-) diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index 53f41d29d0..aa79747564 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -178,7 +178,7 @@ func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) { if container.State == Queued && container.Priority > 0 { // Try to take the lock - if err := dispatcher.UpdateState(container.UUID, Locked); err != nil { + if err := dispatcher.Lock(container.UUID); err != nil { return } container.State = Locked @@ -194,17 +194,9 @@ func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) { // UpdateState makes an API call to change the state of a container. func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error { if newState == Locked { - err := dispatcher.Arv.Call("POST", "containers", uuid, "lock", nil, nil) - if err != nil { - log.Printf("Error locking container %s: %q", uuid, err) - } - return err + return dispatcher.Lock(uuid) } else if newState == Queued { - err := dispatcher.Arv.Call("POST", "containers", uuid, "unlock", nil, nil) - if err != nil { - log.Printf("Error unlocking container %s: %q", uuid, err) - } - return err + return dispatcher.Unlock(uuid) } // All other states @@ -218,6 +210,24 @@ func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.Containe return err } +// Lock makes the lock API call which updates the state of a container to Locked. +func (dispatcher *Dispatcher) Lock(uuid string) error { + err := dispatcher.Arv.Call("POST", "containers", uuid, "lock", nil, nil) + if err != nil { + log.Printf("Error locking container %s: %q", uuid, err) + } + return err +} + +// Unlock makes the unlock API call which updates the state of a container to Queued. +func (dispatcher *Dispatcher) Unlock(uuid string) error { + err := dispatcher.Arv.Call("POST", "containers", uuid, "unlock", nil, nil) + if err != nil { + log.Printf("Error unlocking container %s: %q", uuid, err) + } + return err +} + // RunDispatcher runs the main loop of the dispatcher until receiving a message // on the dispatcher.DoneProcessing channel. It also installs a signal handler // to terminate gracefully on SIGINT, SIGTERM or SIGQUIT. diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go index fa77e70ea5..ac776445d2 100644 --- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go +++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go @@ -151,7 +151,7 @@ func submit(dispatcher *dispatch.Dispatcher, // OK, no cleanup needed return } - err := dispatcher.UpdateState(container.UUID, dispatch.Queued) + err := dispatcher.Unlock(container.UUID) if err != nil { log.Printf("Error unlocking container %s: %v", container.UUID, err) } @@ -244,7 +244,7 @@ func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Co log.Printf("Error submitting container %s to slurm: %v", container.UUID, err) // maybe sbatch is broken, put it back to queued - dispatcher.UpdateState(container.UUID, dispatch.Queued) + dispatcher.Unlock(container.UUID) } submitted = true } else { @@ -263,17 +263,18 @@ func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Co var st arvados.ContainerState switch con.State { case dispatch.Locked: - st = dispatch.Queued + log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.", + container.UUID, con.State, dispatch.Queued) + dispatcher.Unlock(container.UUID) case dispatch.Running: st = dispatch.Cancelled + log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.", + container.UUID, con.State, st) + dispatcher.UpdateState(container.UUID, st) default: // Container state is Queued, Complete or Cancelled so stop monitoring it. return } - - log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.", - container.UUID, con.State, st) - dispatcher.UpdateState(container.UUID, st) } } } -- 2.30.2