9187: Check LockedByUUID on container updates and terminate status updates if
authorPeter Amstutz <peter.amstutz@curoverse.com>
Fri, 27 May 2016 21:30:07 +0000 (17:30 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Fri, 27 May 2016 21:30:07 +0000 (17:30 -0400)
not equal to dispatcher token.

sdk/go/dispatch/dispatch.go
services/crunch-dispatch-local/crunch-dispatch-local.go

index 355ed7c1084a0feed96cab06ab8bdbe5cd3d2d6c..a27971f90655ca8e49046688119092c837b77938 100644 (file)
@@ -52,7 +52,7 @@ type Dispatcher struct {
 
        mineMutex  sync.Mutex
        mineMap    map[string]chan Container
-       auth       apiClientAuthorization
+       Auth       apiClientAuthorization
        containers chan Container
 }
 
@@ -100,17 +100,18 @@ func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched m
        err := dispatcher.Arv.List("containers", params, &containers)
        if err != nil {
                log.Printf("Error getting list of containers: %q", err)
-       } else {
-               if containers.ItemsAvailable > len(containers.Items) {
-                       // TODO: support paging
-                       log.Printf("Warning!  %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
-                               containers.ItemsAvailable,
-                               len(containers.Items))
-               }
-               for _, container := range containers.Items {
-                       touched[container.UUID] = true
-                       dispatcher.containers <- container
-               }
+               return
+       }
+
+       if containers.ItemsAvailable > len(containers.Items) {
+               // TODO: support paging
+               log.Printf("Warning!  %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
+                       containers.ItemsAvailable,
+                       len(containers.Items))
+       }
+       for _, container := range containers.Items {
+               touched[container.UUID] = true
+               dispatcher.containers <- container
        }
 }
 
@@ -122,7 +123,7 @@ func (dispatcher *Dispatcher) pollContainers() {
                "order":   []string{"priority desc"},
                "limit":   "1000"}
        paramsP := arvadosclient.Dict{
-               "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.auth.UUID}},
+               "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
                "limit":   "1000"}
 
        for {
@@ -152,11 +153,19 @@ func (dispatcher *Dispatcher) pollContainers() {
 }
 
 func (dispatcher *Dispatcher) handleUpdate(container Container) {
+       if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
+               // If container is Complete, Cancelled, or Queued, LockedByUUID
+               // will be nil.  If the container was formally Locked, moved
+               // back to Queued and then locked by another dispatcher,
+               // LockedByUUID will be different.  In either case, we want
+               // to stop monitoring it.
+               log.Printf("Container %v now in state %v with locked_by_uuid %v", container.UUID, container.State, container.LockedByUUID)
+               dispatcher.notMine(container.UUID)
+               return
+       }
+
        if dispatcher.updateMine(container) {
-               if container.State == Complete || container.State == Cancelled {
-                       log.Printf("Container %v now in state %v", container.UUID, container.State)
-                       dispatcher.notMine(container.UUID)
-               }
+               // Already monitored, sent status update
                return
        }
 
@@ -169,6 +178,8 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) {
        }
 
        if container.State == Locked || container.State == Running {
+               // Not currently monitored but in Locked or Running state and
+               // owned by this dispatcher, so start monitoring.
                go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
        }
 }
@@ -197,7 +208,7 @@ func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
 // The callback is responsible for draining the channel, if it fails to do so
 // it will deadlock the dispatcher.
 func (dispatcher *Dispatcher) RunDispatcher() (err error) {
-       err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.auth)
+       err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
        if err != nil {
                log.Printf("Error getting my token UUID: %v", err)
                return
index cc472a40311adc36f91202512c3f4f7fb52e2b8f..73a389533679a2ceef773a237891921ae5bf92fc 100644 (file)
@@ -159,7 +159,8 @@ func run(dispatcher *dispatch.Dispatcher,
        if err != nil {
                log.Printf("Error getting final container state: %v", err)
        }
-       if container.State != dispatch.Complete && container.State != dispatch.Cancelled {
+       if container.LockedByUUID == dispatcher.Auth.UUID &&
+               (container.State == dispatch.Locked || container.State == dispatch.Running) {
                log.Printf("After %s process termination, container state for %v is %q.  Updating it to %q",
                        *crunchRunCommand, container.State, uuid, dispatch.Cancelled)
                dispatcher.UpdateState(uuid, dispatch.Cancelled)