"time"
"git.arvados.org/arvados.git/lib/cmd"
+ "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/crunchstat"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/keepclient"
"git.arvados.org/arvados.git/sdk/go/manifest"
"golang.org/x/sys/unix"
MkArvClient func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
finalState string
parentTemp string
+ costStartTime time.Time
keepstoreLogger io.WriteCloser
keepstoreLogbuf *bufThenWrite
enableMemoryLimit bool
enableNetwork string // one of "default" or "always"
networkMode string // "none", "host", or "" -- passed through to executor
+ brokenNodeHook string // script to run if node appears to be broken
arvMountLog *ThrottledLogger
containerWatchdogInterval time.Duration
"(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*",
"(?ms).*grpc: the connection is unavailable.*",
}
-var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
func (runner *ContainerRunner) runBrokenNodeHook() {
- if *brokenNodeHook == "" {
+ if runner.brokenNodeHook == "" {
path := filepath.Join(lockdir, brokenfile)
runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
}
f.Close()
} else {
- runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
+ runner.CrunchLog.Printf("Running broken node hook %q", runner.brokenNodeHook)
// run killme script
- c := exec.Command(*brokenNodeHook)
+ c := exec.Command(runner.brokenNodeHook)
c.Stdout = runner.CrunchLog
c.Stderr = runner.CrunchLog
err := c.Run()
"--storage-classes", strings.Join(runner.Container.OutputStorageClasses, ","),
fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
- if runner.executor.Runtime() == "docker" {
+ if _, isdocker := runner.executor.(*dockerExecutor); isdocker {
arvMountCmd = append(arvMountCmd, "--allow-other")
}
env["ARVADOS_API_TOKEN"] = tok
env["ARVADOS_API_HOST"] = os.Getenv("ARVADOS_API_HOST")
env["ARVADOS_API_HOST_INSECURE"] = os.Getenv("ARVADOS_API_HOST_INSECURE")
+ env["ARVADOS_KEEP_SERVICES"] = os.Getenv("ARVADOS_KEEP_SERVICES")
}
workdir := runner.Container.Cwd
if workdir == "." {
}
}
runner.CrunchLog.Printf("Container exited with status code %d%s", exitcode, extra)
+ err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+ "container": arvadosclient.Dict{"exit_code": exitcode},
+ }, nil)
+ if err != nil {
+ runner.CrunchLog.Printf("ignoring error updating exit_code: %s", err)
+ }
var returnErr error
if err = runner.executorStdin.Close(); err != nil {
continue
}
- var updated arvados.Container
err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
"container": arvadosclient.Dict{"log": saved.PortableDataHash},
- }, &updated)
+ }, nil)
if err != nil {
runner.CrunchLog.Printf("error updating container log to %s: %s", saved.PortableDataHash, err)
continue
if runner.LogsPDH != nil {
update["log"] = *runner.LogsPDH
}
- if runner.finalState == "Complete" {
- if runner.ExitCode != nil {
- update["exit_code"] = *runner.ExitCode
- }
- if runner.OutputPDH != nil {
- update["output"] = *runner.OutputPDH
- }
+ if runner.ExitCode != nil {
+ update["exit_code"] = *runner.ExitCode
+ } else {
+ update["exit_code"] = nil
+ }
+ if runner.finalState == "Complete" && runner.OutputPDH != nil {
+ update["output"] = *runner.OutputPDH
+ }
+ var it arvados.InstanceType
+ if j := os.Getenv("InstanceType"); j != "" && json.Unmarshal([]byte(j), &it) == nil && it.Price > 0 {
+ update["cost"] = it.Price * time.Now().Sub(runner.costStartTime).Seconds() / time.Hour.Seconds()
}
return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
}
func (runner *ContainerRunner) Run() (err error) {
runner.CrunchLog.Printf("crunch-run %s started", cmd.Version.String())
runner.CrunchLog.Printf("%s", currentUserAndGroups())
- runner.CrunchLog.Printf("Executing container '%s' using %s runtime", runner.Container.UUID, runner.executor.Runtime())
+ v, _ := exec.Command("arv-mount", "--version").CombinedOutput()
+ runner.CrunchLog.Printf("Using FUSE mount: %s", v)
+ runner.CrunchLog.Printf("Using container runtime: %s", runner.executor.Runtime())
+ runner.CrunchLog.Printf("Executing container: %s", runner.Container.UUID)
+ runner.costStartTime = time.Now()
hostname, hosterr := os.Hostname()
if hosterr != nil {
caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
+ configFile := flags.String("config", arvados.DefaultConfigFile, "filename of cluster config file to try loading if -stdin-config=false (default is $ARVADOS_CONFIG)")
sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`)
memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity")
+ brokenNodeHook := flags.String("broken-node-hook", "", "script to run if node is detected to be broken (for example, Docker daemon is not running)")
flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
+ version := flags.Bool("version", false, "Write version information to stdout and exit 0.")
ignoreDetachFlag := false
if len(args) > 0 && args[0] == "-no-detach" {
if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok {
return code
+ } else if *version {
+ fmt.Fprintln(stdout, prog, cmd.Version.String())
+ return 0
} else if !*list && flags.NArg() != 1 {
fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
return 2
return 1
}
+ var keepstoreLogbuf bufThenWrite
var conf ConfigData
if *stdinConfig {
err := json.NewDecoder(stdin).Decode(&conf)
// fill it using the container UUID prefix.
conf.Cluster.ClusterID = containerUUID[:5]
}
+ } else {
+ conf = hpcConfData(containerUUID, *configFile, io.MultiWriter(&keepstoreLogbuf, stderr))
}
log.Printf("crunch-run %s started", cmd.Version.String())
arvadosclient.CertFiles = []string{*caCertsPath}
}
- var keepstoreLogbuf bufThenWrite
keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
if err != nil {
log.Print(err)
}
defer cr.executor.Close()
+ cr.brokenNodeHook = *brokenNodeHook
+
gwAuthSecret := os.Getenv("GatewayAuthSecret")
os.Unsetenv("GatewayAuthSecret")
if gwAuthSecret == "" {
// not safe to run a gateway service without an auth
// secret
cr.CrunchLog.Printf("Not starting a gateway server (GatewayAuthSecret was not provided by dispatcher)")
- } else if gwListen := os.Getenv("GatewayAddress"); gwListen == "" {
- // dispatcher did not tell us which external IP
- // address to advertise --> no gateway service
- cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)")
- } else if de, ok := cr.executor.(*dockerExecutor); ok {
+ } else {
+ gwListen := os.Getenv("GatewayAddress")
cr.gateway = Gateway{
- Address: gwListen,
- AuthSecret: gwAuthSecret,
- ContainerUUID: containerUUID,
- DockerContainerID: &de.containerID,
- Log: cr.CrunchLog,
- ContainerIPAddress: dockerContainerIPAddress(&de.containerID),
+ Address: gwListen,
+ AuthSecret: gwAuthSecret,
+ ContainerUUID: containerUUID,
+ Target: cr.executor,
+ Log: cr.CrunchLog,
+ }
+ if gwListen == "" {
+ // Direct connection won't work, so we use the
+ // gateway_address field to indicate the
+ // internalURL of the controller process that
+ // has the current tunnel connection.
+ cr.gateway.ArvadosClient = cr.dispatcherClient
+ cr.gateway.UpdateTunnelURL = func(url string) {
+ cr.gateway.Address = "tunnel " + url
+ cr.DispatcherArvClient.Update("containers", containerUUID,
+ arvadosclient.Dict{"container": arvadosclient.Dict{"gateway_address": cr.gateway.Address}}, nil)
+ }
}
err = cr.gateway.Start()
if err != nil {
cr.enableNetwork = *enableNetwork
cr.networkMode = *networkMode
if *cgroupParentSubsystem != "" {
- p := findCgroup(*cgroupParentSubsystem)
+ p, err := findCgroup(*cgroupParentSubsystem)
+ if err != nil {
+ log.Printf("fatal: cgroup parent subsystem: %s", err)
+ return 1
+ }
cr.setCgroupParent = p
cr.expectCgroupParent = p
}
return 0
}
+// hpcConfData assembles a ConfigData for the case where the config
+// is not supplied on stdin, e.g. in an hpc (slurm/lsf) environment.
+//
+// It loads the cluster config from configFile and, only if that
+// succeeds, asks the controller for the container's
+// runtime_constraints so that KeepBuffers can be set to
+// VCPUs * LocalKeepBlobBuffersPerVCPU.
+//
+// Problems are reported on stderr and are never fatal: whatever was
+// filled in before the failure is returned to the caller.
+func hpcConfData(uuid string, configFile string, stderr io.Writer) ConfigData {
+	cluster := loadClusterConfigFile(configFile, stderr)
+	conf := ConfigData{Cluster: cluster}
+	if cluster == nil {
+		// No cluster config means no local keepstore can be
+		// started, so there is no point fetching the container
+		// record.
+		return conf
+	}
+	client, err := arvadosclient.MakeArvadosClient()
+	if err != nil {
+		fmt.Fprintf(stderr, "error setting up arvadosclient: %s\n", err)
+		return conf
+	}
+	client.Retries = 8
+	// Only runtime_constraints is needed, so select just that field.
+	params := arvadosclient.Dict{"select": []string{"runtime_constraints"}}
+	var container arvados.Container
+	if err := client.Call("GET", "containers", uuid, "", params, &container); err != nil {
+		fmt.Fprintf(stderr, "error getting container record: %s\n", err)
+		return conf
+	}
+	if vcpus := container.RuntimeConstraints.VCPUs; vcpus > 0 {
+		conf.KeepBuffers = vcpus * cluster.Containers.LocalKeepBlobBuffersPerVCPU
+	}
+	return conf
+}
+
+// loadClusterConfigFile reads the config file at path and returns
+// the cluster obtained from GetCluster(""). The outcome (success or
+// the reason for failure) is written to stderr; on any failure the
+// returned cluster is nil.
+func loadClusterConfigFile(path string, stderr io.Writer) *arvados.Cluster {
+	// The loader gets an empty stdin buffer and logs to stderr.
+	loader := config.NewLoader(&bytes.Buffer{}, ctxlog.New(stderr, "plain", "info"))
+	loader.Path = path
+	loaded, loadErr := loader.Load()
+	if loadErr != nil {
+		fmt.Fprintf(stderr, "could not load config file %s: %s\n", path, loadErr)
+		return nil
+	}
+	cluster, clusterErr := loaded.GetCluster("")
+	if clusterErr != nil {
+		fmt.Fprintf(stderr, "could not use config file %s: %s\n", path, clusterErr)
+		return nil
+	}
+	fmt.Fprintf(stderr, "loaded config file %s\n", path)
+	return cluster
+}
+
func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) {
- if configData.Cluster == nil || configData.KeepBuffers < 1 {
+ if configData.KeepBuffers < 1 {
+ fmt.Fprintf(logbuf, "not starting a local keepstore process because KeepBuffers=%v in config\n", configData.KeepBuffers)
+ return nil, nil
+ }
+ if configData.Cluster == nil {
+ fmt.Fprint(logbuf, "not starting a local keepstore process because cluster config file was not loaded\n")
return nil, nil
}
for uuid, vol := range configData.Cluster.Volumes {
// modify the cluster configuration that we feed it on stdin.
configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers
- ln, err := net.Listen("tcp", "localhost:0")
+ localaddr := localKeepstoreAddr()
+ ln, err := net.Listen("tcp", net.JoinHostPort(localaddr, "0"))
if err != nil {
return nil, err
}
return nil, err
}
ln.Close()
- url := "http://localhost:" + port
+ url := "http://" + net.JoinHostPort(localaddr, port)
fmt.Fprintf(logbuf, "starting keepstore on %s\n", url)
}
return s
}
+
+// localKeepstoreAddr returns a suitable local interface address for
+// a local keepstore service.
+//
+// The result is the numerically lowest IPv4 address reported by
+// processIPs for the current process that is not in any of the
+// loopback/vpn/link-local ranges 127/8, 100.64/10, or 169.254/16.
+// If processIPs fails or reports no suitable address, "0.0.0.0" is
+// returned.
+func localKeepstoreAddr() string {
+	var ips []net.IP
+	// Ignore error (proceed with zero IPs)
+	addrs, _ := processIPs(os.Getpid())
+	for addr := range addrs {
+		// To4 returns nil both for unparseable input (ParseIP
+		// returned nil) and for genuine IPv6 addresses. Rejecting
+		// IPv6 here matters: the range checks below only match
+		// IPv4, so an address like "::1" would otherwise be kept
+		// and then sort below every IPv4 address.
+		ip := net.ParseIP(addr).To4()
+		if ip == nil {
+			// invalid or not IPv4
+			continue
+		}
+		if ip.Mask(net.CIDRMask(8, 32)).Equal(net.IPv4(127, 0, 0, 0)) ||
+			ip.Mask(net.CIDRMask(10, 32)).Equal(net.IPv4(100, 64, 0, 0)) ||
+			ip.Mask(net.CIDRMask(16, 32)).Equal(net.IPv4(169, 254, 0, 0)) {
+			// unsuitable
+			continue
+		}
+		ips = append(ips, ip)
+	}
+	if len(ips) == 0 {
+		return "0.0.0.0"
+	}
+	// Every entry is in 4-byte form, so bytewise order is numeric
+	// order.
+	sort.Slice(ips, func(i, j int) bool {
+		return bytes.Compare(ips[i], ips[j]) < 0
+	})
+	return ips[0].String()
+}