projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
20457: Log at info level when flapping lock at concurrency limit.
[arvados.git]
/
lib
/
dispatchcloud
/
container
/
queue.go
diff --git a/lib/dispatchcloud/container/queue.go b/lib/dispatchcloud/container/queue.go
index a4a270dd1013eee601cd214eb7e336f6cd8a29e2..938ef915f251e4d27e1ea4f714b82f10425d4224 100644 (file)
--- a/lib/dispatchcloud/container/queue.go
+++ b/lib/dispatchcloud/container/queue.go
@@ -26,10 +26,12 @@ type APIClient interface {
// A QueueEnt is an entry in the queue, consisting of a container
// record and the instance type that should be used to run it.
type QueueEnt struct {
// A QueueEnt is an entry in the queue, consisting of a container
// record and the instance type that should be used to run it.
type QueueEnt struct {
- // The container to run. Only the UUID, State, Priority, and
- // RuntimeConstraints fields are populated.
+ // The container to run. Only the UUID, State, Priority,
+ // RuntimeConstraints, Mounts, and ContainerImage fields are
+ // populated.
Container arvados.Container `json:"container"`
InstanceType arvados.InstanceType `json:"instance_type"`
Container arvados.Container `json:"container"`
InstanceType arvados.InstanceType `json:"instance_type"`
+ FirstSeenAt time.Time `json:"first_seen_at"`
}
// String implements fmt.Stringer by returning the queued container's
}
// String implements fmt.Stringer by returning the queued container's
@@ -144,11 +146,11 @@ func (cq *Queue) Forget(uuid string) {
func (cq *Queue) Get(uuid string) (arvados.Container, bool) {
cq.mtx.Lock()
defer cq.mtx.Unlock()
func (cq *Queue) Get(uuid string) (arvados.Container, bool) {
cq.mtx.Lock()
defer cq.mtx.Unlock()
- if ctr, ok := cq.current[uuid]; !ok {
+ ctr, ok := cq.current[uuid]
+ if !ok {
return arvados.Container{}, false
return arvados.Container{}, false
- } else {
- return ctr.Container, true
}
}
+ return ctr.Container, true
}
// Entries returns all cache entries, keyed by container UUID.
}
// Entries returns all cache entries, keyed by container UUID.
@@ -228,6 +230,7 @@ func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) {
delete(cq.current, uuid)
}
delete(cq.current, uuid)
}
+// Caller must have lock.
func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
it, err := cq.chooseType(&ctr)
if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
it, err := cq.chooseType(&ctr)
if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
@@ -283,7 +286,7 @@ func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
"Priority": ctr.Priority,
"InstanceType": it.Name,
}).Info("adding container to queue")
"Priority": ctr.Priority,
"InstanceType": it.Name,
}).Info("adding container to queue")
- cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
+ cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it, FirstSeenAt: time.Now()}
}
// Lock acquires the dispatch lock for the given container.
}
// Lock acquires the dispatch lock for the given container.
@@ -381,7 +384,7 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) {
*next[upd.UUID] = upd
}
}
*next[upd.UUID] = upd
}
}
- selectParam := []string{"uuid", "state", "priority", "runtime_constraints"}
+ selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "mounts", "scheduling_parameters", "created_at"}
limitParam := 1000
mine, err := cq.fetchAll(arvados.ResourceListParams{
limitParam := 1000
mine, err := cq.fetchAll(arvados.ResourceListParams{