First step in a 1-to-1 abstraction with Docker
[arvados.git] / lib / crunchrun / crunchrun.go
index 969682f465cefe56a0430b80bef2a461d7022436..fb6aaee9996d4e6dd03f371bb685cea2919e5c14 100644 (file)
@@ -95,7 +95,12 @@ type PsProcess interface {
 // ContainerRunner is the main stateful struct used for a single execution of a
 // container.
 type ContainerRunner struct {
-       Docker ThinDockerClient
+       ContainerExecRunner ThinContainerExecRunner
+
+       //Docker          ThinDockerClient
+       ContainerConfig dockercontainer.Config     //FIXME: translate this to the ThinContainerExecRunner interface
+       HostConfig      dockercontainer.HostConfig //FIXME: translate this to the ThinContainerExecRunner interface
+       //--------------
 
        // Dispatcher client is initialized with the Dispatcher token.
        // This is a privileged token used to manage container status
@@ -119,35 +124,33 @@ type ContainerRunner struct {
        ContainerArvClient  IArvadosClient
        ContainerKeepClient IKeepClient
 
-       Container       arvados.Container
-       ContainerConfig dockercontainer.Config
-       HostConfig      dockercontainer.HostConfig
-       token           string
-       ContainerID     string
-       ExitCode        *int
-       NewLogWriter    NewLogWriter
-       loggingDone     chan bool
-       CrunchLog       *ThrottledLogger
-       Stdout          io.WriteCloser
-       Stderr          io.WriteCloser
-       logUUID         string
-       logMtx          sync.Mutex
-       LogCollection   arvados.CollectionFileSystem
-       LogsPDH         *string
-       RunArvMount     RunArvMount
-       MkTempDir       MkTempDir
-       ArvMount        *exec.Cmd
-       ArvMountPoint   string
-       HostOutputDir   string
-       Binds           []string
-       Volumes         map[string]struct{}
-       OutputPDH       *string
-       SigChan         chan os.Signal
-       ArvMountExit    chan error
-       SecretMounts    map[string]arvados.Mount
-       MkArvClient     func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
-       finalState      string
-       parentTemp      string
+       Container     arvados.Container
+       token         string
+       ContainerID   string
+       ExitCode      *int
+       NewLogWriter  NewLogWriter
+       loggingDone   chan bool
+       CrunchLog     *ThrottledLogger
+       Stdout        io.WriteCloser
+       Stderr        io.WriteCloser
+       logUUID       string
+       logMtx        sync.Mutex
+       LogCollection arvados.CollectionFileSystem
+       LogsPDH       *string
+       RunArvMount   RunArvMount
+       MkTempDir     MkTempDir
+       ArvMount      *exec.Cmd
+       ArvMountPoint string
+       HostOutputDir string
+       Binds         []string
+       Volumes       map[string]struct{}
+       OutputPDH     *string
+       SigChan       chan os.Signal
+       ArvMountExit  chan error
+       SecretMounts  map[string]arvados.Mount
+       MkArvClient   func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
+       finalState    string
+       parentTemp    string
 
        statLogger       io.WriteCloser
        statReporter     *crunchstat.Reporter
@@ -209,7 +212,7 @@ func (runner *ContainerRunner) stop(sig os.Signal) {
        }
        runner.cCancelled = true
        runner.CrunchLog.Printf("removing container")
-       err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true})
+       err := runner.ContainerExecRunner.ContainerRemove(context.TODO(), runner.ContainerID, ContainerRemoveOptions{Force: true})
        if err != nil {
                runner.CrunchLog.Printf("error removing container: %s", err)
        }
@@ -283,7 +286,7 @@ func (runner *ContainerRunner) LoadImage() (err error) {
 
        runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
 
-       _, _, err = runner.Docker.ImageInspectWithRaw(context.TODO(), imageID)
+       _, _, err = runner.ContainerExecRunner.ImageInspectWithRaw(context.TODO(), imageID)
        if err != nil {
                runner.CrunchLog.Print("Loading Docker image from keep")
 
@@ -293,7 +296,7 @@ func (runner *ContainerRunner) LoadImage() (err error) {
                        return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
                }
 
-               response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, true)
+               response, err := runner.ContainerExecRunner.ImageLoad(context.TODO(), readCloser, true)
                if err != nil {
                        return fmt.Errorf("While loading container image into Docker: %v", err)
                }
@@ -976,8 +979,8 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
        }
 
        stdinUsed := stdinRdr != nil || len(stdinJSON) != 0
-       response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
-               dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
+       response, err := runner.ContainerExecRunner.ContainerAttach(context.TODO(), runner.ContainerID,
+               ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
        if err != nil {
                return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
        }
@@ -1063,16 +1066,19 @@ func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
 func (runner *ContainerRunner) CreateContainer() error {
        runner.CrunchLog.Print("Creating Docker container")
 
-       runner.ContainerConfig.Cmd = runner.Container.Command
+       containerConfig, err := runner.ContainerExecRunner.GetContainerConfig()
+       hostConfig, err := runner.ContainerExecRunner.GetHostConfig()
+
+       containerConfig.Cmd = runner.Container.Command
        if runner.Container.Cwd != "." {
-               runner.ContainerConfig.WorkingDir = runner.Container.Cwd
+               containerConfig.WorkingDir = runner.Container.Cwd
        }
 
        for k, v := range runner.Container.Environment {
-               runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
+               containerConfig.Env = append(containerConfig.Env, k+"="+v)
        }
 
-       runner.ContainerConfig.Volumes = runner.Volumes
+       containerConfig.Volumes = runner.Volumes
 
        maxRAM := int64(runner.Container.RuntimeConstraints.RAM)
        minDockerRAM := int64(16)
@@ -1099,7 +1105,7 @@ func (runner *ContainerRunner) CreateContainer() error {
                if err != nil {
                        return err
                }
-               runner.ContainerConfig.Env = append(runner.ContainerConfig.Env,
+               containerConfig.Env = append(containerConfig.Env,
                        "ARVADOS_API_TOKEN="+tok,
                        "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
                        "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
@@ -1114,13 +1120,13 @@ func (runner *ContainerRunner) CreateContainer() error {
        }
 
        _, stdinUsed := runner.Container.Mounts["stdin"]
-       runner.ContainerConfig.OpenStdin = stdinUsed
-       runner.ContainerConfig.StdinOnce = stdinUsed
-       runner.ContainerConfig.AttachStdin = stdinUsed
-       runner.ContainerConfig.AttachStdout = true
-       runner.ContainerConfig.AttachStderr = true
+       containerConfig.OpenStdin = stdinUsed
+       containerConfig.StdinOnce = stdinUsed
+       containerConfig.AttachStdin = stdinUsed
+       containerConfig.AttachStdout = true
+       containerConfig.AttachStderr = true
 
-       createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
+       createdBody, err := runner.ContainerExecRunner.ContainerCreate(context.TODO(), containerConfig, hostConfig, nil, runner.Container.UUID)
        if err != nil {
                return fmt.Errorf("While creating container: %v", err)
        }
@@ -1138,8 +1144,8 @@ func (runner *ContainerRunner) StartContainer() error {
        if runner.cCancelled {
                return ErrCancelled
        }
-       err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID,
-               dockertypes.ContainerStartOptions{})
+       err := runner.ContainerExecRunner.ContainerStart(context.TODO(), runner.ContainerID,
+               ContainerStartOptions{})
        if err != nil {
                var advice string
                if m, e := regexp.MatchString("(?ms).*(exec|System error).*(no such file or directory|file not found).*", err.Error()); m && e == nil {
@@ -1156,7 +1162,7 @@ func (runner *ContainerRunner) WaitFinish() error {
        var runTimeExceeded <-chan time.Time
        runner.CrunchLog.Print("Waiting for container to finish")
 
-       waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning)
+       waitOk, waitErr := runner.ContainerExecRunner.ContainerWait(context.TODO(), runner.ContainerID, WaitConditionNotRunning)
        arvMountExit := runner.ArvMountExit
        if timeout := runner.Container.SchedulingParameters.MaxRunTime; timeout > 0 {
                runTimeExceeded = time.After(time.Duration(timeout) * time.Second)
@@ -1170,7 +1176,7 @@ func (runner *ContainerRunner) WaitFinish() error {
                }
                for range time.NewTicker(runner.containerWatchdogInterval).C {
                        ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(runner.containerWatchdogInterval))
-                       ctr, err := runner.Docker.ContainerInspect(ctx, runner.ContainerID)
+                       ctr, err := runner.ContainerExecRunner.ContainerInspect(ctx, runner.ContainerID)
                        cancel()
                        runner.cStateLock.Lock()
                        done := runner.cRemoved || runner.ExitCode != nil
@@ -1727,14 +1733,14 @@ func (runner *ContainerRunner) fetchContainerRecord() error {
 func NewContainerRunner(dispatcherClient *arvados.Client,
        dispatcherArvClient IArvadosClient,
        dispatcherKeepClient IKeepClient,
-       docker ThinDockerClient,
+       containerRunner ThinContainerExecRunner,
        containerUUID string) (*ContainerRunner, error) {
 
        cr := &ContainerRunner{
                dispatcherClient:     dispatcherClient,
                DispatcherArvClient:  dispatcherArvClient,
                DispatcherKeepClient: dispatcherKeepClient,
-               Docker:               docker,
+               ContainerExecRunner:  containerRunner,
        }
        cr.NewLogWriter = cr.NewArvLogWriter
        cr.RunArvMount = cr.ArvMountCmd
@@ -1794,7 +1800,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        `)
        memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
        flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
-
+       containerRunner := flags.String("container-runner", "docker",
+               `Specify the container runner. available options: docker, singularity.
+       `)
        ignoreDetachFlag := false
        if len(args) > 0 && args[0] == "-no-detach" {
                // This process was invoked by a parent process, which
@@ -1863,22 +1871,41 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
        kc.Retries = 4
 
-       // API version 1.21 corresponds to Docker 1.9, which is currently the
-       // minimum version we want to support.
-       docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
+       var cr *ContainerRunner
+       if *containerRunner == "docker" {
+               // API version 1.21 corresponds to Docker 1.9, which is currently the
+               // minimum version we want to support.
+               docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
 
-       cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerID)
-       if err != nil {
-               log.Print(err)
-               return 1
-       }
-       if dockererr != nil {
-               cr.CrunchLog.Printf("%s: %v", containerID, dockererr)
-               cr.checkBrokenNode(dockererr)
-               cr.CrunchLog.Close()
-               return 1
-       }
+               cr, err = NewContainerRunner(arvados.NewClientFromEnv(), api, kc, adapter(docker), containerID)
+               if err != nil {
+                       log.Print(err)
+                       return 1
+               }
+               if dockererr != nil {
+                       cr.CrunchLog.Printf("%s: %v", containerID, dockererr)
+                       cr.checkBrokenNode(dockererr)
+                       cr.CrunchLog.Close()
+                       return 1
+               }
+       } else {
+               // Singularity
+
+               singularity, singularityerr := NewSingularityClient()
+
+               cr, err = NewContainerRunner(arvados.NewClientFromEnv(), api, kc, singularity, containerID)
+               if err != nil {
+                       log.Print(err)
+                       return 1
+               }
 
+               if singularityerr != nil {
+                       cr.CrunchLog.Printf("%s: %v", containerID, singularityerr)
+                       //cr.checkBrokenNode(singularityerr) // FIXME: broken-node check not yet enabled for Singularity
+                       cr.CrunchLog.Close()
+                       return 1
+               }
+       }
        cr.gateway = Gateway{
                Address:           os.Getenv("GatewayAddress"),
                AuthSecret:        os.Getenv("GatewayAuthSecret"),