First step in a 1-to-1 abstraction with docker
authorNico Cesar <nico@nicocesar.com>
Mon, 15 Mar 2021 20:42:26 +0000 (16:42 -0400)
committerNico Cesar <nico@nicocesar.com>
Mon, 15 Mar 2021 20:42:26 +0000 (16:42 -0400)
Arvados-DCO-1.1-Signed-off-by: Nico Cesar <nico@curii.com>

lib/crunchrun/container_exec_types.go [new file with mode: 0644]
lib/crunchrun/crunchrun.go
lib/crunchrun/crunchrun_test.go
lib/crunchrun/docker_adapter.go [new file with mode: 0644]
lib/crunchrun/singularity.go [new file with mode: 0644]

diff --git a/lib/crunchrun/container_exec_types.go b/lib/crunchrun/container_exec_types.go
new file mode 100644 (file)
index 0000000..cc78a29
--- /dev/null
@@ -0,0 +1,294 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package crunchrun
+
+import (
+       "bufio"
+       "io"
+       "net"
+
+       "golang.org/x/net/context"
+)
+
+// ContainerConfig holds all values needed for Docker and Singularity
+// to run a container. In the case of docker is similar to
+// github.com/docker/docker/api/types/container/Config
+// see https://github.com/moby/moby/blob/master/api/types/container/config.go
+// "It should hold only portable information about the container."
+// and for Singularity TBD
+type ContainerConfig struct {
+       OpenStdin    bool
+       StdinOnce    bool
+       AttachStdin  bool
+       AttachStdout bool
+       AttachStderr bool
+
+       Cmd        []string
+       WorkingDir string
+       Env        []string
+       Volumes    map[string]struct{}
+}
+
+// HostConfig holds all values needed for Docker and Singularity
+// to run a container related to the host. In the case of docker is
+// similar to github.com/docker/docker/api/types/container/HostConfig
+// see https://github.com/moby/moby/blob/master/api/types/container/host_config.go
+// "dependent of the host we are running on".
+// and for Singularity TBD
+type HostConfig struct {
+       //important bits:
+       // - Binds:
+       // LogConfig
+       // Resources: see dockercontainer.Resources
+       // NetworkMode: see dockercontainer.NetworkMode
+}
+
+// ---- NETROWKING STUFF
+// EndpointIPAMConfig represents IPAM configurations for the endpoint
+type EndpointIPAMConfig struct {
+       IPv4Address  string   `json:",omitempty"`
+       IPv6Address  string   `json:",omitempty"`
+       LinkLocalIPs []string `json:",omitempty"`
+}
+
+// EndpointSettings stores the network endpoint details
+type EndpointSettings struct {
+       // Configurations
+       IPAMConfig *EndpointIPAMConfig
+       Links      []string
+       Aliases    []string
+       // Operational data
+       NetworkID           string
+       EndpointID          string
+       Gateway             string
+       IPAddress           string
+       IPPrefixLen         int
+       IPv6Gateway         string
+       GlobalIPv6Address   string
+       GlobalIPv6PrefixLen int
+       MacAddress          string
+       DriverOpts          map[string]string
+}
+
+// ----
+// NetworkingConfig holds all values needed for Docker and Singularity
+// related network. In the case of docker is similar to
+// github.com/docker/docker/api/types/network/NetworkingConfig
+// and for Singularity TBD.
+type NetworkingConfig struct {
+       EndpointsConfig map[string]*EndpointSettings
+}
+
+// ContainerCreateResponse in the case of docker will be similar to
+// github.com/docker/docker/api/types/container/ContainerCreateCreatedBody
+// and for Singularity TBD.
+type ContainerCreateResponse struct {
+       // The ID of the created container
+       // Required: true
+       ID string
+       // Warnings encountered when creating the container
+       // Required: true
+       Warnings []string
+}
+
+// ContainerStartOptions in the case of docker will be similar to
+// github.com/docker/docker/api/types/container/ContainerStartOptions
+// and for Singularity TBD.
+type ContainerStartOptions struct {
+       // FIXME: do we need this in this wrapping? since we only use it's zero value
+       // just to comply with Docker's ContainerStart API
+       // maybe not using it will be the best
+       CheckpointID  string
+       CheckpointDir string
+}
+
+// ContainerRemoveOptions in the case of docker will be similar to
+// github.com/docker/docker/api/types/container/ContainerRemoveOptions
+// and for Singularity TBD.
+type ContainerRemoveOptions struct {
+       // FIXME: we *only* call it with dockertypes.ContainerRemoveOptions{Force: true})
+       // may be should not be in this
+       Force bool
+}
+
+// ContainerAttachOptions in the case of docker will be similar to
+// github.com/docker/docker/api/types/container/ContainerAttachOptions
+// and for Singularity TBD.
+type ContainerAttachOptions struct {
+       Stream bool
+       Stdin  bool
+       Stdout bool
+       Stderr bool
+}
+
+// ImageRemoveOptions in the case of docker will be similar to
+// github.com/docker/docker/api/types/container/ImageRemoveOptions
+// and for Singularity TBD.
+type ImageRemoveOptions struct {
+       Force         bool
+       PruneChildren bool
+       //not used as far as I know
+}
+
+// ContainerInspectResponse in the case of docker will be similar to
+// github.com/docker/docker/api/types/ContainerJSON
+// and for Singularity TBD.
+// MAYBE call it ExecRunnerContainer? since the struct is describing  a container
+// from the underlying ExecRunner
+type ContainerInspectResponse struct {
+       //Important bits for us
+       // State = current checks: (nil, Running, Created)
+       State *ContainerState
+}
+
+// ImageInspectResponse  in the case of docker is similar to
+//  github.com/docker/docker/api/types/ImageInspect
+// and for Singularity TBD.
+// MAYBE call it ExecRunnerImage? since the struct is describing an image
+// from the underlying ExecRunner
+type ImageInspectResponse struct {
+       // we don't use the respones but we use ImageInspectWithRaw(context.TODO(), imageID)
+       // to check if we already have the docker image, maybe we can do the
+       // a imagePresent(id string) (bool)
+       ID string
+}
+
+// ImageLoadResponse returns information to the client about a load process.
+type ImageLoadResponse struct {
+       // Body must be closed to avoid a resource leak
+       Body io.ReadCloser
+       JSON bool
+}
+
+// ImageDeleteResponseItem is a reply from ImageRemove.
+type ImageDeleteResponseItem struct {
+
+       // The image ID of an image that was deleted
+       Deleted string `json:"Deleted,omitempty"`
+
+       // The image ID of an image that was untagged
+       Untagged string `json:"Untagged,omitempty"`
+}
+
+// ContainerState stores container's running state
+// it's part of ContainerJSONBase and will return by "inspect" command
+type ContainerState struct {
+       Status     string // String representation of the container state. Can be one of "created", "running", "paused", "restarting", "removing", "exited", or "dead"
+       Running    bool
+       Paused     bool
+       Restarting bool
+       OOMKilled  bool
+       Dead       bool
+       Pid        int
+       ExitCode   int
+       Error      string
+       StartedAt  string
+       FinishedAt string
+}
+
+// HijackedResponse holds connection information for a hijacked request.
+
+// HijackedResponse is needed as an artifact that comes from docker.
+// We need to figure out if this is the best abstraction at this level
+// for now this is a copy and paste from docker package. Might evolve later.
+type HijackedResponse struct {
+       Conn   net.Conn
+       Reader *bufio.Reader
+}
+
+// Close closes the hijacked connection and reader.
+func (h *HijackedResponse) Close() {
+       h.Conn.Close()
+}
+
+// CloseWriter is an interface that implements structs
+// that close input streams to prevent from writing.
+type CloseWriter interface {
+       CloseWrite() error
+}
+
+// CloseWrite closes a readWriter for writing.
+func (h *HijackedResponse) CloseWrite() error {
+       if conn, ok := h.Conn.(CloseWriter); ok {
+               return conn.CloseWrite()
+       }
+       return nil
+}
+
+//------------ End of HijackedResponse
+
+// Similar to HijackedResponse, Waitcondtion is here and will decide later how is implemented in Singularity
+
+// WaitCondition is a type used to specify a container state for which
+// to wait.
+type WaitCondition string
+
+// Possible WaitCondition Values.
+//
+// WaitConditionNotRunning (default) is used to wait for any of the non-running
+// states: "created", "exited", "dead", "removing", or "removed".
+//
+// WaitConditionNextExit is used to wait for the next time the state changes
+// to a non-running state. If the state is currently "created" or "exited",
+// this would cause Wait() to block until either the container runs and exits
+// or is removed.
+//
+// WaitConditionRemoved is used to wait for the container to be removed.
+const (
+       WaitConditionNotRunning WaitCondition = "not-running"
+       WaitConditionNextExit   WaitCondition = "next-exit"
+       WaitConditionRemoved    WaitCondition = "removed"
+)
+
+//------------ End of WaitCondition
+
+// ContainerWaitOKBody
+/// yetanother copy, this time from  from docker/api/types/container/container_wait.go.
+// That file is generated from swagger, so I don't think is a good idea to have it
+// here. but for now, I'll finish creating the abstraction layer
+
+// ContainerWaitOKBodyError container waiting error, if any
+// swagger:model ContainerWaitOKBodyError
+type ContainerWaitOKBodyError struct {
+
+       // Details of an error
+       Message string `json:"Message,omitempty"`
+}
+
+// ContainerWaitOKBody OK response to ContainerWait operation
+// swagger:model ContainerWaitOKBody
+type ContainerWaitOKBody struct {
+
+       // error
+       // Required: true
+       Error *ContainerWaitOKBodyError `json:"Error"`
+
+       // Exit code of the container
+       // Required: true
+       StatusCode int64 `json:"StatusCode"`
+}
+
+//------------ End of ContainerWaitOKBody
+
+// ThinContainerExecRunner is the "common denominator" interface for all ExecRunners
+// (either Docker or Singularity or more to come). For now is based in the
+//  ThinDockerClient interface with our own objects (instead of the ones that come
+// from docker)
+type ThinContainerExecRunner interface {
+       GetContainerConfig() (ContainerConfig, error)
+       GetHostConfig() (HostConfig, error)
+
+       ContainerAttach(ctx context.Context, container string, options ContainerAttachOptions) (HijackedResponse, error)
+       ContainerCreate(ctx context.Context, config ContainerConfig, hostConfig HostConfig, networkingConfig *NetworkingConfig, containerName string) (ContainerCreateResponse, error)
+       ContainerStart(ctx context.Context, container string, options ContainerStartOptions) error
+       ContainerRemove(ctx context.Context, container string, options ContainerRemoveOptions) error
+       ContainerWait(ctx context.Context, container string, condition WaitCondition) (<-chan ContainerWaitOKBody, <-chan error)
+
+       ContainerInspect(ctx context.Context, id string) (ContainerInspectResponse, error)
+       ImageInspectWithRaw(ctx context.Context, image string) (ImageInspectResponse, []byte, error)
+
+       ImageLoad(ctx context.Context, input io.Reader, quiet bool) (ImageLoadResponse, error)
+       ImageRemove(ctx context.Context, image string, options ImageRemoveOptions) ([]ImageDeleteResponseItem, error)
+}
index 969682f465cefe56a0430b80bef2a461d7022436..fb6aaee9996d4e6dd03f371bb685cea2919e5c14 100644 (file)
@@ -95,7 +95,12 @@ type PsProcess interface {
 // ContainerRunner is the main stateful struct used for a single execution of a
 // container.
 type ContainerRunner struct {
-       Docker ThinDockerClient
+       ContainerExecRunner ThinContainerExecRunner
+
+       //Docker          ThinDockerClient
+       ContainerConfig dockercontainer.Config     //FIXME: translate this to the ThinContainerRunner interface
+       HostConfig      dockercontainer.HostConfig //FIXME: translate this to the ThinContainerRunner interface
+       //--------------
 
        // Dispatcher client is initialized with the Dispatcher token.
        // This is a privileged token used to manage container status
@@ -119,35 +124,33 @@ type ContainerRunner struct {
        ContainerArvClient  IArvadosClient
        ContainerKeepClient IKeepClient
 
-       Container       arvados.Container
-       ContainerConfig dockercontainer.Config
-       HostConfig      dockercontainer.HostConfig
-       token           string
-       ContainerID     string
-       ExitCode        *int
-       NewLogWriter    NewLogWriter
-       loggingDone     chan bool
-       CrunchLog       *ThrottledLogger
-       Stdout          io.WriteCloser
-       Stderr          io.WriteCloser
-       logUUID         string
-       logMtx          sync.Mutex
-       LogCollection   arvados.CollectionFileSystem
-       LogsPDH         *string
-       RunArvMount     RunArvMount
-       MkTempDir       MkTempDir
-       ArvMount        *exec.Cmd
-       ArvMountPoint   string
-       HostOutputDir   string
-       Binds           []string
-       Volumes         map[string]struct{}
-       OutputPDH       *string
-       SigChan         chan os.Signal
-       ArvMountExit    chan error
-       SecretMounts    map[string]arvados.Mount
-       MkArvClient     func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
-       finalState      string
-       parentTemp      string
+       Container     arvados.Container
+       token         string
+       ContainerID   string
+       ExitCode      *int
+       NewLogWriter  NewLogWriter
+       loggingDone   chan bool
+       CrunchLog     *ThrottledLogger
+       Stdout        io.WriteCloser
+       Stderr        io.WriteCloser
+       logUUID       string
+       logMtx        sync.Mutex
+       LogCollection arvados.CollectionFileSystem
+       LogsPDH       *string
+       RunArvMount   RunArvMount
+       MkTempDir     MkTempDir
+       ArvMount      *exec.Cmd
+       ArvMountPoint string
+       HostOutputDir string
+       Binds         []string
+       Volumes       map[string]struct{}
+       OutputPDH     *string
+       SigChan       chan os.Signal
+       ArvMountExit  chan error
+       SecretMounts  map[string]arvados.Mount
+       MkArvClient   func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
+       finalState    string
+       parentTemp    string
 
        statLogger       io.WriteCloser
        statReporter     *crunchstat.Reporter
@@ -209,7 +212,7 @@ func (runner *ContainerRunner) stop(sig os.Signal) {
        }
        runner.cCancelled = true
        runner.CrunchLog.Printf("removing container")
-       err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true})
+       err := runner.ContainerExecRunner.ContainerRemove(context.TODO(), runner.ContainerID, ContainerRemoveOptions{Force: true})
        if err != nil {
                runner.CrunchLog.Printf("error removing container: %s", err)
        }
@@ -283,7 +286,7 @@ func (runner *ContainerRunner) LoadImage() (err error) {
 
        runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
 
-       _, _, err = runner.Docker.ImageInspectWithRaw(context.TODO(), imageID)
+       _, _, err = runner.ContainerExecRunner.ImageInspectWithRaw(context.TODO(), imageID)
        if err != nil {
                runner.CrunchLog.Print("Loading Docker image from keep")
 
@@ -293,7 +296,7 @@ func (runner *ContainerRunner) LoadImage() (err error) {
                        return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
                }
 
-               response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, true)
+               response, err := runner.ContainerExecRunner.ImageLoad(context.TODO(), readCloser, true)
                if err != nil {
                        return fmt.Errorf("While loading container image into Docker: %v", err)
                }
@@ -976,8 +979,8 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
        }
 
        stdinUsed := stdinRdr != nil || len(stdinJSON) != 0
-       response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
-               dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
+       response, err := runner.ContainerExecRunner.ContainerAttach(context.TODO(), runner.ContainerID,
+               ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
        if err != nil {
                return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
        }
@@ -1063,16 +1066,19 @@ func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
 func (runner *ContainerRunner) CreateContainer() error {
        runner.CrunchLog.Print("Creating Docker container")
 
-       runner.ContainerConfig.Cmd = runner.Container.Command
+       containerConfig, err := runner.ContainerExecRunner.GetContainerConfig()
+       hostConfig, err := runner.ContainerExecRunner.GetHostConfig()
+
+       containerConfig.Cmd = runner.Container.Command
        if runner.Container.Cwd != "." {
-               runner.ContainerConfig.WorkingDir = runner.Container.Cwd
+               containerConfig.WorkingDir = runner.Container.Cwd
        }
 
        for k, v := range runner.Container.Environment {
-               runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
+               containerConfig.Env = append(containerConfig.Env, k+"="+v)
        }
 
-       runner.ContainerConfig.Volumes = runner.Volumes
+       containerConfig.Volumes = runner.Volumes
 
        maxRAM := int64(runner.Container.RuntimeConstraints.RAM)
        minDockerRAM := int64(16)
@@ -1099,7 +1105,7 @@ func (runner *ContainerRunner) CreateContainer() error {
                if err != nil {
                        return err
                }
-               runner.ContainerConfig.Env = append(runner.ContainerConfig.Env,
+               containerConfig.Env = append(containerConfig.Env,
                        "ARVADOS_API_TOKEN="+tok,
                        "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
                        "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
@@ -1114,13 +1120,13 @@ func (runner *ContainerRunner) CreateContainer() error {
        }
 
        _, stdinUsed := runner.Container.Mounts["stdin"]
-       runner.ContainerConfig.OpenStdin = stdinUsed
-       runner.ContainerConfig.StdinOnce = stdinUsed
-       runner.ContainerConfig.AttachStdin = stdinUsed
-       runner.ContainerConfig.AttachStdout = true
-       runner.ContainerConfig.AttachStderr = true
+       containerConfig.OpenStdin = stdinUsed
+       containerConfig.StdinOnce = stdinUsed
+       containerConfig.AttachStdin = stdinUsed
+       containerConfig.AttachStdout = true
+       containerConfig.AttachStderr = true
 
-       createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
+       createdBody, err := runner.ContainerExecRunner.ContainerCreate(context.TODO(), containerConfig, hostConfig, nil, runner.Container.UUID)
        if err != nil {
                return fmt.Errorf("While creating container: %v", err)
        }
@@ -1138,8 +1144,8 @@ func (runner *ContainerRunner) StartContainer() error {
        if runner.cCancelled {
                return ErrCancelled
        }
-       err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID,
-               dockertypes.ContainerStartOptions{})
+       err := runner.ContainerExecRunner.ContainerStart(context.TODO(), runner.ContainerID,
+               ContainerStartOptions{})
        if err != nil {
                var advice string
                if m, e := regexp.MatchString("(?ms).*(exec|System error).*(no such file or directory|file not found).*", err.Error()); m && e == nil {
@@ -1156,7 +1162,7 @@ func (runner *ContainerRunner) WaitFinish() error {
        var runTimeExceeded <-chan time.Time
        runner.CrunchLog.Print("Waiting for container to finish")
 
-       waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning)
+       waitOk, waitErr := runner.ContainerExecRunner.ContainerWait(context.TODO(), runner.ContainerID, WaitConditionNotRunning)
        arvMountExit := runner.ArvMountExit
        if timeout := runner.Container.SchedulingParameters.MaxRunTime; timeout > 0 {
                runTimeExceeded = time.After(time.Duration(timeout) * time.Second)
@@ -1170,7 +1176,7 @@ func (runner *ContainerRunner) WaitFinish() error {
                }
                for range time.NewTicker(runner.containerWatchdogInterval).C {
                        ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(runner.containerWatchdogInterval))
-                       ctr, err := runner.Docker.ContainerInspect(ctx, runner.ContainerID)
+                       ctr, err := runner.ContainerExecRunner.ContainerInspect(ctx, runner.ContainerID)
                        cancel()
                        runner.cStateLock.Lock()
                        done := runner.cRemoved || runner.ExitCode != nil
@@ -1727,14 +1733,14 @@ func (runner *ContainerRunner) fetchContainerRecord() error {
 func NewContainerRunner(dispatcherClient *arvados.Client,
        dispatcherArvClient IArvadosClient,
        dispatcherKeepClient IKeepClient,
-       docker ThinDockerClient,
+       containerRunner ThinContainerExecRunner,
        containerUUID string) (*ContainerRunner, error) {
 
        cr := &ContainerRunner{
                dispatcherClient:     dispatcherClient,
                DispatcherArvClient:  dispatcherArvClient,
                DispatcherKeepClient: dispatcherKeepClient,
-               Docker:               docker,
+               ContainerExecRunner:  containerRunner,
        }
        cr.NewLogWriter = cr.NewArvLogWriter
        cr.RunArvMount = cr.ArvMountCmd
@@ -1794,7 +1800,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        `)
        memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
        flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
-
+       containerRunner := flags.String("container-runner", "docker",
+               `Specify the container runner. available options: docker, singularity.
+       `)
        ignoreDetachFlag := false
        if len(args) > 0 && args[0] == "-no-detach" {
                // This process was invoked by a parent process, which
@@ -1863,22 +1871,41 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
        kc.Retries = 4
 
-       // API version 1.21 corresponds to Docker 1.9, which is currently the
-       // minimum version we want to support.
-       docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
+       var cr *ContainerRunner
+       if *containerRunner == "docker" {
+               // API version 1.21 corresponds to Docker 1.9, which is currently the
+               // minimum version we want to support.
+               docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
 
-       cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerID)
-       if err != nil {
-               log.Print(err)
-               return 1
-       }
-       if dockererr != nil {
-               cr.CrunchLog.Printf("%s: %v", containerID, dockererr)
-               cr.checkBrokenNode(dockererr)
-               cr.CrunchLog.Close()
-               return 1
-       }
+               cr, err = NewContainerRunner(arvados.NewClientFromEnv(), api, kc, adapter(docker), containerID)
+               if err != nil {
+                       log.Print(err)
+                       return 1
+               }
+               if dockererr != nil {
+                       cr.CrunchLog.Printf("%s: %v", containerID, dockererr)
+                       cr.checkBrokenNode(dockererr)
+                       cr.CrunchLog.Close()
+                       return 1
+               }
+       } else {
+               // Singularity
+
+               singularity, singularityerr := NewSingularityClient()
+
+               cr, err = NewContainerRunner(arvados.NewClientFromEnv(), api, kc, singularity, containerID)
+               if err != nil {
+                       log.Print(err)
+                       return 1
+               }
 
+               if singularityerr != nil {
+                       cr.CrunchLog.Printf("%s: %v", containerID, singularityerr)
+                       //cr.checkBrokenNode(singularityrerr) //
+                       cr.CrunchLog.Close()
+                       return 1
+               }
+       }
        cr.gateway = Gateway{
                Address:           os.Getenv("GatewayAddress"),
                AuthSecret:        os.Getenv("GatewayAuthSecret"),
index dbdaa6293d28c964efc237c9e1b98e44b5ef921c..ddf40543e7102aa702c576a5a47620c09176bef2 100644 (file)
@@ -439,7 +439,7 @@ func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename s
 
 func (s *TestSuite) TestLoadImage(c *C) {
        cr, err := NewContainerRunner(s.client, &ArvTestClient{},
-               &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+               &KeepTestClient{}, adapter(s.docker), "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
 
        kc := &KeepTestClient{}
@@ -447,10 +447,10 @@ func (s *TestSuite) TestLoadImage(c *C) {
        cr.ContainerArvClient = &ArvTestClient{}
        cr.ContainerKeepClient = kc
 
-       _, err = cr.Docker.ImageRemove(nil, hwImageID, dockertypes.ImageRemoveOptions{})
+       _, err = cr.ContainerExecRunner.ImageRemove(nil, hwImageID, ImageRemoveOptions{})
        c.Check(err, IsNil)
 
-       _, _, err = cr.Docker.ImageInspectWithRaw(nil, hwImageID)
+       _, _, err = cr.ContainerExecRunner.ImageInspectWithRaw(nil, hwImageID)
        c.Check(err, NotNil)
 
        cr.Container.ContainerImage = hwPDH
@@ -463,13 +463,13 @@ func (s *TestSuite) TestLoadImage(c *C) {
 
        c.Check(err, IsNil)
        defer func() {
-               cr.Docker.ImageRemove(nil, hwImageID, dockertypes.ImageRemoveOptions{})
+               cr.ContainerExecRunner.ImageRemove(nil, hwImageID, ImageRemoveOptions{})
        }()
 
        c.Check(kc.Called, Equals, true)
        c.Check(cr.ContainerConfig.Image, Equals, hwImageID)
 
-       _, _, err = cr.Docker.ImageInspectWithRaw(nil, hwImageID)
+       _, _, err = cr.ContainerExecRunner.ImageInspectWithRaw(nil, hwImageID)
        c.Check(err, IsNil)
 
        // (2) Test using image that's already loaded
@@ -574,7 +574,7 @@ func (s *TestSuite) TestLoadImageArvError(c *C) {
 func (s *TestSuite) TestLoadImageKeepError(c *C) {
        // (2) Keep error
        kc := &KeepErrorTestClient{}
-       cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, adapter(s.docker), "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
 
        cr.ContainerArvClient = &ArvTestClient{}
@@ -604,7 +604,7 @@ func (s *TestSuite) TestLoadImageCollectionError(c *C) {
 func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
        // (4) Collection doesn't contain image
        kc := &KeepReadErrorTestClient{}
-       cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, adapter(s.docker), "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
        cr.Container.ContainerImage = hwPDH
        cr.ContainerArvClient = &ArvTestClient{}
@@ -653,7 +653,7 @@ func (s *TestSuite) TestRunContainer(c *C) {
        }
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, adapter(s.docker), "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
 
        cr.ContainerArvClient = &ArvTestClient{}
@@ -777,7 +777,7 @@ func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exi
        s.docker.api = api
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr, err = NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err = NewContainerRunner(s.client, api, kc, adapter(s.docker), "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
        s.runner = cr
        cr.statInterval = 100 * time.Millisecond
@@ -1136,7 +1136,7 @@ func (s *TestSuite) testStopContainer(c *C, setup func(cr *ContainerRunner)) {
        api := &ArvTestClient{Container: rec}
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr, err := NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, api, kc, adapter(s.docker), "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
        cr.RunArvMount = func([]string, string) (*exec.Cmd, error) { return nil, nil }
        cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
@@ -1621,7 +1621,7 @@ func (s *TestSuite) stdoutErrorRunHelper(c *C, record string, fn func(t *TestDoc
        api = &ArvTestClient{Container: rec}
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr, err = NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err = NewContainerRunner(s.client, api, kc, adapter(s.docker), "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
        am := &ArvMountCmdLine{}
        cr.RunArvMount = am.ArvMountTest
diff --git a/lib/crunchrun/docker_adapter.go b/lib/crunchrun/docker_adapter.go
new file mode 100644 (file)
index 0000000..795d06b
--- /dev/null
@@ -0,0 +1,219 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package crunchrun
+
+import (
+       "io"
+
+       dockertypes "github.com/docker/docker/api/types"
+       dockercontainer "github.com/docker/docker/api/types/container"
+       dockernetwork "github.com/docker/docker/api/types/network"
+       "golang.org/x/net/context"
+)
+
+type DockerAdapter struct {
+       docker          ThinDockerClient
+       containerConfig ContainerConfig
+       hostConfig      HostConfig
+}
+
+func (a *DockerAdapter) ContainerAttach(ctx context.Context, container string, options ContainerAttachOptions) (HijackedResponse, error) {
+       dockerOptions := dockertypes.ContainerAttachOptions{
+               Stream: options.Stream,
+               Stdin:  options.Stdin,
+               Stdout: options.Stdout,
+               Stderr: options.Stderr}
+       dockerResponse, docker_err := a.docker.ContainerAttach(ctx, container, dockerOptions)
+
+       adapterResponse := HijackedResponse{
+               Conn:   dockerResponse.Conn,
+               Reader: dockerResponse.Reader,
+       }
+
+       return adapterResponse, docker_err
+}
+
+func (a *DockerAdapter) ContainerCreate(ctx context.Context, config ContainerConfig, hostConfig HostConfig, networkingConfig *NetworkingConfig, containerName string) (ContainerCreateResponse, error) {
+       var dockerEndpointsConfig map[string]*dockernetwork.EndpointSettings
+
+       var dockerNetworkConfig *dockernetwork.NetworkingConfig
+       if networkingConfig != nil {
+               for k, v := range networkingConfig.EndpointsConfig {
+
+                       dockerIpamConfig := &dockernetwork.EndpointIPAMConfig{
+                               IPv4Address:  v.IPAMConfig.IPv4Address,
+                               IPv6Address:  v.IPAMConfig.IPv6Address,
+                               LinkLocalIPs: v.IPAMConfig.LinkLocalIPs,
+                       }
+                       dockerEndpointsConfig[k] = &dockernetwork.EndpointSettings{
+                               IPAMConfig:          dockerIpamConfig,
+                               Links:               v.Links,
+                               Aliases:             v.Aliases,
+                               NetworkID:           v.NetworkID,
+                               EndpointID:          v.EndpointID,
+                               Gateway:             v.Gateway,
+                               IPAddress:           v.IPAddress,
+                               IPPrefixLen:         v.IPPrefixLen,
+                               IPv6Gateway:         v.IPv6Gateway,
+                               GlobalIPv6Address:   v.GlobalIPv6Address,
+                               GlobalIPv6PrefixLen: v.GlobalIPv6PrefixLen,
+                               MacAddress:          v.MacAddress,
+                               DriverOpts:          v.DriverOpts,
+                       }
+
+               }
+
+               dockerNetworkConfig = &dockernetwork.NetworkingConfig{
+                       EndpointsConfig: dockerEndpointsConfig,
+               }
+       }
+       dockerConfig := dockercontainer.Config{
+               OpenStdin:    config.OpenStdin,
+               StdinOnce:    config.StdinOnce,
+               AttachStdin:  config.AttachStdin,
+               AttachStdout: config.AttachStdout,
+               AttachStderr: config.AttachStderr,
+               Cmd:          config.Cmd,
+               WorkingDir:   config.WorkingDir,
+               Env:          config.Env,
+               Volumes:      config.Volumes,
+       }
+       dockerHostConfig := dockercontainer.HostConfig{}
+
+       dockerResponse, dockerErr := a.docker.ContainerCreate(ctx,
+               &dockerConfig,
+               &dockerHostConfig, dockerNetworkConfig, containerName)
+       adapterResponse := ContainerCreateResponse{
+               ID:       dockerResponse.ID,
+               Warnings: dockerResponse.Warnings,
+       }
+       return adapterResponse, dockerErr
+}
+
+func (a *DockerAdapter) ContainerStart(ctx context.Context, container string, options ContainerStartOptions) error {
+       dockerContainerStartOptions := dockertypes.ContainerStartOptions{
+               CheckpointID:  options.CheckpointID,
+               CheckpointDir: options.CheckpointDir,
+       }
+
+       dockerErr := a.docker.ContainerStart(ctx, container, dockerContainerStartOptions)
+
+       return dockerErr
+}
+
+func (a *DockerAdapter) ContainerRemove(ctx context.Context, container string, options ContainerRemoveOptions) error {
+       dockerContainerRemoveOptions := dockertypes.ContainerRemoveOptions{Force: options.Force}
+
+       dockerErr := a.docker.ContainerRemove(ctx, container, dockerContainerRemoveOptions)
+
+       return dockerErr
+}
+
+func (a *DockerAdapter) ContainerInspect(ctx context.Context, id string) (ContainerInspectResponse, error) {
+
+       dockerContainerInspectResponse, dockerErr := a.docker.ContainerInspect(ctx, id)
+
+       containerState := &ContainerState{
+               Running:    dockerContainerInspectResponse.State.Running,
+               Paused:     dockerContainerInspectResponse.State.Paused,
+               Restarting: dockerContainerInspectResponse.State.Restarting,
+               OOMKilled:  dockerContainerInspectResponse.State.OOMKilled,
+               Dead:       dockerContainerInspectResponse.State.Dead,
+               Pid:        dockerContainerInspectResponse.State.Pid,
+               ExitCode:   dockerContainerInspectResponse.State.ExitCode,
+               Error:      dockerContainerInspectResponse.State.Error,
+               StartedAt:  dockerContainerInspectResponse.State.StartedAt,
+               FinishedAt: dockerContainerInspectResponse.State.FinishedAt,
+       }
+
+       adapterResponse := &ContainerInspectResponse{State: containerState}
+
+       return *adapterResponse, dockerErr
+}
+
+func (a *DockerAdapter) ContainerWait(ctx context.Context, container string, condition WaitCondition) (<-chan ContainerWaitOKBody, <-chan error) {
+
+       //var dockercontainerCondition dockercontainer.WaitCondition =
+       dockercontainerCondition := dockercontainer.WaitCondition(condition)
+
+       dockerContainerWaitOKBody, dockerErr := a.docker.ContainerWait(ctx, container, dockercontainerCondition)
+
+       // translate from <-chan dockercontainer.ContainerWaitOKBody to <-chan ContainerWaitOKBody,
+       adapterContainerWaitOKBody := make(chan ContainerWaitOKBody)
+       go func() {
+               for dockerMsg := range dockerContainerWaitOKBody {
+                       var adapterBodyMsg *ContainerWaitOKBody
+                       var adapterError *ContainerWaitOKBodyError
+
+                       if dockerMsg.Error != nil {
+                               adapterError = &ContainerWaitOKBodyError{
+                                       Message: dockerMsg.Error.Message,
+                               }
+                       }
+
+                       adapterBodyMsg = &ContainerWaitOKBody{
+                               Error:      adapterError,
+                               StatusCode: dockerMsg.StatusCode,
+                       }
+
+                       adapterContainerWaitOKBody <- *adapterBodyMsg
+               }
+       }()
+
+       return adapterContainerWaitOKBody, dockerErr
+}
+
+func (a *DockerAdapter) ImageInspectWithRaw(ctx context.Context, image string) (ImageInspectResponse, []byte, error) {
+       dockerImageInspectResponse, rawBytes, dockerErr := a.docker.ImageInspectWithRaw(ctx, image)
+
+       adapterImageInspectResponse := &ImageInspectResponse{
+               ID: dockerImageInspectResponse.ID,
+       }
+       return *adapterImageInspectResponse, rawBytes, dockerErr
+}
+
+func (a *DockerAdapter) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (ImageLoadResponse, error) {
+       dockerImageLoadResponse, dockerErr := a.docker.ImageLoad(ctx, input, quiet)
+
+       adapterImageLoadResponse := &ImageLoadResponse{
+               Body: dockerImageLoadResponse.Body,
+               JSON: dockerImageLoadResponse.JSON,
+       }
+
+       return *adapterImageLoadResponse, dockerErr
+}
+
+func (a *DockerAdapter) ImageRemove(ctx context.Context, image string, options ImageRemoveOptions) ([]ImageDeleteResponseItem, error) {
+
+       dockerOptions := &dockertypes.ImageRemoveOptions{
+               Force:         options.Force,
+               PruneChildren: options.PruneChildren,
+       }
+       dockerImageDeleteResponseItems, dockerErr := a.docker.ImageRemove(ctx, image, *dockerOptions)
+
+       var adapterResponses []ImageDeleteResponseItem
+       for _, dockerResponse := range dockerImageDeleteResponseItems {
+               adapterResponse := &ImageDeleteResponseItem{
+                       Deleted:  dockerResponse.Deleted,
+                       Untagged: dockerResponse.Untagged,
+               }
+               adapterResponses = append(adapterResponses, *adapterResponse)
+       }
+       return adapterResponses, dockerErr
+}
+
+func (a *DockerAdapter) GetContainerConfig() (ContainerConfig, error) {
+       return a.containerConfig, nil
+}
+
+func (a *DockerAdapter) GetHostConfig() (HostConfig, error) {
+       return a.hostConfig, nil
+}
+
+func adapter(docker ThinDockerClient) ThinContainerExecRunner {
+       return_object := &DockerAdapter{docker: docker}
+
+       return return_object
+}
diff --git a/lib/crunchrun/singularity.go b/lib/crunchrun/singularity.go
new file mode 100644 (file)
index 0000000..91ec0ea
--- /dev/null
@@ -0,0 +1,86 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package crunchrun
+
+import (
+       "fmt"
+       "io"
+
+       "golang.org/x/net/context"
+)
+
+type SingularityClient struct {
+       containerConfig ContainerConfig
+       hostConfig      HostConfig
+}
+
+func (c SingularityClient) GetContainerConfig() (ContainerConfig, error) {
+       return c.containerConfig, nil
+}
+
+func (c SingularityClient) GetHostConfig() (HostConfig, error) {
+       return c.hostConfig, nil
+}
+
+func (c SingularityClient) ContainerAttach(ctx context.Context, container string, options ContainerAttachOptions) (HijackedResponse, error) {
+       fmt.Printf("placeholder for container ContainerAttach %s", container)
+
+       return HijackedResponse{}, nil
+}
+
+func (c SingularityClient) ContainerCreate(ctx context.Context, config ContainerConfig, hostConfig HostConfig, networkingConfig *NetworkingConfig, containerName string) (ContainerCreateResponse, error) {
+       fmt.Printf("placeholder for container ContainerCreate %s", containerName)
+
+       return ContainerCreateResponse{}, nil
+}
+
+func (c SingularityClient) ContainerStart(ctx context.Context, container string, options ContainerStartOptions) error {
+       fmt.Printf("placeholder for container ContainerStart %s", container)
+
+       return nil
+}
+
+func (c SingularityClient) ContainerRemove(ctx context.Context, container string, options ContainerRemoveOptions) error {
+       fmt.Printf("placeholder for container ContainerRemove %s", container)
+
+       return nil
+}
+
+func (c SingularityClient) ContainerWait(ctx context.Context, container string, condition WaitCondition) (<-chan ContainerWaitOKBody, <-chan error) {
+       fmt.Printf("placeholder for ContainerWait")
+       chanC := make(chan ContainerWaitOKBody)
+       chanE := make(chan error)
+       return chanC, chanE
+}
+
+func (c SingularityClient) ContainerInspect(ctx context.Context, id string) (ContainerInspectResponse, error) {
+       fmt.Printf("placeholder for container ContainerInspect %s", id)
+
+       return ContainerInspectResponse{}, nil
+}
+
+func (c SingularityClient) ImageInspectWithRaw(ctx context.Context, image string) (ImageInspectResponse, []byte, error) {
+       fmt.Printf("placeholder for ImageInspectWithRaw() %s", image)
+
+       return ImageInspectResponse{}, []byte(""), nil
+}
+
+func (c SingularityClient) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (ImageLoadResponse, error) {
+       fmt.Printf("placeholder for ImageLoad")
+       return ImageLoadResponse{}, nil
+}
+
+func (c SingularityClient) ImageRemove(ctx context.Context, image string, options ImageRemoveOptions) ([]ImageDeleteResponseItem, error) {
+       fmt.Printf("placeholder for ImageRemove")
+       var responses []ImageDeleteResponseItem
+       tmp := ImageDeleteResponseItem{}
+       responses = append(responses, tmp)
+       return responses, nil
+}
+
+func NewSingularityClient() (SingularityClient, error) {
+       var s = &SingularityClient{}
+       return *s, nil
+}