X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a6c7c8db01d37534622763dd385019e9dad17181..ae92d144610446849eb568247a44f02ae985c281:/sdk/go/dispatch/dispatch.go

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 94134950d2..d34ea68d7a 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -1,16 +1,22 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
 // Package dispatch is a helper library for building Arvados container
 // dispatchers.
 package dispatch
 
 import (
+	"bytes"
 	"context"
 	"fmt"
-	"log"
 	"sync"
 	"time"
 
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
-	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.arvados.org/arvados.git/lib/dispatchcloud"
+	"git.arvados.org/arvados.git/sdk/go/arvados"
+	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
+	"github.com/sirupsen/logrus"
 )
 
 const (
@@ -21,10 +27,21 @@ const (
 	Cancelled = arvados.ContainerStateCancelled
 )
 
+type Logger interface {
+	Printf(string, ...interface{})
+	Warnf(string, ...interface{})
+	Debugf(string, ...interface{})
+}
+
 // Dispatcher struct
 type Dispatcher struct {
 	Arv *arvadosclient.ArvadosClient
 
+	Logger Logger
+
+	// Batch size for container queries
+	BatchSize int
+
 	// Queue polling frequency
 	PollPeriod time.Duration
 
@@ -51,13 +68,17 @@ type Dispatcher struct {
 // running, and return.
 //
 // The DispatchFunc should not return until the container is finished.
-type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
+type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) error
 
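With the DispatchFunc change just above, a dispatcher's runner now reports failure by returning an error instead of coping with it silently. Below is a minimal sketch of how a caller might wire up the library; it assumes the exported fields shown in this file plus the Dispatcher.RunContainer field that holds the DispatchFunc, and the runner body is illustrative, not part of this patch.

package main

import (
	"context"
	"time"

	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
	"git.arvados.org/arvados.git/sdk/go/dispatch"
)

// runContainer is an illustrative DispatchFunc: it owns the container
// until the updates channel closes, and (after this change) returns an
// error for the library to log and report.
func runContainer(d *dispatch.Dispatcher, c arvados.Container, updates <-chan arvados.Container) error {
	for latest := range updates {
		// React to state/priority changes here; the library
		// closes the channel when it stops tracking the container.
		_ = latest
	}
	return nil
}

func main() {
	arv, err := arvadosclient.MakeArvadosClient()
	if err != nil {
		panic(err)
	}
	d := &dispatch.Dispatcher{
		Arv:          arv,
		RunContainer: runContainer,
		PollPeriod:   10 * time.Second,
	}
	if err := d.Run(context.Background()); err != nil {
		panic(err)
	}
}
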
 // Run watches the API server's queue for containers that are either
 // ready to run and available to lock, or are already locked by this
 // dispatcher's token. When a new one appears, Run calls RunContainer
 // in a new goroutine.
 func (d *Dispatcher) Run(ctx context.Context) error {
+	if d.Logger == nil {
+		d.Logger = logrus.StandardLogger()
+	}
+
 	err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
 	if err != nil {
 		return fmt.Errorf("error getting my token UUID: %v", err)
 	}
@@ -68,75 +89,190 @@ func (d *Dispatcher) Run(ctx context.Context) error {
 	poll := time.NewTicker(d.PollPeriod)
 	defer poll.Stop()
 
+	if d.BatchSize == 0 {
+		d.BatchSize = 100
+	}
+
 	for {
-		tracked := d.trackedUUIDs()
-		d.checkForUpdates([][]interface{}{
-			{"uuid", "in", tracked}})
-		d.checkForUpdates([][]interface{}{
-			{"locked_by_uuid", "=", d.auth.UUID},
-			{"uuid", "not in", tracked}})
-		d.checkForUpdates([][]interface{}{
-			{"state", "=", Queued},
-			{"priority", ">", "0"},
-			{"uuid", "not in", tracked}})
 		select {
 		case <-poll.C:
-			continue
+			break
 		case <-ctx.Done():
+			d.mtx.Lock()
+			defer d.mtx.Unlock()
+			for _, tracker := range d.trackers {
+				tracker.close()
+			}
 			return ctx.Err()
 		}
-	}
-}
 
-func (d *Dispatcher) trackedUUIDs() []string {
-	d.mtx.Lock()
-	defer d.mtx.Unlock()
-	if len(d.trackers) == 0 {
-		// API bug: ["uuid", "not in", []] does not work as
-		// expected, but this does:
-		return []string{"this-uuid-does-not-exist"}
-	}
-	uuids := make([]string, 0, len(d.trackers))
-	for x := range d.trackers {
-		uuids = append(uuids, x)
+		todo := make(map[string]*runTracker)
+		d.mtx.Lock()
+		// Make a copy of trackers
+		for uuid, tracker := range d.trackers {
+			todo[uuid] = tracker
+		}
+		d.mtx.Unlock()
+
+		// Containers I currently own (Locked/Running)
+		querySuccess := d.checkForUpdates([][]interface{}{
+			{"locked_by_uuid", "=", d.auth.UUID}}, todo)
+
+		// Containers I should try to dispatch
+		querySuccess = d.checkForUpdates([][]interface{}{
+			{"state", "=", Queued},
+			{"priority", ">", "0"}}, todo) && querySuccess
+
+		if !querySuccess {
+			// There was an error in one of the previous queries;
+			// we probably didn't get updates for all the
+			// containers we should have. Don't check them
+			// individually because it may be expensive.
+			continue
+		}
+
+		// Containers I know about but didn't fall into the
+		// above two categories (probably Complete/Cancelled)
+		var missed []string
+		for uuid := range todo {
+			missed = append(missed, uuid)
+		}
+
+		for len(missed) > 0 {
+			var batch []string
+			if len(missed) > 20 {
+				batch = missed[0:20]
+				missed = missed[20:]
+			} else {
+				batch = missed
+				missed = missed[0:0]
+			}
+			querySuccess = d.checkForUpdates([][]interface{}{
+				{"uuid", "in", batch}}, todo) && querySuccess
+		}
+
+		if !querySuccess {
+			// There was an error in one of the previous queries; we probably
+			// didn't see all the containers we should have, so don't shut down
+			// the missed containers.
+			continue
+		}
+
+		// Containers that I know about that didn't show up in any
+		// query should be let go.
+		for uuid, tracker := range todo {
+			d.Logger.Printf("Container %q not returned by any query, stopping tracking.", uuid)
+			tracker.close()
+		}
+
 	}
-	return uuids
 }
 
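Run's reconciliation pass above queries leftover UUIDs in fixed batches of 20 so the "uuid in ..." filter stays bounded. The same slicing idiom, pulled out as a standalone sketch (a hypothetical helper, not part of the library):

// batches splits uuids into chunks of at most size elements, mirroring
// the batch/missed bookkeeping in Run above.
func batches(uuids []string, size int) [][]string {
	var out [][]string
	for len(uuids) > 0 {
		n := size
		if n > len(uuids) {
			n = len(uuids)
		}
		out = append(out, uuids[:n])
		uuids = uuids[n:]
	}
	return out
}

For example, batches(uuids, 20) yields one slice per {"uuid", "in", batch} API call.
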
 // Start a runner in a new goroutine, and send the initial container
 // record to its updates channel.
 func (d *Dispatcher) start(c arvados.Container) *runTracker {
-	tracker := &runTracker{updates: make(chan arvados.Container, 1)}
+	tracker := &runTracker{
+		updates: make(chan arvados.Container, 1),
+		logger:  d.Logger,
+	}
 	tracker.updates <- c
 	go func() {
-		d.RunContainer(d, c, tracker.updates)
-
-		d.mtx.Lock()
-		delete(d.trackers, c.UUID)
-		d.mtx.Unlock()
+		fallbackState := Queued
+		err := d.RunContainer(d, c, tracker.updates)
+		if err != nil {
+			text := fmt.Sprintf("Error running container %s: %s", c.UUID, err)
+			if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
+				fallbackState = Cancelled
+				var logBuf bytes.Buffer
+				fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err)
+				if len(err.AvailableTypes) == 0 {
+					fmt.Fprint(&logBuf, "No instance types are configured.\n")
+				} else {
+					fmt.Fprint(&logBuf, "Available instance types:\n")
+					for _, t := range err.AvailableTypes {
+						fmt.Fprintf(&logBuf,
+							"Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
+							t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price)
+					}
+				}
+				text = logBuf.String()
+			}
+			d.Logger.Printf("%s", text)
+			lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+				"object_uuid": c.UUID,
+				"event_type":  "dispatch",
+				"properties":  map[string]string{"text": text}}}
+			d.Arv.Create("logs", lr, nil)
+		}
+		// If checkListForUpdates() doesn't close the tracker
+		// after 2 queue updates, try to move the container to
+		// the fallback state, which should eventually work
+		// and cause the tracker to close.
+		updates := 0
+		for upd := range tracker.updates {
+			updates++
+			if upd.State == Locked || upd.State == Running {
+				// Tracker didn't clean up before
+				// returning -- or this is the first
+				// update and it contains stale
+				// information from before
+				// RunContainer() returned.
+				if updates < 2 {
+					// Avoid generating confusing
+					// logs / API calls in the
+					// stale-info case.
+					continue
+				}
+				d.Logger.Printf("container %s state is still %s, changing to %s", c.UUID, upd.State, fallbackState)
+				d.UpdateState(c.UUID, fallbackState)
+			}
+		}
 	}()
 	return tracker
 }
 
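start() singles out dispatchcloud.ConstraintsNotSatisfiableError: the container falls back to Cancelled instead of Queued, and the explanation is posted as a "dispatch" log record. Here is a sketch of a DispatchFunc that can produce that error, assuming the dispatchcloud.ChooseInstanceType helper with its (cluster, container) signature; the cluster variable and the surrounding runner are hypothetical stand-ins.

package main

import (
	"git.arvados.org/arvados.git/lib/dispatchcloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/dispatch"
)

// cluster would be loaded from site configuration; hypothetical here.
var cluster *arvados.Cluster

func runContainer(d *dispatch.Dispatcher, c arvados.Container, updates <-chan arvados.Container) error {
	// ChooseInstanceType returns a ConstraintsNotSatisfiableError
	// when no configured instance type fits the container's
	// runtime constraints. Returning it unwrapped lets start()
	// cancel the container instead of leaving it Queued forever.
	it, err := dispatchcloud.ChooseInstanceType(cluster, &c)
	if err != nil {
		return err
	}
	_ = it // ... start c on an instance of type it ...
	for range updates {
		// consume updates until the library closes the channel
	}
	return nil
}
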
-func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
+func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
+	var countList arvados.ContainerList
 	params := arvadosclient.Dict{
 		"filters": filters,
+		"count":   "exact",
+		"limit":   0,
 		"order":   []string{"priority desc"}}
-
-	var list arvados.ContainerList
-	for offset, more := 0, true; more; offset += len(list.Items) {
+	err := d.Arv.List("containers", params, &countList)
+	if err != nil {
+		d.Logger.Warnf("error getting count of containers: %q", err)
+		return false
+	}
+	itemsAvailable := countList.ItemsAvailable
+	params = arvadosclient.Dict{
+		"filters": filters,
+		"count":   "none",
+		"limit":   d.BatchSize,
+		"order":   []string{"priority desc"}}
+	offset := 0
+	for {
 		params["offset"] = offset
+
+		// This list variable must be a new one declared
+		// inside the loop: otherwise, items in the API
+		// response would get deep-merged into the items
+		// loaded in previous iterations.
+		var list arvados.ContainerList
+
 		err := d.Arv.List("containers", params, &list)
 		if err != nil {
-			log.Printf("Error getting list of containers: %q", err)
-			return
+			d.Logger.Warnf("error getting list of containers: %q", err)
+			return false
+		}
+		d.checkListForUpdates(list.Items, todo)
+		offset += len(list.Items)
+		if len(list.Items) == 0 || itemsAvailable <= offset {
+			return true
 		}
-		more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset
-		d.checkListForUpdates(list.Items)
 	}
 }
 
-func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
+func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
 	d.mtx.Lock()
 	defer d.mtx.Unlock()
 	if d.trackers == nil {
@@ -145,16 +281,19 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
 
 	for _, c := range containers {
 		tracker, alreadyTracking := d.trackers[c.UUID]
+		delete(todo, c.UUID)
+
 		if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
-			log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
+			d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
 		} else if alreadyTracking {
 			switch c.State {
-			case Queued:
+			case Queued, Cancelled, Complete:
+				d.Logger.Debugf("update has %s in state %s, closing tracker", c.UUID, c.State)
 				tracker.close()
+				delete(d.trackers, c.UUID)
 			case Locked, Running:
+				d.Logger.Debugf("update has %s in state %s, updating tracker", c.UUID, c.State)
 				tracker.update(c)
-			case Cancelled, Complete:
-				tracker.close()
 			}
 		} else {
 			switch c.State {
@@ -164,7 +303,7 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
 			}
 			err := d.lock(c.UUID)
 			if err != nil {
-				log.Printf("debug: error locking container %s: %s", c.UUID, err)
+				d.Logger.Warnf("error locking container %s: %s", c.UUID, err)
 				break
 			}
 			c.State = Locked
@@ -188,7 +327,7 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
 		"container": arvadosclient.Dict{"state": state},
 	}, nil)
 	if err != nil {
-		log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
+		d.Logger.Warnf("error updating container %s to state %q: %s", uuid, state, err)
 	}
 	return err
 }
@@ -203,33 +342,47 @@ func (d *Dispatcher) Unlock(uuid string) error {
 	return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
 }
 
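The comment in checkForUpdates about declaring list inside the loop describes a general decode-into-reused-value pitfall, not something unique to arvadosclient. The same effect can be reproduced with the standard library alone (standalone demo, not Arvados code):

package main

import (
	"encoding/json"
	"fmt"
)

type item struct {
	UUID     string `json:"uuid"`
	Priority int    `json:"priority"`
}

func main() {
	var list []item
	// Page 1 sets priority.
	json.Unmarshal([]byte(`[{"uuid":"a","priority":5}]`), &list)
	// Page 2 omits priority. Unmarshal reuses the existing slice
	// element, so the stale value from page 1 survives the decode:
	json.Unmarshal([]byte(`[{"uuid":"b"}]`), &list)
	fmt.Println(list) // prints [{b 5}], not [{b 0}]
}

Declaring the destination inside the loop gives every page a zero value to decode into, which is exactly what the patch does.
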
-// TrackContainer starts a tracker for given uuid if one is not already existing, despite its state.
-func (d *Dispatcher) TrackContainer(uuid string) {
+// TrackContainer ensures a tracker is running for the given UUID,
+// regardless of the current state of the container (except: if the
+// container is locked by a different dispatcher, a tracker will not
+// be started). If the container is not in Locked or Running state,
+// the new tracker will close down immediately.
+//
+// This allows the dispatcher to put its own RunContainer func into a
+// cleanup phase (for example, to kill local processes created by a
+// previous dispatch process that are still running even though the
+// container state is final) without the risk of having multiple
+// goroutines monitoring the same UUID.
+func (d *Dispatcher) TrackContainer(uuid string) error {
+	var cntr arvados.Container
+	err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
+	if err != nil {
+		return err
+	}
+	if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
+		return nil
+	}
+
 	d.mtx.Lock()
 	defer d.mtx.Unlock()
-
+	if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
+		return nil
+	}
 	if d.trackers == nil {
 		d.trackers = make(map[string]*runTracker)
 	}
-
-	_, alreadyTracking := d.trackers[uuid]
-	if alreadyTracking {
-		return
-	}
-
-	var cntr arvados.Container
-	err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
-	if err != nil {
-		log.Printf("Error getting container %s: %s", uuid, err)
-		return
+	d.trackers[uuid] = d.start(cntr)
+	switch cntr.State {
+	case Queued, Cancelled, Complete:
+		d.trackers[uuid].close()
 	}
-
-	d.trackers[uuid] = d.start(cntr)
+	return nil
 }
 
 type runTracker struct {
 	closing bool
 	updates chan arvados.Container
+	logger  Logger
 }
 
 func (tracker *runTracker) close() {
@@ -245,7 +398,7 @@ func (tracker *runTracker) update(c arvados.Container) {
 	}
 	select {
 	case <-tracker.updates:
-		log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
+		tracker.logger.Debugf("runner is handling updates slowly, discarded previous update for %s", c.UUID)
 	default:
 	}
 	tracker.updates <- c
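runTracker.update() above is a "latest value wins" mailbox: a one-slot buffered channel where a stale, undelivered update is drained before the new one is sent, so a slow RunContainer never blocks the poll loop and always wakes to the newest container record. The pattern in isolation (standalone sketch):

package main

import "fmt"

// post delivers v on a one-slot channel, first dropping any value the
// consumer has not picked up yet -- the runTracker.update() pattern.
func post(ch chan string, v string) {
	select {
	case <-ch: // discard the stale, undelivered value
	default:
	}
	ch <- v
}

func main() {
	ch := make(chan string, 1)
	post(ch, "update-1")
	post(ch, "update-2") // "update-1" is silently dropped
	fmt.Println(<-ch)    // prints "update-2"
}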