mineMutex sync.Mutex
mineMap map[string]chan Container
- auth apiClientAuthorization
+ Auth apiClientAuthorization
containers chan Container
}
err := dispatcher.Arv.List("containers", params, &containers)
if err != nil {
log.Printf("Error getting list of containers: %q", err)
- } else {
- if containers.ItemsAvailable > len(containers.Items) {
- // TODO: support paging
- log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
- containers.ItemsAvailable,
- len(containers.Items))
- }
- for _, container := range containers.Items {
- touched[container.UUID] = true
- dispatcher.containers <- container
- }
+ return
+ }
+
+ if containers.ItemsAvailable > len(containers.Items) {
+ // TODO: support paging
+ log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
+ containers.ItemsAvailable,
+ len(containers.Items))
+ }
+ for _, container := range containers.Items {
+ touched[container.UUID] = true
+ dispatcher.containers <- container
}
}
"order": []string{"priority desc"},
"limit": "1000"}
paramsP := arvadosclient.Dict{
- "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.auth.UUID}},
+ "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
"limit": "1000"}
for {
}
// handleUpdate reacts to a single container record. With this change it
// first drops (via notMine) any non-Queued container whose LockedByUUID
// differs from this dispatcher's own token UUID; otherwise it hands the
// record to updateMine and, when updateMine reports false and the container
// is Locked or Running, starts RunContainer in a new goroutine.
func (dispatcher *Dispatcher) handleUpdate(container Container) {
+ if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
+ // If container is Complete, Cancelled, or Queued, LockedByUUID
+ // will be nil. If the container was formerly Locked, moved
+ // back to Queued and then locked by another dispatcher,
+ // LockedByUUID will be different. In either case, we want
+ // to stop monitoring it.
+ log.Printf("Container %v now in state %v with locked_by_uuid %v", container.UUID, container.State, container.LockedByUUID)
+ dispatcher.notMine(container.UUID)
+ return
+ }
+
if dispatcher.updateMine(container) {
- if container.State == Complete || container.State == Cancelled {
- log.Printf("Container %v now in state %v", container.UUID, container.State)
- dispatcher.notMine(container.UUID)
- }
+ // Already monitored, sent status update
return
}
}
if container.State == Locked || container.State == Running {
+ // Not currently monitored but in Locked or Running state and
+ // owned by this dispatcher, so start monitoring.
go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
}
}
// The callback is responsible for draining the channel; if it fails to do so,
// it will deadlock the dispatcher.
func (dispatcher *Dispatcher) RunDispatcher() (err error) {
- err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.auth)
+ err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
if err != nil {
log.Printf("Error getting my token UUID: %v", err)
return