8 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9 "git.curoverse.com/arvados.git/sdk/go/keepclient"
10 "git.curoverse.com/arvados.git/sdk/go/manifest"
11 "github.com/curoverse/dockerclient"
25 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
26 type IArvadosClient interface {
27 Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
28 Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
29 Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
30 Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) (err error)
33 // ErrCancelled is the error returned when the container is cancelled.
34 var ErrCancelled = errors.New("Cancelled")
36 // IKeepClient is the minimal Keep API methods used by crunch-run.
37 type IKeepClient interface {
38 PutHB(hash string, buf []byte) (string, int, error)
39 ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
42 // Mount describes the mount points to create inside the container.
44 Kind string `json:"kind"`
45 Writable bool `json:"writable"`
46 PortableDataHash string `json:"portable_data_hash"`
47 UUID string `json:"uuid"`
48 DeviceType string `json:"device_type"`
49 Path string `json:"path"`
52 // Collection record returned by the API server.
53 type CollectionRecord struct {
54 ManifestText string `json:"manifest_text"`
55 PortableDataHash string `json:"portable_data_hash"`
58 type RuntimeConstraints struct {
62 // ContainerRecord is the container record returned by the API server.
63 type ContainerRecord struct {
64 UUID string `json:"uuid"`
65 Command []string `json:"command"`
66 ContainerImage string `json:"container_image"`
67 Cwd string `json:"cwd"`
68 Environment map[string]string `json:"environment"`
69 Mounts map[string]Mount `json:"mounts"`
70 OutputPath string `json:"output_path"`
71 Priority int `json:"priority"`
72 RuntimeConstraints RuntimeConstraints `json:"runtime_constraints"`
73 State string `json:"state"`
74 Output string `json:"output"`
77 // APIClientAuthorization is an arvados#api_client_authorization resource.
78 type APIClientAuthorization struct {
79 UUID string `json:"uuid"`
80 APIToken string `json:"api_token"`
83 // NewLogWriter is a factory function to create a new log writer.
84 type NewLogWriter func(name string) io.WriteCloser
86 type RunArvMount func(args []string, tok string) (*exec.Cmd, error)
88 type MkTempDir func(string, string) (string, error)
90 // ThinDockerClient is the minimal Docker client interface used by crunch-run.
91 type ThinDockerClient interface {
92 StopContainer(id string, timeout int) error
93 InspectImage(id string) (*dockerclient.ImageInfo, error)
94 LoadImage(reader io.Reader) error
95 CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
96 StartContainer(id string, config *dockerclient.HostConfig) error
97 AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error)
98 Wait(id string) <-chan dockerclient.WaitResult
99 RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
102 // ContainerRunner is the main stateful struct used for a single execution of a
104 type ContainerRunner struct {
105 Docker ThinDockerClient
106 ArvClient IArvadosClient
109 dockerclient.ContainerConfig
110 dockerclient.HostConfig
115 loggingDone chan bool
116 CrunchLog *ThrottledLogger
117 Stdout io.WriteCloser
118 Stderr *ThrottledLogger
119 LogCollection *CollectionWriter
126 CleanupTempDir []string
129 CancelLock sync.Mutex
131 SigChan chan os.Signal
132 ArvMountExit chan error
136 // SetupSignals sets up signal handling to gracefully terminate the underlying
137 // Docker container and update state when receiving a TERM, INT or QUIT signal.
138 func (runner *ContainerRunner) SetupSignals() {
139 runner.SigChan = make(chan os.Signal, 1)
140 signal.Notify(runner.SigChan, syscall.SIGTERM)
141 signal.Notify(runner.SigChan, syscall.SIGINT)
142 signal.Notify(runner.SigChan, syscall.SIGQUIT)
144 go func(sig <-chan os.Signal) {
146 if !runner.Cancelled {
147 runner.CancelLock.Lock()
148 runner.Cancelled = true
149 if runner.ContainerID != "" {
150 runner.Docker.StopContainer(runner.ContainerID, 10)
152 runner.CancelLock.Unlock()
158 // LoadImage determines the docker image id from the container record and
159 // checks if it is available in the local Docker image store. If not, it loads
160 // the image from Keep.
161 func (runner *ContainerRunner) LoadImage() (err error) {
163 runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.ContainerRecord.ContainerImage)
165 var collection CollectionRecord
166 err = runner.ArvClient.Get("collections", runner.ContainerRecord.ContainerImage, nil, &collection)
168 return fmt.Errorf("While getting container image collection: %v", err)
170 manifest := manifest.Manifest{Text: collection.ManifestText}
171 var img, imageID string
172 for ms := range manifest.StreamIter() {
173 img = ms.FileStreamSegments[0].Name
174 if !strings.HasSuffix(img, ".tar") {
175 return fmt.Errorf("First file in the container image collection does not end in .tar")
177 imageID = img[:len(img)-4]
180 runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
182 _, err = runner.Docker.InspectImage(imageID)
184 runner.CrunchLog.Print("Loading Docker image from keep")
186 var readCloser io.ReadCloser
187 readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
189 return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
192 err = runner.Docker.LoadImage(readCloser)
194 return fmt.Errorf("While loading container image into Docker: %v", err)
197 runner.CrunchLog.Print("Docker image is available")
200 runner.ContainerConfig.Image = imageID
205 func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) {
206 c = exec.Command("arv-mount", arvMountCmd...)
208 // Copy our environment, but override ARVADOS_API_TOKEN with
209 // the container auth token.
211 for _, s := range os.Environ() {
212 if !strings.HasPrefix(s, "ARVADOS_API_TOKEN=") {
213 c.Env = append(c.Env, s)
216 c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
218 nt := NewThrottledLogger(runner.NewLogWriter("arv-mount"))
227 statReadme := make(chan bool)
228 runner.ArvMountExit = make(chan error)
233 time.Sleep(100 * time.Millisecond)
234 _, err = os.Stat(fmt.Sprintf("%s/by_id/README", runner.ArvMountPoint))
244 runner.ArvMountExit <- c.Wait()
245 close(runner.ArvMountExit)
251 case err := <-runner.ArvMountExit:
252 runner.ArvMount = nil
260 func (runner *ContainerRunner) SetupMounts() (err error) {
261 runner.ArvMountPoint, err = runner.MkTempDir("", "keep")
263 return fmt.Errorf("While creating keep mount temp dir: %v", err)
266 runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
270 arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
271 collectionPaths := []string{}
274 for bind, mnt := range runner.ContainerRecord.Mounts {
275 if bind == "stdout" {
276 // Is it a "file" mount kind?
277 if mnt.Kind != "file" {
278 return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
281 // Does path start with OutputPath?
282 prefix := runner.ContainerRecord.OutputPath
283 if !strings.HasSuffix(prefix, "/") {
286 if !strings.HasPrefix(mnt.Path, prefix) {
287 return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
291 if mnt.Kind == "collection" {
293 if mnt.UUID != "" && mnt.PortableDataHash != "" {
294 return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
298 return fmt.Errorf("Writing to existing collections currently not permitted.")
301 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
302 } else if mnt.PortableDataHash != "" {
304 return fmt.Errorf("Can never write to a collection specified by portable data hash")
306 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
308 src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
309 arvMountCmd = append(arvMountCmd, "--mount-tmp")
310 arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
314 if bind == runner.ContainerRecord.OutputPath {
315 runner.HostOutputDir = src
317 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
319 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
321 collectionPaths = append(collectionPaths, src)
322 } else if mnt.Kind == "tmp" {
323 if bind == runner.ContainerRecord.OutputPath {
324 runner.HostOutputDir, err = runner.MkTempDir("", "")
326 return fmt.Errorf("While creating mount temp dir: %v", err)
328 st, staterr := os.Stat(runner.HostOutputDir)
330 return fmt.Errorf("While Stat on temp dir: %v", staterr)
332 err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
334 return fmt.Errorf("While Chmod temp dir: %v", err)
336 runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
337 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
339 runner.Binds = append(runner.Binds, bind)
344 if runner.HostOutputDir == "" {
345 return fmt.Errorf("Output path does not correspond to a writable mount point")
349 arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
351 arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
353 arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
355 token, err := runner.ContainerToken()
357 return fmt.Errorf("could not get container token: %s", err)
360 runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
362 return fmt.Errorf("While trying to start arv-mount: %v", err)
365 for _, p := range collectionPaths {
368 return fmt.Errorf("While checking that input files exist: %v", err)
375 func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
376 // Handle docker log protocol
377 // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
379 header := make([]byte, 8)
381 _, readerr := io.ReadAtLeast(containerReader, header, 8)
384 readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
387 _, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
390 _, readerr = io.CopyN(runner.Stderr, containerReader, readsize)
395 if readerr != io.EOF {
396 runner.CrunchLog.Printf("While reading docker logs: %v", readerr)
399 closeerr := runner.Stdout.Close()
401 runner.CrunchLog.Printf("While closing stdout logs: %v", closeerr)
404 closeerr = runner.Stderr.Close()
406 runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
409 runner.loggingDone <- true
410 close(runner.loggingDone)
416 // AttachLogs connects the docker container stdout and stderr logs to the
417 // Arvados logger which logs to Keep and the API server logs table.
418 func (runner *ContainerRunner) AttachStreams() (err error) {
420 runner.CrunchLog.Print("Attaching container streams")
422 var containerReader io.Reader
423 containerReader, err = runner.Docker.AttachContainer(runner.ContainerID,
424 &dockerclient.AttachOptions{Stream: true, Stdout: true, Stderr: true})
426 return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
429 runner.loggingDone = make(chan bool)
431 if stdoutMnt, ok := runner.ContainerRecord.Mounts["stdout"]; ok {
432 stdoutPath := stdoutMnt.Path[len(runner.ContainerRecord.OutputPath):]
433 index := strings.LastIndex(stdoutPath, "/")
435 subdirs := stdoutPath[:index]
437 st, err := os.Stat(runner.HostOutputDir)
439 return fmt.Errorf("While Stat on temp dir: %v", err)
441 stdoutPath := path.Join(runner.HostOutputDir, subdirs)
442 err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
444 return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
448 stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
450 return fmt.Errorf("While creating stdout file: %v", err)
452 runner.Stdout = stdoutFile
454 runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
456 runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
458 go runner.ProcessDockerAttach(containerReader)
463 // CreateContainer creates the docker container.
464 func (runner *ContainerRunner) CreateContainer() error {
465 runner.CrunchLog.Print("Creating Docker container")
467 runner.ContainerConfig.Cmd = runner.ContainerRecord.Command
468 if runner.ContainerRecord.Cwd != "." {
469 runner.ContainerConfig.WorkingDir = runner.ContainerRecord.Cwd
472 for k, v := range runner.ContainerRecord.Environment {
473 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
475 if wantAPI := runner.ContainerRecord.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
476 tok, err := runner.ContainerToken()
480 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env,
481 "ARVADOS_API_TOKEN="+tok,
482 "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
483 "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
485 runner.ContainerConfig.NetworkDisabled = false
487 runner.ContainerConfig.NetworkDisabled = true
491 runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
493 return fmt.Errorf("While creating container: %v", err)
496 runner.HostConfig = dockerclient.HostConfig{Binds: runner.Binds,
497 LogConfig: dockerclient.LogConfig{Type: "none"}}
499 return runner.AttachStreams()
502 // StartContainer starts the docker container created by CreateContainer.
503 func (runner *ContainerRunner) StartContainer() error {
504 runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
505 err := runner.Docker.StartContainer(runner.ContainerID, &runner.HostConfig)
507 return fmt.Errorf("could not start container: %v", err)
512 // WaitFinish waits for the container to terminate, capture the exit code, and
513 // close the stdout/stderr logging.
514 func (runner *ContainerRunner) WaitFinish() error {
515 runner.CrunchLog.Print("Waiting for container to finish")
517 result := runner.Docker.Wait(runner.ContainerID)
520 return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
522 runner.ExitCode = &wr.ExitCode
524 // wait for stdout/stderr to complete
530 // HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
531 func (runner *ContainerRunner) CaptureOutput() error {
532 if runner.finalState != "Complete" {
536 if runner.HostOutputDir == "" {
540 _, err := os.Stat(runner.HostOutputDir)
542 return fmt.Errorf("While checking host output path: %v", err)
545 var manifestText string
547 collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
548 _, err = os.Stat(collectionMetafile)
551 cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}}
552 manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
554 return fmt.Errorf("While uploading output files: %v", err)
557 // FUSE mount directory
558 file, openerr := os.Open(collectionMetafile)
560 return fmt.Errorf("While opening FUSE metafile: %v", err)
564 rec := CollectionRecord{}
565 err = json.NewDecoder(file).Decode(&rec)
567 return fmt.Errorf("While reading FUSE metafile: %v", err)
569 manifestText = rec.ManifestText
572 var response CollectionRecord
573 err = runner.ArvClient.Create("collections",
575 "collection": arvadosclient.Dict{
576 "manifest_text": manifestText}},
579 return fmt.Errorf("While creating output collection: %v", err)
582 runner.OutputPDH = new(string)
583 *runner.OutputPDH = response.PortableDataHash
588 func (runner *ContainerRunner) CleanupDirs() {
589 if runner.ArvMount != nil {
590 umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
591 umnterr := umount.Run()
593 runner.CrunchLog.Printf("While running fusermount: %v", umnterr)
596 mnterr := <-runner.ArvMountExit
598 runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
602 for _, tmpdir := range runner.CleanupTempDir {
603 rmerr := os.RemoveAll(tmpdir)
605 runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
610 // CommitLogs posts the collection containing the final container logs.
611 func (runner *ContainerRunner) CommitLogs() error {
612 runner.CrunchLog.Print(runner.finalState)
613 runner.CrunchLog.Close()
615 // Closing CrunchLog above allows it to be committed to Keep at this
616 // point, but re-open crunch log with ArvClient in case there are any
617 // other further (such as failing to write the log to Keep!) while
619 runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID,
622 if runner.LogsPDH != nil {
623 // If we have already assigned something to LogsPDH,
624 // we must be closing the re-opened log, which won't
625 // end up getting attached to the container record and
626 // therefore doesn't need to be saved as a collection
627 // -- it exists only to send logs to other channels.
631 mt, err := runner.LogCollection.ManifestText()
633 return fmt.Errorf("While creating log manifest: %v", err)
636 var response CollectionRecord
637 err = runner.ArvClient.Create("collections",
639 "collection": arvadosclient.Dict{
640 "name": "logs for " + runner.ContainerRecord.UUID,
641 "manifest_text": mt}},
644 return fmt.Errorf("While creating log collection: %v", err)
647 runner.LogsPDH = &response.PortableDataHash
652 // UpdateContainerRecordRunning updates the container state to "Running"
653 func (runner *ContainerRunner) UpdateContainerRecordRunning() error {
654 runner.CancelLock.Lock()
655 defer runner.CancelLock.Unlock()
656 if runner.Cancelled {
659 return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID,
660 arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
663 // ContainerToken returns the api_token the container (and any
664 // arv-mount processes) are allowed to use.
665 func (runner *ContainerRunner) ContainerToken() (string, error) {
666 if runner.token != "" {
667 return runner.token, nil
670 var auth APIClientAuthorization
671 err := runner.ArvClient.Call("GET", "containers", runner.ContainerRecord.UUID, "auth", nil, &auth)
675 runner.token = auth.APIToken
676 return runner.token, nil
679 // UpdateContainerRecordComplete updates the container record state on API
680 // server to "Complete" or "Cancelled"
681 func (runner *ContainerRunner) UpdateContainerRecordFinal() error {
682 update := arvadosclient.Dict{}
683 update["state"] = runner.finalState
684 if runner.finalState == "Complete" {
685 if runner.LogsPDH != nil {
686 update["log"] = *runner.LogsPDH
688 if runner.ExitCode != nil {
689 update["exit_code"] = *runner.ExitCode
691 if runner.OutputPDH != nil {
692 update["output"] = *runner.OutputPDH
695 return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, arvadosclient.Dict{"container": update}, nil)
698 // IsCancelled returns the value of Cancelled, with goroutine safety.
699 func (runner *ContainerRunner) IsCancelled() bool {
700 runner.CancelLock.Lock()
701 defer runner.CancelLock.Unlock()
702 return runner.Cancelled
705 // NewArvLogWriter creates an ArvLogWriter
706 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
707 return &ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID, name, runner.LogCollection.Open(name + ".txt")}
710 // Run the full container lifecycle.
711 func (runner *ContainerRunner) Run() (err error) {
712 runner.CrunchLog.Printf("Executing container '%s'", runner.ContainerRecord.UUID)
714 hostname, hosterr := os.Hostname()
716 runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr)
718 runner.CrunchLog.Printf("Executing on host '%s'", hostname)
721 // Clean up temporary directories _after_ finalizing
722 // everything (if we've made any by then)
723 defer runner.CleanupDirs()
725 runner.finalState = "Queued"
728 // checkErr prints e (unless it's nil) and sets err to
729 // e (unless err is already non-nil). Thus, if err
730 // hasn't already been assigned when Run() returns,
731 // this cleanup func will cause Run() to return the
732 // first non-nil error that is passed to checkErr().
733 checkErr := func(e error) {
737 runner.CrunchLog.Print(e)
743 // Log the error encountered in Run(), if any
746 if runner.finalState == "Queued" {
747 runner.UpdateContainerRecordFinal()
751 if runner.IsCancelled() {
752 runner.finalState = "Cancelled"
753 // but don't return yet -- we still want to
754 // capture partial output and write logs
757 checkErr(runner.CaptureOutput())
758 checkErr(runner.CommitLogs())
759 checkErr(runner.UpdateContainerRecordFinal())
761 // The real log is already closed, but then we opened
762 // a new one in case we needed to log anything while
764 runner.CrunchLog.Close()
767 err = runner.ArvClient.Get("containers", runner.ContainerRecord.UUID, nil, &runner.ContainerRecord)
769 err = fmt.Errorf("While getting container record: %v", err)
773 // setup signal handling
774 runner.SetupSignals()
776 // check for and/or load image
777 err = runner.LoadImage()
779 err = fmt.Errorf("While loading container image: %v", err)
783 // set up FUSE mount and binds
784 err = runner.SetupMounts()
786 err = fmt.Errorf("While setting up mounts: %v", err)
790 err = runner.CreateContainer()
795 if runner.IsCancelled() {
799 err = runner.UpdateContainerRecordRunning()
803 runner.finalState = "Cancelled"
805 err = runner.StartContainer()
810 err = runner.WaitFinish()
812 runner.finalState = "Complete"
817 // NewContainerRunner creates a new container runner.
818 func NewContainerRunner(api IArvadosClient,
820 docker ThinDockerClient,
821 containerUUID string) *ContainerRunner {
823 cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
824 cr.NewLogWriter = cr.NewArvLogWriter
825 cr.RunArvMount = cr.ArvMountCmd
826 cr.MkTempDir = ioutil.TempDir
827 cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
828 cr.ContainerRecord.UUID = containerUUID
829 cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
830 cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
837 containerId := flag.Arg(0)
839 api, err := arvadosclient.MakeArvadosClient()
841 log.Fatalf("%s: %v", containerId, err)
845 var kc *keepclient.KeepClient
846 kc, err = keepclient.MakeKeepClient(&api)
848 log.Fatalf("%s: %v", containerId, err)
852 var docker *dockerclient.DockerClient
853 docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
855 log.Fatalf("%s: %v", containerId, err)
858 cr := NewContainerRunner(api, kc, docker, containerId)
862 log.Fatalf("%s: %v", containerId, err)