ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error
ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
- ContainerList(ctx context.Context, opts dockertypes.ContainerListOptions) ([]dockertypes.Container, error)
+ ContainerInspect(ctx context.Context, id string) (dockertypes.ContainerJSON, error)
ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
SigChan chan os.Signal
ArvMountExit chan error
SecretMounts map[string]arvados.Mount
- MkArvClient func(token string) (IArvadosClient, error)
+ MkArvClient func(token string) (IArvadosClient, IKeepClient, error)
finalState string
parentTemp string
arvMountLog *ThrottledLogger
checkContainerd time.Duration
- containerWaitGracePeriod time.Duration
+ containerWatchdogInterval time.Duration
}
// setupSignals sets up signal handling to gracefully terminate the underlying
runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
+ tok, err := runner.ContainerToken()
+ if err != nil {
+ return fmt.Errorf("While getting container token (LoadImage): %v", err)
+ }
+ arvClient, kc, err := runner.MkArvClient(tok)
+ if err != nil {
+ return fmt.Errorf("While creating arv client (LoadImage): %v", err)
+ }
+
var collection arvados.Collection
- err = runner.ArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
+ err = arvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
if err != nil {
return fmt.Errorf("While getting container image collection: %v", err)
}
runner.CrunchLog.Print("Loading Docker image from keep")
var readCloser io.ReadCloser
- readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
+ readCloser, err = kc.ManifestFileReader(manifest, img)
if err != nil {
return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
}
runner.ContainerConfig.Image = imageID
- runner.Kc.ClearBlockCache()
+ kc.ClearBlockCache()
return nil
}
containerGone := make(chan struct{})
go func() {
defer close(containerGone)
- if runner.containerWaitGracePeriod < 1 {
- runner.containerWaitGracePeriod = 30 * time.Second
+ if runner.containerWatchdogInterval < 1 {
+ runner.containerWatchdogInterval = time.Minute
}
- found := time.Now()
- polling:
- for range time.NewTicker(runner.containerWaitGracePeriod / 30).C {
- ctrs, err := runner.Docker.ContainerList(context.Background(), dockertypes.ContainerListOptions{})
- if err != nil {
- runner.CrunchLog.Printf("error checking container list: %s", err)
- continue polling
- }
- for _, ctr := range ctrs {
- if ctr.ID == runner.ContainerID {
- found = time.Now()
- continue polling
- }
- }
+ for range time.NewTicker(runner.containerWatchdogInterval).C {
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(runner.containerWatchdogInterval))
+ ctr, err := runner.Docker.ContainerInspect(ctx, runner.ContainerID)
+ cancel()
runner.cStateLock.Lock()
done := runner.cRemoved || runner.ExitCode != nil
runner.cStateLock.Unlock()
if done {
- // Skip the grace period and warning
- // log if the container disappeared
- // because it finished, or we removed
- // it ourselves.
return
- }
- if time.Since(found) > runner.containerWaitGracePeriod {
- runner.CrunchLog.Printf("container %s no longer exists", runner.ContainerID)
+ } else if err != nil {
+ runner.CrunchLog.Printf("Error inspecting container: %s", err)
+ runner.checkBrokenNode(err)
+ return
+ } else if ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created") {
+ runner.CrunchLog.Printf("Container is not running: State=%v", ctr.State)
return
}
}
if err != nil {
return "", err
}
- runner.token = auth.APIToken
+ runner.token = fmt.Sprintf("v2/%s/%s/%s", auth.UUID, auth.APIToken, runner.Container.UUID)
return runner.token, nil
}
return fmt.Errorf("error getting container token: %v", err)
}
- containerClient, err := runner.MkArvClient(containerToken)
+ containerClient, _, err := runner.MkArvClient(containerToken)
if err != nil {
return fmt.Errorf("error creating container API client: %v", err)
}
}
return ps, nil
}
- cr.MkArvClient = func(token string) (IArvadosClient, error) {
+ cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, error) {
cl, err := arvadosclient.MakeArvadosClient()
if err != nil {
- return nil, err
+ return nil, nil, err
}
cl.ApiToken = token
- return cl, nil
+ kc, err := keepclient.MakeKeepClient(cl)
+ if err != nil {
+ return nil, nil, err
+ }
+ return cl, kc, nil
}
var err error
cr.LogCollection, err = (&arvados.Collection{}).FileSystem(cr.client, cr.Kc)