8 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9 "git.curoverse.com/arvados.git/sdk/go/keepclient"
10 "git.curoverse.com/arvados.git/sdk/go/manifest"
11 "github.com/curoverse/dockerclient"
25 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
26 type IArvadosClient interface {
27 Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
28 Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
29 Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
30 Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) (err error)
// ErrCancelled is the error returned when the container is cancelled.
var ErrCancelled = errors.New("Cancelled")
36 // IKeepClient is the minimal Keep API methods used by crunch-run.
37 type IKeepClient interface {
38 PutHB(hash string, buf []byte) (string, int, error)
39 ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
// Mount describes the mount points to create inside the container.
type Mount struct {
	// Kind selects the mount type; SetupMounts handles "collection", "tmp"
	// and (for the stdout mount only) "file".
	Kind string `json:"kind"`
	// Writable requests a read-write bind instead of a read-only one.
	Writable bool `json:"writable"`
	// PortableDataHash identifies a collection mount by content address.
	PortableDataHash string `json:"portable_data_hash"`
	// UUID identifies a collection mount by collection UUID.
	UUID       string `json:"uuid"`
	DeviceType string `json:"device_type"`
	// Path is the target file for a "file" mount such as stdout.
	Path string `json:"path"`
}
// CollectionRecord is the collection record returned by the API server.
type CollectionRecord struct {
	ManifestText     string `json:"manifest_text"`
	PortableDataHash string `json:"portable_data_hash"`
}
58 type RuntimeConstraints struct {
62 // ContainerRecord is the container record returned by the API server.
63 type ContainerRecord struct {
64 UUID string `json:"uuid"`
65 Command []string `json:"command"`
66 ContainerImage string `json:"container_image"`
67 Cwd string `json:"cwd"`
68 Environment map[string]string `json:"environment"`
69 Mounts map[string]Mount `json:"mounts"`
70 OutputPath string `json:"output_path"`
71 Priority int `json:"priority"`
72 RuntimeConstraints RuntimeConstraints `json:"runtime_constraints"`
73 State string `json:"state"`
74 Output string `json:"output"`
// APIClientAuthorization is an arvados#api_client_authorization resource.
type APIClientAuthorization struct {
	UUID     string `json:"uuid"`
	APIToken string `json:"api_token"`
}
83 // NewLogWriter is a factory function to create a new log writer.
84 type NewLogWriter func(name string) io.WriteCloser
86 type RunArvMount func(args []string, tok string) (*exec.Cmd, error)
88 type MkTempDir func(string, string) (string, error)
90 // ThinDockerClient is the minimal Docker client interface used by crunch-run.
91 type ThinDockerClient interface {
92 StopContainer(id string, timeout int) error
93 InspectImage(id string) (*dockerclient.ImageInfo, error)
94 LoadImage(reader io.Reader) error
95 CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
96 StartContainer(id string, config *dockerclient.HostConfig) error
97 AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error)
98 Wait(id string) <-chan dockerclient.WaitResult
99 RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
102 // ContainerRunner is the main stateful struct used for a single execution of a
104 type ContainerRunner struct {
105 Docker ThinDockerClient
106 ArvClient IArvadosClient
109 dockerclient.ContainerConfig
110 dockerclient.HostConfig
115 loggingDone chan bool
116 CrunchLog *ThrottledLogger
117 Stdout io.WriteCloser
118 Stderr *ThrottledLogger
119 LogCollection *CollectionWriter
126 CleanupTempDir []string
129 CancelLock sync.Mutex
131 SigChan chan os.Signal
132 ArvMountExit chan error
136 // SetupSignals sets up signal handling to gracefully terminate the underlying
137 // Docker container and update state when receiving a TERM, INT or QUIT signal.
138 func (runner *ContainerRunner) SetupSignals() {
139 runner.SigChan = make(chan os.Signal, 1)
140 signal.Notify(runner.SigChan, syscall.SIGTERM)
141 signal.Notify(runner.SigChan, syscall.SIGINT)
142 signal.Notify(runner.SigChan, syscall.SIGQUIT)
144 go func(sig <-chan os.Signal) {
146 if !runner.Cancelled {
147 runner.CancelLock.Lock()
148 runner.Cancelled = true
149 if runner.ContainerID != "" {
150 runner.Docker.StopContainer(runner.ContainerID, 10)
152 runner.CancelLock.Unlock()
158 // LoadImage determines the docker image id from the container record and
159 // checks if it is available in the local Docker image store. If not, it loads
160 // the image from Keep.
161 func (runner *ContainerRunner) LoadImage() (err error) {
163 runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.ContainerRecord.ContainerImage)
165 var collection CollectionRecord
166 err = runner.ArvClient.Get("collections", runner.ContainerRecord.ContainerImage, nil, &collection)
168 return fmt.Errorf("While getting container image collection: %v", err)
170 manifest := manifest.Manifest{Text: collection.ManifestText}
171 var img, imageID string
172 for ms := range manifest.StreamIter() {
173 img = ms.FileStreamSegments[0].Name
174 if !strings.HasSuffix(img, ".tar") {
175 return fmt.Errorf("First file in the container image collection does not end in .tar")
177 imageID = img[:len(img)-4]
180 runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
182 _, err = runner.Docker.InspectImage(imageID)
184 runner.CrunchLog.Print("Loading Docker image from keep")
186 var readCloser io.ReadCloser
187 readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
189 return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
192 err = runner.Docker.LoadImage(readCloser)
194 return fmt.Errorf("While loading container image into Docker: %v", err)
197 runner.CrunchLog.Print("Docker image is available")
200 runner.ContainerConfig.Image = imageID
205 func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) {
206 c = exec.Command("arv-mount", arvMountCmd...)
208 // Copy our environment, but override ARVADOS_API_TOKEN with
209 // the container auth token.
211 for _, s := range os.Environ() {
212 if !strings.HasPrefix(s, "ARVADOS_API_TOKEN=") {
213 c.Env = append(c.Env, s)
216 c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
218 nt := NewThrottledLogger(runner.NewLogWriter("arv-mount"))
227 statReadme := make(chan bool)
228 runner.ArvMountExit = make(chan error)
233 time.Sleep(100 * time.Millisecond)
234 _, err = os.Stat(fmt.Sprintf("%s/by_id/README", runner.ArvMountPoint))
244 runner.ArvMountExit <- c.Wait()
245 close(runner.ArvMountExit)
251 case err := <-runner.ArvMountExit:
252 runner.ArvMount = nil
260 func (runner *ContainerRunner) SetupMounts() (err error) {
261 runner.ArvMountPoint, err = runner.MkTempDir("", "keep")
263 return fmt.Errorf("While creating keep mount temp dir: %v", err)
266 runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
270 arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
271 collectionPaths := []string{}
274 for bind, mnt := range runner.ContainerRecord.Mounts {
275 if bind == "stdout" {
276 // Is it a "file" mount kind?
277 if mnt.Kind != "file" {
278 return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
281 // Does path start with OutputPath?
282 prefix := runner.ContainerRecord.OutputPath
283 if !strings.HasSuffix(prefix, "/") {
286 if !strings.HasPrefix(mnt.Path, prefix) {
287 return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
291 if mnt.Kind == "collection" {
293 if mnt.UUID != "" && mnt.PortableDataHash != "" {
294 return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
298 return fmt.Errorf("Writing to existing collections currently not permitted.")
301 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
302 } else if mnt.PortableDataHash != "" {
304 return fmt.Errorf("Can never write to a collection specified by portable data hash")
306 src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
308 src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
309 arvMountCmd = append(arvMountCmd, "--mount-tmp")
310 arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
314 if bind == runner.ContainerRecord.OutputPath {
315 runner.HostOutputDir = src
317 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
319 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
321 collectionPaths = append(collectionPaths, src)
322 } else if mnt.Kind == "tmp" {
323 if bind == runner.ContainerRecord.OutputPath {
324 runner.HostOutputDir, err = runner.MkTempDir("", "")
326 return fmt.Errorf("While creating mount temp dir: %v", err)
328 st, staterr := os.Stat(runner.HostOutputDir)
330 return fmt.Errorf("While Stat on temp dir: %v", staterr)
332 err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
334 return fmt.Errorf("While Chmod temp dir: %v", err)
336 runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
337 runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
339 runner.Binds = append(runner.Binds, bind)
344 if runner.HostOutputDir == "" {
345 return fmt.Errorf("Output path does not correspond to a writable mount point")
349 arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
351 arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
353 arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
355 token, err := runner.ContainerToken()
357 return fmt.Errorf("could not get container token: %s", err)
360 runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
362 return fmt.Errorf("While trying to start arv-mount: %v", err)
365 for _, p := range collectionPaths {
368 return fmt.Errorf("While checking that input files exist: %v", err)
375 func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
376 // Handle docker log protocol
377 // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
379 header := make([]byte, 8)
381 _, readerr := io.ReadAtLeast(containerReader, header, 8)
384 readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
387 _, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
390 _, readerr = io.CopyN(runner.Stderr, containerReader, readsize)
395 if readerr != io.EOF {
396 runner.CrunchLog.Printf("While reading docker logs: %v", readerr)
399 closeerr := runner.Stdout.Close()
401 runner.CrunchLog.Printf("While closing stdout logs: %v", closeerr)
404 closeerr = runner.Stderr.Close()
406 runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
409 runner.loggingDone <- true
410 close(runner.loggingDone)
416 // AttachStreams connects the docker container stdout and stderr logs to the
417 // Arvados logger which logs to Keep and the API server logs table.
418 func (runner *ContainerRunner) AttachStreams() (err error) {
420 runner.CrunchLog.Print("Attaching container streams")
422 var containerReader io.Reader
423 containerReader, err = runner.Docker.AttachContainer(runner.ContainerID,
424 &dockerclient.AttachOptions{Stream: true, Stdout: true, Stderr: true})
426 return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
429 runner.loggingDone = make(chan bool)
431 if stdoutMnt, ok := runner.ContainerRecord.Mounts["stdout"]; ok {
432 stdoutPath := stdoutMnt.Path[len(runner.ContainerRecord.OutputPath):]
433 index := strings.LastIndex(stdoutPath, "/")
435 subdirs := stdoutPath[:index]
437 st, err := os.Stat(runner.HostOutputDir)
439 return fmt.Errorf("While Stat on temp dir: %v", err)
441 stdoutPath := path.Join(runner.HostOutputDir, subdirs)
442 err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
444 return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
448 stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
450 return fmt.Errorf("While creating stdout file: %v", err)
452 runner.Stdout = stdoutFile
454 runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
456 runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
458 go runner.ProcessDockerAttach(containerReader)
463 // CreateContainer creates the docker container.
464 func (runner *ContainerRunner) CreateContainer() error {
465 runner.CrunchLog.Print("Creating Docker container")
467 runner.ContainerConfig.Cmd = runner.ContainerRecord.Command
468 if runner.ContainerRecord.Cwd != "." {
469 runner.ContainerConfig.WorkingDir = runner.ContainerRecord.Cwd
472 for k, v := range runner.ContainerRecord.Environment {
473 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
475 if wantAPI := runner.ContainerRecord.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
476 tok, err := runner.ContainerToken()
480 runner.ContainerConfig.Env = append(runner.ContainerConfig.Env,
481 "ARVADOS_API_TOKEN="+tok,
482 "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
483 "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
487 runner.ContainerConfig.NetworkDisabled = true
490 runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
492 return fmt.Errorf("While creating container: %v", err)
495 runner.HostConfig = dockerclient.HostConfig{Binds: runner.Binds,
496 LogConfig: dockerclient.LogConfig{Type: "none"}}
498 return runner.AttachStreams()
501 // StartContainer starts the docker container created by CreateContainer.
502 func (runner *ContainerRunner) StartContainer() error {
503 runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
504 err := runner.Docker.StartContainer(runner.ContainerID, &runner.HostConfig)
506 return fmt.Errorf("could not start container: %v", err)
511 // WaitFinish waits for the container to terminate, captures the exit code, and
512 // closes the stdout/stderr logging.
513 func (runner *ContainerRunner) WaitFinish() error {
514 runner.CrunchLog.Print("Waiting for container to finish")
516 result := runner.Docker.Wait(runner.ContainerID)
519 return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
521 runner.ExitCode = &wr.ExitCode
523 // wait for stdout/stderr to complete
529 // HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
530 func (runner *ContainerRunner) CaptureOutput() error {
531 if runner.finalState != "Complete" {
535 if runner.HostOutputDir == "" {
539 _, err := os.Stat(runner.HostOutputDir)
541 return fmt.Errorf("While checking host output path: %v", err)
544 var manifestText string
546 collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
547 _, err = os.Stat(collectionMetafile)
550 cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}}
551 manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
553 return fmt.Errorf("While uploading output files: %v", err)
556 // FUSE mount directory
557 file, openerr := os.Open(collectionMetafile)
559 return fmt.Errorf("While opening FUSE metafile: %v", err)
563 rec := CollectionRecord{}
564 err = json.NewDecoder(file).Decode(&rec)
566 return fmt.Errorf("While reading FUSE metafile: %v", err)
568 manifestText = rec.ManifestText
571 var response CollectionRecord
572 err = runner.ArvClient.Create("collections",
574 "collection": arvadosclient.Dict{
575 "manifest_text": manifestText}},
578 return fmt.Errorf("While creating output collection: %v", err)
581 runner.OutputPDH = new(string)
582 *runner.OutputPDH = response.PortableDataHash
587 func (runner *ContainerRunner) CleanupDirs() {
588 if runner.ArvMount != nil {
589 umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
590 umnterr := umount.Run()
592 runner.CrunchLog.Printf("While running fusermount: %v", umnterr)
595 mnterr := <-runner.ArvMountExit
597 runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
601 for _, tmpdir := range runner.CleanupTempDir {
602 rmerr := os.RemoveAll(tmpdir)
604 runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
609 // CommitLogs posts the collection containing the final container logs.
610 func (runner *ContainerRunner) CommitLogs() error {
611 runner.CrunchLog.Print(runner.finalState)
612 runner.CrunchLog.Close()
614 // Closing CrunchLog above allows it to be committed to Keep at this
615 // point, but re-open crunch log with ArvClient in case there are any
616 // further errors (such as failing to write the log to Keep!) while
618 runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID,
621 if runner.LogsPDH != nil {
622 // If we have already assigned something to LogsPDH,
623 // we must be closing the re-opened log, which won't
624 // end up getting attached to the container record and
625 // therefore doesn't need to be saved as a collection
626 // -- it exists only to send logs to other channels.
630 mt, err := runner.LogCollection.ManifestText()
632 return fmt.Errorf("While creating log manifest: %v", err)
635 var response CollectionRecord
636 err = runner.ArvClient.Create("collections",
638 "collection": arvadosclient.Dict{
639 "name": "logs for " + runner.ContainerRecord.UUID,
640 "manifest_text": mt}},
643 return fmt.Errorf("While creating log collection: %v", err)
646 runner.LogsPDH = &response.PortableDataHash
651 // UpdateContainerRecordRunning updates the container state to "Running"
652 func (runner *ContainerRunner) UpdateContainerRecordRunning() error {
653 runner.CancelLock.Lock()
654 defer runner.CancelLock.Unlock()
655 if runner.Cancelled {
658 return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID,
659 arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
662 // ContainerToken returns the api_token the container (and any
663 // arv-mount processes) are allowed to use.
664 func (runner *ContainerRunner) ContainerToken() (string, error) {
665 if runner.token != "" {
666 return runner.token, nil
669 var auth APIClientAuthorization
670 err := runner.ArvClient.Call("GET", "containers", runner.ContainerRecord.UUID, "auth", nil, &auth)
674 runner.token = auth.APIToken
675 return runner.token, nil
678 // UpdateContainerRecordComplete updates the container record state on API
679 // server to "Complete" or "Cancelled"
680 func (runner *ContainerRunner) UpdateContainerRecordFinal() error {
681 update := arvadosclient.Dict{}
682 update["state"] = runner.finalState
683 if runner.finalState == "Complete" {
684 if runner.LogsPDH != nil {
685 update["log"] = *runner.LogsPDH
687 if runner.ExitCode != nil {
688 update["exit_code"] = *runner.ExitCode
690 if runner.OutputPDH != nil {
691 update["output"] = *runner.OutputPDH
694 return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, arvadosclient.Dict{"container": update}, nil)
697 // IsCancelled returns the value of Cancelled, with goroutine safety.
698 func (runner *ContainerRunner) IsCancelled() bool {
699 runner.CancelLock.Lock()
700 defer runner.CancelLock.Unlock()
701 return runner.Cancelled
704 // NewArvLogWriter creates an ArvLogWriter
705 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
706 return &ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID, name, runner.LogCollection.Open(name + ".txt")}
709 // Run the full container lifecycle.
710 func (runner *ContainerRunner) Run() (err error) {
711 runner.CrunchLog.Printf("Executing container '%s'", runner.ContainerRecord.UUID)
713 hostname, hosterr := os.Hostname()
715 runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr)
717 runner.CrunchLog.Printf("Executing on host '%s'", hostname)
720 // Clean up temporary directories _after_ finalizing
721 // everything (if we've made any by then)
722 defer runner.CleanupDirs()
724 runner.finalState = "Queued"
727 // checkErr prints e (unless it's nil) and sets err to
728 // e (unless err is already non-nil). Thus, if err
729 // hasn't already been assigned when Run() returns,
730 // this cleanup func will cause Run() to return the
731 // first non-nil error that is passed to checkErr().
732 checkErr := func(e error) {
736 runner.CrunchLog.Print(e)
742 // Log the error encountered in Run(), if any
745 if runner.finalState == "Queued" {
746 runner.UpdateContainerRecordFinal()
750 if runner.IsCancelled() {
751 runner.finalState = "Cancelled"
752 // but don't return yet -- we still want to
753 // capture partial output and write logs
756 checkErr(runner.CaptureOutput())
757 checkErr(runner.CommitLogs())
758 checkErr(runner.UpdateContainerRecordFinal())
760 // The real log is already closed, but then we opened
761 // a new one in case we needed to log anything while
763 runner.CrunchLog.Close()
766 err = runner.ArvClient.Get("containers", runner.ContainerRecord.UUID, nil, &runner.ContainerRecord)
768 err = fmt.Errorf("While getting container record: %v", err)
772 // setup signal handling
773 runner.SetupSignals()
775 // check for and/or load image
776 err = runner.LoadImage()
778 err = fmt.Errorf("While loading container image: %v", err)
782 // set up FUSE mount and binds
783 err = runner.SetupMounts()
785 err = fmt.Errorf("While setting up mounts: %v", err)
789 err = runner.CreateContainer()
794 if runner.IsCancelled() {
798 err = runner.UpdateContainerRecordRunning()
802 runner.finalState = "Cancelled"
804 err = runner.StartContainer()
809 err = runner.WaitFinish()
811 runner.finalState = "Complete"
816 // NewContainerRunner creates a new container runner.
817 func NewContainerRunner(api IArvadosClient,
819 docker ThinDockerClient,
820 containerUUID string) *ContainerRunner {
822 cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
823 cr.NewLogWriter = cr.NewArvLogWriter
824 cr.RunArvMount = cr.ArvMountCmd
825 cr.MkTempDir = ioutil.TempDir
826 cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
827 cr.ContainerRecord.UUID = containerUUID
828 cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
829 cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
836 containerId := flag.Arg(0)
838 api, err := arvadosclient.MakeArvadosClient()
840 log.Fatalf("%s: %v", containerId, err)
844 var kc *keepclient.KeepClient
845 kc, err = keepclient.MakeKeepClient(&api)
847 log.Fatalf("%s: %v", containerId, err)
851 var docker *dockerclient.DockerClient
852 docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
854 log.Fatalf("%s: %v", containerId, err)
857 cr := NewContainerRunner(api, kc, docker, containerId)
861 log.Fatalf("%s: %v", containerId, err)