// ContainerRunner is the main stateful struct used for a single execution of a
// container.
type ContainerRunner struct {
- Docker ThinDockerClient
+ ContainerExecRunner ThinContainerExecRunner
// Dispatcher client is initialized with the Dispatcher token.
// This is a privileged token used to manage container status
ContainerArvClient IArvadosClient
ContainerKeepClient IKeepClient
- Container arvados.Container
- ContainerConfig dockercontainer.Config
- HostConfig dockercontainer.HostConfig
- token string
- ContainerID string
- ExitCode *int
- NewLogWriter NewLogWriter
- loggingDone chan bool
- CrunchLog *ThrottledLogger
- Stdout io.WriteCloser
- Stderr io.WriteCloser
- logUUID string
- logMtx sync.Mutex
- LogCollection arvados.CollectionFileSystem
- LogsPDH *string
- RunArvMount RunArvMount
- MkTempDir MkTempDir
- ArvMount *exec.Cmd
- ArvMountPoint string
- HostOutputDir string
- Binds []string
- Volumes map[string]struct{}
- OutputPDH *string
- SigChan chan os.Signal
- ArvMountExit chan error
- SecretMounts map[string]arvados.Mount
- MkArvClient func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
- finalState string
- parentTemp string
+ Container arvados.Container
+ token string
+ ContainerID string
+ ExitCode *int
+ NewLogWriter NewLogWriter
+ loggingDone chan bool
+ CrunchLog *ThrottledLogger
+ Stdout io.WriteCloser
+ Stderr io.WriteCloser
+ logUUID string
+ logMtx sync.Mutex
+ LogCollection arvados.CollectionFileSystem
+ LogsPDH *string
+ RunArvMount RunArvMount
+ MkTempDir MkTempDir
+ ArvMount *exec.Cmd
+ ArvMountPoint string
+ HostOutputDir string
+ Binds []string
+ Volumes map[string]struct{}
+ OutputPDH *string
+ SigChan chan os.Signal
+ ArvMountExit chan error
+ SecretMounts map[string]arvados.Mount
+ MkArvClient func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
+ finalState string
+ parentTemp string
statLogger io.WriteCloser
statReporter *crunchstat.Reporter
}
runner.cCancelled = true
runner.CrunchLog.Printf("removing container")
- err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true})
+ err := runner.ContainerExecRunner.ContainerRemove(context.TODO(), runner.ContainerID, ContainerRemoveOptions{Force: true})
if err != nil {
runner.CrunchLog.Printf("error removing container: %s", err)
}
runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
- _, _, err = runner.Docker.ImageInspectWithRaw(context.TODO(), imageID)
+ _, _, err = runner.ContainerExecRunner.ImageInspectWithRaw(context.TODO(), imageID)
if err != nil {
runner.CrunchLog.Print("Loading Docker image from keep")
return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
}
- response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, true)
+ response, err := runner.ContainerExecRunner.ImageLoad(context.TODO(), readCloser, true)
if err != nil {
return fmt.Errorf("While loading container image into Docker: %v", err)
}
runner.CrunchLog.Print("Docker image is available")
}
- runner.ContainerConfig.Image = imageID
+ runner.ContainerExecRunner.SetImage(imageID)
runner.ContainerKeepClient.ClearBlockCache()
}
stdinUsed := stdinRdr != nil || len(stdinJSON) != 0
- response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
- dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
+ response, err := runner.ContainerExecRunner.ContainerAttach(context.TODO(), runner.ContainerID,
+ ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
if err != nil {
return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
}
func (runner *ContainerRunner) CreateContainer() error {
runner.CrunchLog.Print("Creating Docker container")
- runner.ContainerConfig.Cmd = runner.Container.Command
+ containerConfig, err := runner.ContainerExecRunner.GetContainerConfig()
+ hostConfig, err := runner.ContainerExecRunner.GetHostConfig()
+
+ containerConfig.Cmd = runner.Container.Command
if runner.Container.Cwd != "." {
- runner.ContainerConfig.WorkingDir = runner.Container.Cwd
+ containerConfig.WorkingDir = runner.Container.Cwd
}
for k, v := range runner.Container.Environment {
- runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
+ containerConfig.Env = append(containerConfig.Env, k+"="+v)
}
- runner.ContainerConfig.Volumes = runner.Volumes
+ containerConfig.Volumes = runner.Volumes
maxRAM := int64(runner.Container.RuntimeConstraints.RAM)
minDockerRAM := int64(16)
// Docker daemon won't let you set a limit less than ~10 MiB
maxRAM = minDockerRAM * 1024 * 1024
}
- runner.HostConfig = dockercontainer.HostConfig{
+ hostConfig = HostConfig{
Binds: runner.Binds,
- LogConfig: dockercontainer.LogConfig{
+ LogConfig: LogConfig{
Type: "none",
},
- Resources: dockercontainer.Resources{
+ Resources: Resources{
CgroupParent: runner.setCgroupParent,
NanoCPUs: int64(runner.Container.RuntimeConstraints.VCPUs) * 1000000000,
Memory: maxRAM, // RAM
KernelMemory: maxRAM, // kernel portion
},
}
-
+ runner.ContainerExecRunner.SetHostConfig(hostConfig)
if runner.Container.RuntimeConstraints.API {
tok, err := runner.ContainerToken()
if err != nil {
return err
}
- runner.ContainerConfig.Env = append(runner.ContainerConfig.Env,
+ containerConfig.Env = append(containerConfig.Env,
"ARVADOS_API_TOKEN="+tok,
"ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
"ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
)
- runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
+ runner.ContainerExecRunner.SetNetworkMode(NetworkMode(runner.networkMode))
} else {
if runner.enableNetwork == "always" {
- runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
+ runner.ContainerExecRunner.SetNetworkMode(NetworkMode(runner.networkMode))
} else {
- runner.HostConfig.NetworkMode = dockercontainer.NetworkMode("none")
+ runner.ContainerExecRunner.SetNetworkMode("none")
}
}
_, stdinUsed := runner.Container.Mounts["stdin"]
- runner.ContainerConfig.OpenStdin = stdinUsed
- runner.ContainerConfig.StdinOnce = stdinUsed
- runner.ContainerConfig.AttachStdin = stdinUsed
- runner.ContainerConfig.AttachStdout = true
- runner.ContainerConfig.AttachStderr = true
+ containerConfig.OpenStdin = stdinUsed
+ containerConfig.StdinOnce = stdinUsed
+ containerConfig.AttachStdin = stdinUsed
+ containerConfig.AttachStdout = true
+ containerConfig.AttachStderr = true
- createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
+ createdBody, err := runner.ContainerExecRunner.ContainerCreate(context.TODO(), containerConfig, hostConfig, nil, runner.Container.UUID)
if err != nil {
return fmt.Errorf("While creating container: %v", err)
}
if runner.cCancelled {
return ErrCancelled
}
- err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID,
- dockertypes.ContainerStartOptions{})
+ err := runner.ContainerExecRunner.ContainerStart(context.TODO(), runner.ContainerID,
+ ContainerStartOptions{})
if err != nil {
var advice string
if m, e := regexp.MatchString("(?ms).*(exec|System error).*(no such file or directory|file not found).*", err.Error()); m && e == nil {
var runTimeExceeded <-chan time.Time
runner.CrunchLog.Print("Waiting for container to finish")
- waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning)
+ waitOk, waitErr := runner.ContainerExecRunner.ContainerWait(context.TODO(), runner.ContainerID, WaitConditionNotRunning)
arvMountExit := runner.ArvMountExit
if timeout := runner.Container.SchedulingParameters.MaxRunTime; timeout > 0 {
runTimeExceeded = time.After(time.Duration(timeout) * time.Second)
}
for range time.NewTicker(runner.containerWatchdogInterval).C {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(runner.containerWatchdogInterval))
- ctr, err := runner.Docker.ContainerInspect(ctx, runner.ContainerID)
+ ctr, err := runner.ContainerExecRunner.ContainerInspect(ctx, runner.ContainerID)
cancel()
runner.cStateLock.Lock()
done := runner.cRemoved || runner.ExitCode != nil
func NewContainerRunner(dispatcherClient *arvados.Client,
dispatcherArvClient IArvadosClient,
dispatcherKeepClient IKeepClient,
- docker ThinDockerClient,
+ containerRunner ThinContainerExecRunner,
containerUUID string) (*ContainerRunner, error) {
cr := &ContainerRunner{
dispatcherClient: dispatcherClient,
DispatcherArvClient: dispatcherArvClient,
DispatcherKeepClient: dispatcherKeepClient,
- Docker: docker,
+ ContainerExecRunner: containerRunner,
}
cr.NewLogWriter = cr.NewArvLogWriter
cr.RunArvMount = cr.ArvMountCmd
`)
memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
-
+ containerRunner := flags.String("container-runner", "docker",
+ `Specify the container runner. available options: docker, singularity.
+ `)
ignoreDetachFlag := false
if len(args) > 0 && args[0] == "-no-detach" {
// This process was invoked by a parent process, which
kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
kc.Retries = 4
- // API version 1.21 corresponds to Docker 1.9, which is currently the
- // minimum version we want to support.
- docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
+ var cr *ContainerRunner
+ if *containerRunner == "docker" {
+ // API version 1.21 corresponds to Docker 1.9, which is currently the
+ // minimum version we want to support.
+ docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
- cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerID)
- if err != nil {
- log.Print(err)
- return 1
- }
- if dockererr != nil {
- cr.CrunchLog.Printf("%s: %v", containerID, dockererr)
- cr.checkBrokenNode(dockererr)
- cr.CrunchLog.Close()
- return 1
- }
+ cr, err = NewContainerRunner(arvados.NewClientFromEnv(), api, kc, adapter(docker), containerID)
+ if err != nil {
+ log.Print(err)
+ return 1
+ }
+ if dockererr != nil {
+ cr.CrunchLog.Printf("%s: %v", containerID, dockererr)
+ cr.checkBrokenNode(dockererr)
+ cr.CrunchLog.Close()
+ return 1
+ }
+ } else {
+ // Singularity
+
+ singularity, singularityerr := NewSingularityClient()
+
+ cr, err = NewContainerRunner(arvados.NewClientFromEnv(), api, kc, singularity, containerID)
+ if err != nil {
+ log.Print(err)
+ return 1
+ }
+ if singularityerr != nil {
+ cr.CrunchLog.Printf("%s: %v", containerID, singularityerr)
+ //cr.checkBrokenNode(singularityerr) // TODO: needs a singularity-specific broken-node check
+ cr.CrunchLog.Close()
+ return 1
+ }
+ }
cr.gateway = Gateway{
Address: os.Getenv("GatewayAddress"),
AuthSecret: os.Getenv("GatewayAuthSecret"),
Log: cr.CrunchLog,
}
os.Unsetenv("GatewayAuthSecret")
- err = cr.gateway.Start()
- if err != nil {
- log.Printf("error starting gateway server: %s", err)
- return 1
+ if cr.gateway.Address != "" {
+ err = cr.gateway.Start()
+ if err != nil {
+ log.Printf("error starting gateway server: %s", err)
+ return 1
+ }
}
parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerID+".")