PutB(buf []byte) (string, int, error)
ReadAt(locator string, p []byte, off int) (int, error)
ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
+ LocalLocator(locator string) (string, error)
ClearBlockCache()
}
ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error
ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
+ ContainerInspect(ctx context.Context, id string) (dockertypes.ContainerJSON, error)
ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
SigChan chan os.Signal
ArvMountExit chan error
SecretMounts map[string]arvados.Mount
- MkArvClient func(token string) (IArvadosClient, error)
+ MkArvClient func(token string) (IArvadosClient, IKeepClient, error)
finalState string
parentTemp string
cStateLock sync.Mutex
cCancelled bool // StopContainer() invoked
+ cRemoved bool // docker confirmed the container no longer exists
enableNetwork string // one of "default" or "always"
networkMode string // passed through to HostConfig.NetworkMode
arvMountLog *ThrottledLogger
checkContainerd time.Duration
+
+ containerWatchdogInterval time.Duration
}
// setupSignals sets up signal handling to gracefully terminate the underlying
if err != nil {
runner.CrunchLog.Printf("error removing container: %s", err)
}
+ if err == nil || strings.Contains(err.Error(), "No such container: "+runner.ContainerID) {
+ runner.cRemoved = true
+ }
}
var errorBlacklist = []string{
runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
+ tok, err := runner.ContainerToken()
+ if err != nil {
+ return fmt.Errorf("While getting container token (LoadImage): %v", err)
+ }
+ arvClient, kc, err := runner.MkArvClient(tok)
+ if err != nil {
+ return fmt.Errorf("While creating arv client (LoadImage): %v", err)
+ }
+
var collection arvados.Collection
- err = runner.ArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
+ err = arvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
if err != nil {
return fmt.Errorf("While getting container image collection: %v", err)
}
runner.CrunchLog.Print("Loading Docker image from keep")
var readCloser io.ReadCloser
- readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
+ readCloser, err = kc.ManifestFileReader(manifest, img)
if err != nil {
return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
}
runner.ContainerConfig.Image = imageID
- runner.Kc.ClearBlockCache()
+ kc.ClearBlockCache()
return nil
}
runTimeExceeded = time.After(time.Duration(timeout) * time.Second)
}
+ // containerGone is closed when the watchdog goroutine below exits,
+ // i.e. when the container has exited or been removed, when
+ // ContainerInspect fails, or when docker reports a state that is
+ // neither running nor "created".
+ containerGone := make(chan struct{})
+ go func() {
+ defer close(containerGone)
+ if runner.containerWatchdogInterval < 1 {
+ runner.containerWatchdogInterval = time.Minute
+ }
+ // Keep a handle on the ticker so it is stopped when the watchdog
+ // returns; ranging over time.NewTicker(...).C directly leaves no
+ // way to stop it.
+ ticker := time.NewTicker(runner.containerWatchdogInterval)
+ defer ticker.Stop()
+ for range ticker.C {
+ // Give each inspect call a deadline of one watchdog interval.
+ ctx, cancel := context.WithTimeout(context.Background(), runner.containerWatchdogInterval)
+ ctr, err := runner.Docker.ContainerInspect(ctx, runner.ContainerID)
+ cancel()
+ // Read the shared state under the lock, after the inspect call,
+ // so a container that finished meanwhile is not reported as an
+ // error.
+ runner.cStateLock.Lock()
+ done := runner.cRemoved || runner.ExitCode != nil
+ runner.cStateLock.Unlock()
+ switch {
+ case done:
+ return
+ case err != nil:
+ runner.CrunchLog.Printf("Error inspecting container: %s", err)
+ runner.checkBrokenNode(err)
+ return
+ case ctr.State == nil || !(ctr.State.Running || ctr.State.Status == "created"):
+ runner.CrunchLog.Printf("Container is not running: State=%v", ctr.State)
+ return
+ }
+ }
+ }()
+
containerdGone := make(chan error)
defer close(containerdGone)
if runner.checkContainerd > 0 {
runner.stop(nil)
runTimeExceeded = nil
+ case <-containerGone:
+ return errors.New("docker client never returned status")
+
case err := <-containerdGone:
return err
}
}
}
-func (runner *ContainerRunner) checkpointLogs() {
- logCheckpointTicker := time.NewTicker(crunchLogCheckpointMaxDuration / 360)
- defer logCheckpointTicker.Stop()
+func (runner *ContainerRunner) updateLogs() {
+ ticker := time.NewTicker(crunchLogUpdatePeriod / 360)
+ defer ticker.Stop()
+
+ sigusr1 := make(chan os.Signal, 1)
+ signal.Notify(sigusr1, syscall.SIGUSR1)
+ defer signal.Stop(sigusr1)
- logCheckpointTime := time.Now().Add(crunchLogCheckpointMaxDuration)
- logCheckpointBytes := crunchLogCheckpointMaxBytes
+ saveAtTime := time.Now().Add(crunchLogUpdatePeriod)
+ saveAtSize := crunchLogUpdateSize
var savedSize int64
- for range logCheckpointTicker.C {
+ for {
+ select {
+ case <-ticker.C:
+ case <-sigusr1:
+ saveAtTime = time.Now()
+ }
runner.logMtx.Lock()
done := runner.LogsPDH != nil
runner.logMtx.Unlock()
return
}
size := runner.LogCollection.Size()
- if size == savedSize || (time.Now().Before(logCheckpointTime) && size < logCheckpointBytes) {
+ if size == savedSize || (time.Now().Before(saveAtTime) && size < saveAtSize) {
continue
}
- logCheckpointTime = time.Now().Add(crunchLogCheckpointMaxDuration)
- logCheckpointBytes = runner.LogCollection.Size() + crunchLogCheckpointMaxBytes
- saved, err := runner.saveLogCollection()
+ saveAtTime = time.Now().Add(crunchLogUpdatePeriod)
+ saveAtSize = runner.LogCollection.Size() + crunchLogUpdateSize
+ saved, err := runner.saveLogCollection(false)
if err != nil {
runner.CrunchLog.Printf("error updating log collection: %s", err)
continue
if err != nil {
return err
}
+ if n := len(regexp.MustCompile(` [0-9a-f]+\+\S*\+R`).FindAllStringIndex(txt, -1)); n > 0 {
+ runner.CrunchLog.Printf("Copying %d data blocks from remote input collections...", n)
+ fs, err := (&arvados.Collection{ManifestText: txt}).FileSystem(runner.client, runner.Kc)
+ if err != nil {
+ return err
+ }
+ txt, err = fs.MarshalManifest(".")
+ if err != nil {
+ return err
+ }
+ }
var resp arvados.Collection
err = runner.ArvClient.Create("collections", arvadosclient.Dict{
"ensure_unique_name": true,
// -- it exists only to send logs to other channels.
return nil
}
- saved, err := runner.saveLogCollection()
+ saved, err := runner.saveLogCollection(true)
if err != nil {
- return err
+ return fmt.Errorf("error saving log collection: %s", err)
}
runner.logMtx.Lock()
defer runner.logMtx.Unlock()
return nil
}
-func (runner *ContainerRunner) saveLogCollection() (response arvados.Collection, err error) {
+func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.Collection, err error) {
runner.logMtx.Lock()
defer runner.logMtx.Unlock()
if runner.LogsPDH != nil {
err = fmt.Errorf("error creating log manifest: %v", err)
return
}
- reqBody := arvadosclient.Dict{
- "collection": arvadosclient.Dict{
- "is_trashed": true,
- "name": "logs for " + runner.Container.UUID,
- "manifest_text": mt}}
+ updates := arvadosclient.Dict{
+ "name": "logs for " + runner.Container.UUID,
+ "manifest_text": mt,
+ }
+ if final {
+ updates["is_trashed"] = true
+ } else {
+ exp := time.Now().Add(crunchLogUpdatePeriod * 24)
+ updates["trash_at"] = exp
+ updates["delete_at"] = exp
+ }
+ reqBody := arvadosclient.Dict{"collection": updates}
if runner.logUUID == "" {
reqBody["ensure_unique_name"] = true
err = runner.ArvClient.Create("collections", reqBody, &response)
err = runner.ArvClient.Update("collections", runner.logUUID, reqBody, &response)
}
if err != nil {
- err = fmt.Errorf("error saving log collection: %v", err)
return
}
runner.logUUID = response.UUID
if err != nil {
return "", err
}
- runner.token = auth.APIToken
+ runner.token = fmt.Sprintf("v2/%s/%s/%s", auth.UUID, auth.APIToken, runner.Container.UUID)
return runner.token, nil
}
return fmt.Errorf("error getting container token: %v", err)
}
- containerClient, err := runner.MkArvClient(containerToken)
+ containerClient, _, err := runner.MkArvClient(containerToken)
if err != nil {
return fmt.Errorf("error creating container API client: %v", err)
}
}
return ps, nil
}
- cr.MkArvClient = func(token string) (IArvadosClient, error) {
+ cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, error) {
cl, err := arvadosclient.MakeArvadosClient()
if err != nil {
- return nil, err
+ return nil, nil, err
}
cl.ApiToken = token
- return cl, nil
+ kc, err := keepclient.MakeKeepClient(cl)
+ if err != nil {
+ return nil, nil, err
+ }
+ return cl, kc, nil
}
var err error
cr.LogCollection, err = (&arvados.Collection{}).FileSystem(cr.client, cr.Kc)
cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
loadLogThrottleParams(api)
- go cr.checkpointLogs()
+ go cr.updateLogs()
return cr, nil
}