14291: Support EBS attached storage and preemptible instances
[arvados.git] / services / crunch-run / crunchrun.go
index 36d8394c7fcd67f111095004e03d8a3f68fea74a..0576337aa13c280841187db3a7aea2dcf4af65c0 100644 (file)
@@ -92,10 +92,30 @@ type PsProcess interface {
 // ContainerRunner is the main stateful struct used for a single execution of a
 // container.
 type ContainerRunner struct {
-       Docker          ThinDockerClient
-       client          *arvados.Client
-       ArvClient       IArvadosClient
-       Kc              IKeepClient
+       Docker ThinDockerClient
+
+       // Dispatcher client is initialized with the Dispatcher token.
+       // This is a priviledged token used to manage container status
+       // and logs.
+       //
+       // We have both dispatcherClient and DispatcherArvClient
+       // because there are two different incompatible Arvados Go
+       // SDKs and we have to use both (hopefully this gets fixed in
+       // #14467)
+       dispatcherClient     *arvados.Client
+       DispatcherArvClient  IArvadosClient
+       DispatcherKeepClient IKeepClient
+
+       // Container client is initialized with the Container token
+       // This token controls the permissions of the container, and
+       // must be used for operations such as reading collections.
+       //
+       // Same comment as above applies to
+       // containerClient/ContainerArvClient.
+       containerClient     *arvados.Client
+       ContainerArvClient  IArvadosClient
+       ContainerKeepClient IKeepClient
+
        Container       arvados.Container
        ContainerConfig dockercontainer.Config
        HostConfig      dockercontainer.HostConfig
@@ -122,7 +142,7 @@ type ContainerRunner struct {
        SigChan         chan os.Signal
        ArvMountExit    chan error
        SecretMounts    map[string]arvados.Mount
-       MkArvClient     func(token string) (IArvadosClient, IKeepClient, error)
+       MkArvClient     func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
        finalState      string
        parentTemp      string
 
@@ -234,17 +254,8 @@ func (runner *ContainerRunner) LoadImage() (err error) {
 
        runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
 
-       tok, err := runner.ContainerToken()
-       if err != nil {
-               return fmt.Errorf("While getting container token (LoadImage): %v", err)
-       }
-       arvClient, kc, err := runner.MkArvClient(tok)
-       if err != nil {
-               return fmt.Errorf("While creating arv client (LoadImage): %v", err)
-       }
-
        var collection arvados.Collection
-       err = arvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
+       err = runner.ContainerArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
        if err != nil {
                return fmt.Errorf("While getting container image collection: %v", err)
        }
@@ -265,7 +276,7 @@ func (runner *ContainerRunner) LoadImage() (err error) {
                runner.CrunchLog.Print("Loading Docker image from keep")
 
                var readCloser io.ReadCloser
-               readCloser, err = kc.ManifestFileReader(manifest, img)
+               readCloser, err = runner.ContainerKeepClient.ManifestFileReader(manifest, img)
                if err != nil {
                        return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
                }
@@ -287,7 +298,7 @@ func (runner *ContainerRunner) LoadImage() (err error) {
 
        runner.ContainerConfig.Image = imageID
 
-       kc.ClearBlockCache()
+       runner.ContainerKeepClient.ClearBlockCache()
 
        return nil
 }
@@ -588,7 +599,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        if err != nil {
                                return fmt.Errorf("creating temp dir: %v", err)
                        }
-                       err = gitMount(mnt).extractTree(runner.ArvClient, tmpdir, token)
+                       err = gitMount(mnt).extractTree(runner.ContainerArvClient, tmpdir, token)
                        if err != nil {
                                return err
                        }
@@ -855,13 +866,13 @@ func (runner *ContainerRunner) logAPIResponse(label, path string, params map[str
                return false, err
        }
        w := &ArvLogWriter{
-               ArvClient:     runner.ArvClient,
+               ArvClient:     runner.DispatcherArvClient,
                UUID:          runner.Container.UUID,
                loggingStream: label,
                writeCloser:   writer,
        }
 
-       reader, err := runner.ArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params))
+       reader, err := runner.DispatcherArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params))
        if err != nil {
                return false, fmt.Errorf("error getting %s record: %v", label, err)
        }
@@ -911,12 +922,14 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
                        if collId == "" {
                                collId = stdinMnt.PortableDataHash
                        }
-                       err = runner.ArvClient.Get("collections", collId, nil, &stdinColl)
+                       err = runner.ContainerArvClient.Get("collections", collId, nil, &stdinColl)
                        if err != nil {
-                               return fmt.Errorf("While getting stding collection: %v", err)
+                               return fmt.Errorf("While getting stdin collection: %v", err)
                        }
 
-                       stdinRdr, err = runner.Kc.ManifestFileReader(manifest.Manifest{Text: stdinColl.ManifestText}, stdinMnt.Path)
+                       stdinRdr, err = runner.ContainerKeepClient.ManifestFileReader(
+                               manifest.Manifest{Text: stdinColl.ManifestText},
+                               stdinMnt.Path)
                        if os.IsNotExist(err) {
                                return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
                        } else if err != nil {
@@ -1210,7 +1223,7 @@ func (runner *ContainerRunner) updateLogs() {
                }
 
                var updated arvados.Container
-               err = runner.ArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+               err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
                        "container": arvadosclient.Dict{"log": saved.PortableDataHash},
                }, &updated)
                if err != nil {
@@ -1228,7 +1241,7 @@ func (runner *ContainerRunner) CaptureOutput() error {
        if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
                // Output may have been set directly by the container, so
                // refresh the container record to check.
-               err := runner.ArvClient.Get("containers", runner.Container.UUID,
+               err := runner.DispatcherArvClient.Get("containers", runner.Container.UUID,
                        nil, &runner.Container)
                if err != nil {
                        return err
@@ -1241,9 +1254,9 @@ func (runner *ContainerRunner) CaptureOutput() error {
        }
 
        txt, err := (&copier{
-               client:        runner.client,
-               arvClient:     runner.ArvClient,
-               keepClient:    runner.Kc,
+               client:        runner.containerClient,
+               arvClient:     runner.ContainerArvClient,
+               keepClient:    runner.ContainerKeepClient,
                hostOutputDir: runner.HostOutputDir,
                ctrOutputDir:  runner.Container.OutputPath,
                binds:         runner.Binds,
@@ -1256,7 +1269,7 @@ func (runner *ContainerRunner) CaptureOutput() error {
        }
        if n := len(regexp.MustCompile(` [0-9a-f]+\+\S*\+R`).FindAllStringIndex(txt, -1)); n > 0 {
                runner.CrunchLog.Printf("Copying %d data blocks from remote input collections...", n)
-               fs, err := (&arvados.Collection{ManifestText: txt}).FileSystem(runner.client, runner.Kc)
+               fs, err := (&arvados.Collection{ManifestText: txt}).FileSystem(runner.containerClient, runner.ContainerKeepClient)
                if err != nil {
                        return err
                }
@@ -1266,7 +1279,7 @@ func (runner *ContainerRunner) CaptureOutput() error {
                }
        }
        var resp arvados.Collection
-       err = runner.ArvClient.Create("collections", arvadosclient.Dict{
+       err = runner.ContainerArvClient.Create("collections", arvadosclient.Dict{
                "ensure_unique_name": true,
                "collection": arvadosclient.Dict{
                        "is_trashed":    true,
@@ -1354,7 +1367,7 @@ func (runner *ContainerRunner) CommitLogs() error {
                // other further errors (such as failing to write the log to Keep!)
                // while shutting down
                runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{
-                       ArvClient:     runner.ArvClient,
+                       ArvClient:     runner.DispatcherArvClient,
                        UUID:          runner.Container.UUID,
                        loggingStream: "crunch-run",
                        writeCloser:   nil,
@@ -1406,9 +1419,9 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C
        reqBody := arvadosclient.Dict{"collection": updates}
        if runner.logUUID == "" {
                reqBody["ensure_unique_name"] = true
-               err = runner.ArvClient.Create("collections", reqBody, &response)
+               err = runner.DispatcherArvClient.Create("collections", reqBody, &response)
        } else {
-               err = runner.ArvClient.Update("collections", runner.logUUID, reqBody, &response)
+               err = runner.DispatcherArvClient.Update("collections", runner.logUUID, reqBody, &response)
        }
        if err != nil {
                return
@@ -1424,7 +1437,7 @@ func (runner *ContainerRunner) UpdateContainerRunning() error {
        if runner.cCancelled {
                return ErrCancelled
        }
-       return runner.ArvClient.Update("containers", runner.Container.UUID,
+       return runner.DispatcherArvClient.Update("containers", runner.Container.UUID,
                arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
 }
 
@@ -1436,7 +1449,7 @@ func (runner *ContainerRunner) ContainerToken() (string, error) {
        }
 
        var auth arvados.APIClientAuthorization
-       err := runner.ArvClient.Call("GET", "containers", runner.Container.UUID, "auth", nil, &auth)
+       err := runner.DispatcherArvClient.Call("GET", "containers", runner.Container.UUID, "auth", nil, &auth)
        if err != nil {
                return "", err
        }
@@ -1460,7 +1473,7 @@ func (runner *ContainerRunner) UpdateContainerFinal() error {
                        update["output"] = *runner.OutputPDH
                }
        }
-       return runner.ArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
+       return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
 }
 
 // IsCancelled returns the value of Cancelled, with goroutine safety.
@@ -1477,7 +1490,7 @@ func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, err
                return nil, err
        }
        return &ArvLogWriter{
-               ArvClient:     runner.ArvClient,
+               ArvClient:     runner.DispatcherArvClient,
                UUID:          runner.Container.UUID,
                loggingStream: name,
                writeCloser:   writer,
@@ -1505,6 +1518,14 @@ func (runner *ContainerRunner) Run() (err error) {
                runner.CrunchLog.Close()
        }()
 
+       err = runner.fetchContainerRecord()
+       if err != nil {
+               return
+       }
+       if runner.Container.State != "Locked" {
+               return fmt.Errorf("dispatch error detected: container %q has state %q", runner.Container.UUID, runner.Container.State)
+       }
+
        defer func() {
                // checkErr prints e (unless it's nil) and sets err to
                // e (unless err is already non-nil). Thus, if err
@@ -1545,10 +1566,6 @@ func (runner *ContainerRunner) Run() (err error) {
                checkErr("UpdateContainerFinal", runner.UpdateContainerFinal())
        }()
 
-       err = runner.fetchContainerRecord()
-       if err != nil {
-               return
-       }
        runner.setupSignals()
        err = runner.startHoststat()
        if err != nil {
@@ -1623,7 +1640,7 @@ func (runner *ContainerRunner) Run() (err error) {
 // Fetch the current container record (uuid = runner.Container.UUID)
 // into runner.Container.
 func (runner *ContainerRunner) fetchContainerRecord() error {
-       reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
+       reader, err := runner.DispatcherArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
        if err != nil {
                return fmt.Errorf("error fetching container record: %v", err)
        }
@@ -1645,12 +1662,13 @@ func (runner *ContainerRunner) fetchContainerRecord() error {
                return fmt.Errorf("error getting container token: %v", err)
        }
 
-       containerClient, _, err := runner.MkArvClient(containerToken)
+       runner.ContainerArvClient, runner.ContainerKeepClient,
+               runner.containerClient, err = runner.MkArvClient(containerToken)
        if err != nil {
                return fmt.Errorf("error creating container API client: %v", err)
        }
 
-       err = containerClient.Call("GET", "containers", runner.Container.UUID, "secret_mounts", nil, &sm)
+       err = runner.ContainerArvClient.Call("GET", "containers", runner.Container.UUID, "secret_mounts", nil, &sm)
        if err != nil {
                if apierr, ok := err.(arvadosclient.APIServerError); !ok || apierr.HttpStatusCode != 404 {
                        return fmt.Errorf("error fetching secret_mounts: %v", err)
@@ -1664,30 +1682,37 @@ func (runner *ContainerRunner) fetchContainerRecord() error {
 }
 
 // NewContainerRunner creates a new container runner.
-func NewContainerRunner(client *arvados.Client, api IArvadosClient, kc IKeepClient, docker ThinDockerClient, containerUUID string) (*ContainerRunner, error) {
+func NewContainerRunner(dispatcherClient *arvados.Client,
+       dispatcherArvClient IArvadosClient,
+       dispatcherKeepClient IKeepClient,
+       docker ThinDockerClient,
+       containerUUID string) (*ContainerRunner, error) {
+
        cr := &ContainerRunner{
-               client:    client,
-               ArvClient: api,
-               Kc:        kc,
-               Docker:    docker,
+               dispatcherClient:     dispatcherClient,
+               DispatcherArvClient:  dispatcherArvClient,
+               DispatcherKeepClient: dispatcherKeepClient,
+               Docker:               docker,
        }
        cr.NewLogWriter = cr.NewArvLogWriter
        cr.RunArvMount = cr.ArvMountCmd
        cr.MkTempDir = ioutil.TempDir
-       cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, error) {
+       cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
                cl, err := arvadosclient.MakeArvadosClient()
                if err != nil {
-                       return nil, nil, err
+                       return nil, nil, nil, err
                }
                cl.ApiToken = token
                kc, err := keepclient.MakeKeepClient(cl)
                if err != nil {
-                       return nil, nil, err
+                       return nil, nil, nil, err
                }
-               return cl, kc, nil
+               c2 := arvados.NewClientFromEnv()
+               c2.AuthToken = token
+               return cl, kc, c2, nil
        }
        var err error
-       cr.LogCollection, err = (&arvados.Collection{}).FileSystem(cr.client, cr.Kc)
+       cr.LogCollection, err = (&arvados.Collection{}).FileSystem(cr.dispatcherClient, cr.DispatcherKeepClient)
        if err != nil {
                return nil, err
        }
@@ -1699,7 +1724,7 @@ func NewContainerRunner(client *arvados.Client, api IArvadosClient, kc IKeepClie
        cr.CrunchLog = NewThrottledLogger(w)
        cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
 
-       loadLogThrottleParams(api)
+       loadLogThrottleParams(dispatcherArvClient)
        go cr.updateLogs()
 
        return cr, nil
@@ -1711,6 +1736,11 @@ func main() {
        cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
        cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
        caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
+       detach := flag.Bool("detach", false, "Detach from parent process and run in the background")
+       stdinEnv := flag.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
+       sleep := flag.Duration("sleep", 0, "Delay before starting (testing use only)")
+       kill := flag.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
+       list := flag.Bool("list", false, "List UUIDs of existing crunch-run processes")
        enableNetwork := flag.String("container-enable-networking", "default",
                `Specify if networking should be enabled for container.  One of 'default', 'always':
        default: only enable networking if container requests it.
@@ -1722,8 +1752,37 @@ func main() {
        memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
        getVersion := flag.Bool("version", false, "Print version information and exit.")
        flag.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
+
+       ignoreDetachFlag := false
+       if len(os.Args) > 1 && os.Args[1] == "-no-detach" {
+               // This process was invoked by a parent process, which
+               // has passed along its own arguments, including
+               // -detach, after the leading -no-detach flag.  Strip
+               // the leading -no-detach flag (it's not recognized by
+               // flag.Parse()) and ignore the -detach flag that
+               // comes later.
+               os.Args = append([]string{os.Args[0]}, os.Args[2:]...)
+               ignoreDetachFlag = true
+       }
+
        flag.Parse()
 
+       if *stdinEnv && !ignoreDetachFlag {
+               // Load env vars on stdin if asked (but not in a
+               // detached child process, in which case stdin is
+               // /dev/null).
+               loadEnv(os.Stdin)
+       }
+
+       switch {
+       case *detach && !ignoreDetachFlag:
+               os.Exit(Detach(flag.Arg(0), os.Args, os.Stdout, os.Stderr))
+       case *kill >= 0:
+               os.Exit(KillProcess(flag.Arg(0), syscall.Signal(*kill), os.Stdout, os.Stderr))
+       case *list:
+               os.Exit(ListProcesses(os.Stdout, os.Stderr))
+       }
+
        // Print version information if requested
        if *getVersion {
                fmt.Printf("crunch-run %s\n", version)
@@ -1731,6 +1790,7 @@ func main() {
        }
 
        log.Printf("crunch-run %s started", version)
+       time.Sleep(*sleep)
 
        containerId := flag.Arg(0)
 
@@ -1804,3 +1864,21 @@ func main() {
                log.Fatalf("%s: %v", containerId, runerr)
        }
 }
+
+func loadEnv(rdr io.Reader) {
+       buf, err := ioutil.ReadAll(rdr)
+       if err != nil {
+               log.Fatalf("read stdin: %s", err)
+       }
+       var env map[string]string
+       err = json.Unmarshal(buf, &env)
+       if err != nil {
+               log.Fatalf("decode stdin: %s", err)
+       }
+       for k, v := range env {
+               err = os.Setenv(k, v)
+               if err != nil {
+                       log.Fatalf("setenv(%q): %s", k, err)
+               }
+       }
+}