X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d514a085a9caa5a48cf939fa505e7c94fd121fb6..12495b9c8e1cbde47d0a96c021d96141c51f10d8:/services/crunch-run/crunchrun.go diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index de397891c6..d055106d35 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -79,6 +79,7 @@ type ThinDockerClient interface { ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) + ContainerInspect(ctx context.Context, id string) (dockertypes.ContainerJSON, error) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) @@ -121,7 +122,7 @@ type ContainerRunner struct { SigChan chan os.Signal ArvMountExit chan error SecretMounts map[string]arvados.Mount - MkArvClient func(token string) (IArvadosClient, error) + MkArvClient func(token string) (IArvadosClient, IKeepClient, error) finalState string parentTemp string @@ -149,11 +150,14 @@ type ContainerRunner struct { cStateLock sync.Mutex cCancelled bool // StopContainer() invoked + cRemoved bool // docker confirmed the container no longer exists enableNetwork string // one of "default" or "always" networkMode string // passed through to HostConfig.NetworkMode arvMountLog *ThrottledLogger checkContainerd time.Duration + + containerWatchdogInterval time.Duration } // setupSignals sets up signal handling to gracefully terminate the underlying @@ -187,6 +191,9 @@ func (runner *ContainerRunner) stop(sig os.Signal) { if err != nil { runner.CrunchLog.Printf("error removing container: %s", err) } + if err == nil || strings.Contains(err.Error(), "No such container: "+runner.ContainerID) { + runner.cRemoved = true + } } var errorBlacklist = []string{ @@ -230,8 +237,17 @@ func (runner *ContainerRunner) LoadImage() (err error) { runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage) + tok, err := runner.ContainerToken() + if err != nil { + return fmt.Errorf("While getting container token (LoadImage): %v", err) + } + arvClient, kc, err := runner.MkArvClient(tok) + if err != nil { + return fmt.Errorf("While creating arv client (LoadImage): %v", err) + } + var collection arvados.Collection - err = runner.ArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection) + err = arvClient.Get("collections", runner.Container.ContainerImage, nil, &collection) if err != nil { return fmt.Errorf("While getting container image collection: %v", err) } @@ -252,7 +268,7 @@ func (runner *ContainerRunner) LoadImage() (err error) { runner.CrunchLog.Print("Loading Docker image from keep") var readCloser io.ReadCloser - readCloser, err = runner.Kc.ManifestFileReader(manifest, img) + readCloser, err = kc.ManifestFileReader(manifest, img) if err != nil { return fmt.Errorf("While creating ManifestFileReader for container image: %v", err) } @@ -274,7 +290,7 @@ func (runner *ContainerRunner) LoadImage() (err error) { runner.ContainerConfig.Image = imageID - runner.Kc.ClearBlockCache() + kc.ClearBlockCache() return nil } @@ -1124,6 +1140,32 @@ func (runner *ContainerRunner) WaitFinish() error { runTimeExceeded = time.After(time.Duration(timeout) * time.Second) } + containerGone := make(chan struct{}) + go func() { + defer close(containerGone) + if runner.containerWatchdogInterval < 1 { + runner.containerWatchdogInterval = time.Minute + } + for range time.NewTicker(runner.containerWatchdogInterval).C { + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(runner.containerWatchdogInterval)) + ctr, err := runner.Docker.ContainerInspect(ctx, runner.ContainerID) + cancel() + runner.cStateLock.Lock() + done := runner.cRemoved || runner.ExitCode != nil + runner.cStateLock.Unlock() + if done { + return + } else if err != nil { + runner.CrunchLog.Printf("Error inspecting container: %s", err) + runner.checkBrokenNode(err) + return + } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") { + runner.CrunchLog.Printf("Container is not running: State=%v", ctr.State) + return + } + } + }() + containerdGone := make(chan error) defer close(containerdGone) if runner.checkContainerd > 0 { @@ -1171,6 +1213,9 @@ func (runner *ContainerRunner) WaitFinish() error { runner.stop(nil) runTimeExceeded = nil + case <-containerGone: + return errors.New("docker client never returned status") + case err := <-containerdGone: return err } @@ -1206,7 +1251,7 @@ func (runner *ContainerRunner) updateLogs() { } saveAtTime = time.Now().Add(crunchLogUpdatePeriod) saveAtSize = runner.LogCollection.Size() + crunchLogUpdateSize - saved, err := runner.saveLogCollection() + saved, err := runner.saveLogCollection(false) if err != nil { runner.CrunchLog.Printf("error updating log collection: %s", err) continue @@ -1362,7 +1407,7 @@ func (runner *ContainerRunner) CommitLogs() error { // -- it exists only to send logs to other channels. return nil } - saved, err := runner.saveLogCollection() + saved, err := runner.saveLogCollection(true) if err != nil { return fmt.Errorf("error saving log collection: %s", err) } @@ -1372,7 +1417,7 @@ func (runner *ContainerRunner) CommitLogs() error { return nil } -func (runner *ContainerRunner) saveLogCollection() (response arvados.Collection, err error) { +func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.Collection, err error) { runner.logMtx.Lock() defer runner.logMtx.Unlock() if runner.LogsPDH != nil { @@ -1384,11 +1429,18 @@ func (runner *ContainerRunner) saveLogCollection() (response arvados.Collection, err = fmt.Errorf("error creating log manifest: %v", err) return } - reqBody := arvadosclient.Dict{ - "collection": arvadosclient.Dict{ - "is_trashed": true, - "name": "logs for " + runner.Container.UUID, - "manifest_text": mt}} + updates := arvadosclient.Dict{ + "name": "logs for " + runner.Container.UUID, + "manifest_text": mt, + } + if final { + updates["is_trashed"] = true + } else { + exp := time.Now().Add(crunchLogUpdatePeriod * 24) + updates["trash_at"] = exp + updates["delete_at"] = exp + } + reqBody := arvadosclient.Dict{"collection": updates} if runner.logUUID == "" { reqBody["ensure_unique_name"] = true err = runner.ArvClient.Create("collections", reqBody, &response) @@ -1425,7 +1477,7 @@ func (runner *ContainerRunner) ContainerToken() (string, error) { if err != nil { return "", err } - runner.token = auth.APIToken + runner.token = fmt.Sprintf("v2/%s/%s/%s", auth.UUID, auth.APIToken, runner.Container.UUID) return runner.token, nil } @@ -1636,7 +1688,7 @@ func (runner *ContainerRunner) fetchContainerRecord() error { return fmt.Errorf("error getting container token: %v", err) } - containerClient, err := runner.MkArvClient(containerToken) + containerClient, _, err := runner.MkArvClient(containerToken) if err != nil { return fmt.Errorf("error creating container API client: %v", err) } @@ -1676,13 +1728,17 @@ func NewContainerRunner(client *arvados.Client, api IArvadosClient, kc IKeepClie } return ps, nil } - cr.MkArvClient = func(token string) (IArvadosClient, error) { + cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, error) { cl, err := arvadosclient.MakeArvadosClient() if err != nil { - return nil, err + return nil, nil, err } cl.ApiToken = token - return cl, nil + kc, err := keepclient.MakeKeepClient(cl) + if err != nil { + return nil, nil, err + } + return cl, kc, nil } var err error cr.LogCollection, err = (&arvados.Collection{}).FileSystem(cr.client, cr.Kc)