9187: Add comments.
[arvados.git] / sdk / go / dispatch / dispatch.go
index fb7b5fb799c6b3d30b9c0975a6318be1626d1289..54d596fee86ed400d595432fd7e75d72b0c086e7 100644 (file)
@@ -102,15 +102,17 @@ func (dispatcher *Dispatcher) notMine(uuid string) {
        }
 }
 
-// Check if there is a channel for updates associated with this container.  If
-// so send the container record on the channel and return true, if not return
-// false.
-func (dispatcher *Dispatcher) updateMine(c Container) bool {
+// checkMine returns true/false if there is a channel for updates associated
+// with container c.  If update is true, also send the container record on
+// the channel.
+func (dispatcher *Dispatcher) checkMine(c Container, update bool) bool {
        dispatcher.mineMutex.Lock()
        defer dispatcher.mineMutex.Unlock()
        ch, ok := dispatcher.mineMap[c.UUID]
        if ok {
-               ch <- c
+               if update {
+                       ch <- c
+               }
                return true
        }
        return false
@@ -174,6 +176,14 @@ func (dispatcher *Dispatcher) pollContainers() {
 }
 
 func (dispatcher *Dispatcher) handleUpdate(container Container) {
+       if container.State == Queued && dispatcher.checkMine(container, false) {
+               // If we previously started the job, something failed, and it
+               // was re-queued, this dispatcher might still be monitoring it.
+               // Stop the existing monitor, then try to lock and run it
+               // again.
+               dispatcher.notMine(container.UUID)
+       }
+
        if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
                // If container is Complete, Cancelled, or Queued, LockedByUUID
                // will be nil.  If the container was formerly Locked, moved
@@ -185,12 +195,12 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) {
                return
        }
 
-       if dispatcher.updateMine(container) {
+       if dispatcher.checkMine(container, true) {
                // Already monitored, sent status update
                return
        }
 
-       if container.State == Queued {
+       if container.State == Queued && container.Priority > 0 {
                // Try to take the lock
                if err := dispatcher.UpdateState(container.UUID, Locked); err != nil {
                        return