Compute correct arvados-cwl-runner version, refs #10194
[arvados.git] / services / crunch-run / crunchrun.go
index 0b4eb2bcbf4934f62a179b3037ad395155f7aeb4..0b59a3bb78c1b0c8919f198f5c34cc94ed00d5a9 100644 (file)
@@ -29,8 +29,9 @@ import (
 type IArvadosClient interface {
        Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
        Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
-       Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
-       Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) (err error)
+       Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
+       Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error
+       Discovery(key string) (interface{}, error)
 }
 
 // ErrCancelled is the error returned when the container is cancelled.
@@ -93,6 +94,7 @@ type ContainerRunner struct {
        SigChan        chan os.Signal
        ArvMountExit   chan error
        finalState     string
+       trashLifetime  time.Duration
 
        statLogger   io.WriteCloser
        statReporter *crunchstat.Reporter
@@ -561,6 +563,21 @@ func (runner *ContainerRunner) CaptureOutput() error {
                return nil
        }
 
+       if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
+               // Output may have been set directly by the container, so
+               // refresh the container record to check.
+               err := runner.ArvClient.Get("containers", runner.Container.UUID,
+                       nil, &runner.Container)
+               if err != nil {
+                       return err
+               }
+               if runner.Container.Output != "" {
+                       // Container output is already set.
+                       runner.OutputPDH = &runner.Container.Output
+                       return nil
+               }
+       }
+
        if runner.HostOutputDir == "" {
                return nil
        }
@@ -601,18 +618,25 @@ func (runner *ContainerRunner) CaptureOutput() error {
        err = runner.ArvClient.Create("collections",
                arvadosclient.Dict{
                        "collection": arvadosclient.Dict{
+                               "expires_at":    time.Now().Add(runner.trashLifetime).Format(time.RFC3339),
+                               "name":          "output for " + runner.Container.UUID,
                                "manifest_text": manifestText}},
                &response)
        if err != nil {
                return fmt.Errorf("While creating output collection: %v", err)
        }
-
-       runner.OutputPDH = new(string)
-       *runner.OutputPDH = response.PortableDataHash
-
+       runner.OutputPDH = &response.PortableDataHash
        return nil
 }
 
+func (runner *ContainerRunner) loadDiscoveryVars() {
+       tl, err := runner.ArvClient.Discovery("defaultTrashLifetime")
+       if err != nil {
+               log.Fatalf("getting defaultTrashLifetime from discovery document: %s", err)
+       }
+       runner.trashLifetime = time.Duration(tl.(float64)) * time.Second
+}
+
 func (runner *ContainerRunner) CleanupDirs() {
        if runner.ArvMount != nil {
                umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
@@ -665,15 +689,14 @@ func (runner *ContainerRunner) CommitLogs() error {
        err = runner.ArvClient.Create("collections",
                arvadosclient.Dict{
                        "collection": arvadosclient.Dict{
+                               "expires_at":    time.Now().Add(runner.trashLifetime).Format(time.RFC3339),
                                "name":          "logs for " + runner.Container.UUID,
                                "manifest_text": mt}},
                &response)
        if err != nil {
                return fmt.Errorf("While creating log collection: %v", err)
        }
-
        runner.LogsPDH = &response.PortableDataHash
-
        return nil
 }
 
@@ -858,6 +881,7 @@ func NewContainerRunner(api IArvadosClient,
        cr.Container.UUID = containerUUID
        cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
        cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
+       cr.loadDiscoveryVars()
        return cr
 }
 
@@ -877,7 +901,7 @@ func main() {
        api.Retries = 8
 
        var kc *keepclient.KeepClient
-       kc, err = keepclient.MakeKeepClient(&api)
+       kc, err = keepclient.MakeKeepClient(api)
        if err != nil {
                log.Fatalf("%s: %v", containerId, err)
        }