ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
LocalLocator(locator string) (string, error)
ClearBlockCache()
+ SetStorageClasses(sc []string)
}
// NewLogWriter is a factory function to create a new log writer.
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
type ContainerRunner struct {
- executor containerExecutor
+ executor containerExecutor
+ executorStdin io.Closer // assigned in CreateContainer; closed after the exit code is recorded
+ executorStdout io.Closer // assigned in CreateContainer; closed after the exit code is recorded
+ executorStderr io.Closer // assigned in CreateContainer; closed after the exit code is recorded
// Dispatcher client is initialized with the Dispatcher token.
// This is a privileged token used to manage container status
ExitCode *int
NewLogWriter NewLogWriter
CrunchLog *ThrottledLogger
- Stdout io.WriteCloser
- Stderr io.WriteCloser
logUUID string
logMtx sync.Mutex
LogCollection arvados.CollectionFileSystem
cStateLock sync.Mutex
cCancelled bool // StopContainer() invoked
- enableNetwork string // one of "default" or "always"
- networkMode string // "none", "host", or "" -- passed through to executor
- arvMountLog *ThrottledLogger
+ enableMemoryLimit bool // if false, CreateContainer passes RAM: 0 to the executor; set from the -enable-memory-limit flag
+ enableNetwork string // one of "default" or "always"
+ networkMode string // "none", "host", or "" -- passed through to executor
+ arvMountLog *ThrottledLogger
containerWatchdogInterval time.Duration
}
runner.arvMountLog = NewThrottledLogger(w)
c.Stdout = runner.arvMountLog
- c.Stderr = runner.arvMountLog
+ c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr)
runner.CrunchLog.Printf("Running %v", c.Args)
if err != nil {
return nil, fmt.Errorf("could not get container token: %s", err)
}
+ // Do not log the token value: it is a credential, and log output
+ // must not contain secrets. Only record that it was obtained.
+ runner.CrunchLog.Printf("obtained container token (value not logged)")
pdhOnly := true
tmpcount := 0
"--foreground",
"--allow-other",
"--read-write",
+ "--storage-classes", strings.Join(runner.Container.OutputStorageClasses, ","),
fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
// CreateContainer creates the docker container.
func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error {
- var stdin io.ReadCloser
+ var stdin io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
if mnt, ok := runner.Container.Mounts["stdin"]; ok {
switch mnt.Kind {
case "collection":
// both "" and "." mean default
workdir = ""
}
-
+ ram := runner.Container.RuntimeConstraints.RAM
+ if !runner.enableMemoryLimit {
+ ram = 0
+ }
+ runner.executorStdin = stdin
+ runner.executorStdout = stdout
+ runner.executorStderr = stderr
return runner.executor.Create(containerSpec{
Image: imageID,
VCPUs: runner.Container.RuntimeConstraints.VCPUs,
- RAM: runner.Container.RuntimeConstraints.RAM,
+ RAM: ram,
WorkingDir: workdir,
Env: env,
BindMounts: bindmounts,
}
runner.ExitCode = &exitcode
+ var returnErr error
+ if err = runner.executorStdin.Close(); err != nil {
+ err = fmt.Errorf("error closing container stdin: %s", err)
+ runner.CrunchLog.Printf("%s", err)
+ returnErr = err
+ }
+ if err = runner.executorStdout.Close(); err != nil {
+ err = fmt.Errorf("error closing container stdout: %s", err)
+ runner.CrunchLog.Printf("%s", err)
+ if returnErr == nil {
+ returnErr = err
+ }
+ }
+ if err = runner.executorStderr.Close(); err != nil {
+ err = fmt.Errorf("error closing container stderr: %s", err)
+ runner.CrunchLog.Printf("%s", err)
+ if returnErr == nil {
+ returnErr = err
+ }
+ }
+
if runner.statReporter != nil {
runner.statReporter.Stop()
err = runner.statLogger.Close()
runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
}
}
- return nil
+ return returnErr
}
func (runner *ContainerRunner) updateLogs() {
return fmt.Errorf("error creating container API client: %v", err)
}
+ runner.ContainerKeepClient.SetStorageClasses(runner.Container.OutputStorageClasses)
+ runner.DispatcherKeepClient.SetStorageClasses(runner.Container.OutputStorageClasses)
+
err = runner.ContainerArvClient.Call("GET", "containers", runner.Container.UUID, "secret_mounts", nil, &sm)
if err != nil {
if apierr, ok := err.(arvadosclient.APIServerError); !ok || apierr.HttpStatusCode != 404 {
sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
+ enableMemoryLimit := flags.Bool("enable-memory-limit", true, "tell container runtime to limit container's memory usage")
enableNetwork := flags.String("container-enable-networking", "default", "enable networking \"always\" (for all containers) or \"default\" (for containers that request it)")
networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`)
memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
if gwAuthSecret == "" {
// not safe to run a gateway service without an auth
// secret
+ cr.CrunchLog.Printf("Not starting a gateway server (GatewayAuthSecret was not provided by dispatcher)")
} else if gwListen := os.Getenv("GatewayAddress"); gwListen == "" {
// dispatcher did not tell us which external IP
// address to advertise --> no gateway service
+ cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)")
} else if de, ok := cr.executor.(*dockerExecutor); ok {
cr.gateway = Gateway{
Address: gwListen,
cr.statInterval = *statInterval
cr.cgroupRoot = *cgroupRoot
cr.expectCgroupParent = *cgroupParent
+ cr.enableMemoryLimit = *enableMemoryLimit
cr.enableNetwork = *enableNetwork
cr.networkMode = *networkMode
if *cgroupParentSubsystem != "" {