Merge branch '8333-docker-repo-tag'
authorTom Clegg <tclegg@veritasgenetics.com>
Tue, 3 Oct 2017 14:56:39 +0000 (10:56 -0400)
committerTom Clegg <tclegg@veritasgenetics.com>
Tue, 3 Oct 2017 14:56:39 +0000 (10:56 -0400)
closes #8333

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

24 files changed:
build/build-dev-docker-jobs-image.sh
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvdocker.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/arvados_cwl/crunch_script.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/setup.py
sdk/cwl/tests/12213-keepref-expr.cwl [new file with mode: 0644]
sdk/cwl/tests/12213-keepref-job.yml [new file with mode: 0644]
sdk/cwl/tests/12213-keepref-tool.cwl [new file with mode: 0644]
sdk/cwl/tests/12213-keepref-wf.cwl [new file with mode: 0644]
sdk/cwl/tests/arvados-tests.sh
sdk/cwl/tests/arvados-tests.yml
sdk/cwl/tests/runner.sh [deleted file]
sdk/dev-jobs.dockerfile
sdk/go/crunchrunner/upload.go
sdk/go/crunchrunner/upload_test.go
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go
services/crunch-run/upload.go
services/crunch-run/upload_test.go
services/nodemanager/arvnodeman/daemon.py
services/nodemanager/tests/test_daemon.py

index 46e525bfa7a3b2f2ed70c411b1b3e4fc25bade1a..639096c9e83712f0ccf47fc111f5d500add34f0a 100755 (executable)
@@ -15,6 +15,7 @@ Syntax:
 
 WORKSPACE=path         Path to the Arvados source tree to build packages from
 CWLTOOL=path           (optional) Path to cwltool git repository.
+SALAD=path             (optional) Path to schema_salad git repository.
 
 EOF
 
@@ -42,6 +43,14 @@ sdk=$(cd sdk/python/dist && ls -t arvados-python-client-*.tar.gz | head -n1)
 (cd sdk/cwl && python setup.py sdist)
 runner=$(cd sdk/cwl/dist && ls -t arvados-cwl-runner-*.tar.gz | head -n1)
 
+rm -rf sdk/cwl/salad_dist
+mkdir -p sdk/cwl/salad_dist
+if [[ -n "$SALAD" ]] ; then
+    (cd "$SALAD" && python setup.py sdist)
+    salad=$(cd "$SALAD/dist" && ls -t schema-salad-*.tar.gz | head -n1)
+    cp "$SALAD/dist/$salad" $WORKSPACE/sdk/cwl/salad_dist
+fi
+
 rm -rf sdk/cwl/cwltool_dist
 mkdir -p sdk/cwl/cwltool_dist
 if [[ -n "$CWLTOOL" ]] ; then
@@ -61,6 +70,6 @@ else
     gittag=$(git log --first-parent --max-count=1 --format=format:%H sdk/cwl)
 fi
 
-docker build --build-arg sdk=$sdk --build-arg runner=$runner --build-arg cwltool=$cwltool -f "$WORKSPACE/sdk/dev-jobs.dockerfile" -t arvados/jobs:$gittag "$WORKSPACE/sdk"
+docker build --build-arg sdk=$sdk --build-arg runner=$runner --build-arg salad=$salad --build-arg cwltool=$cwltool -f "$WORKSPACE/sdk/dev-jobs.dockerfile" -t arvados/jobs:$gittag "$WORKSPACE/sdk"
 echo arv-keepdocker arvados/jobs $gittag
 arv-keepdocker arvados/jobs $gittag
index 7f4b5c7549314b0d0dbd3cfbf52b1023ad7887fd..5756789cb1ecd5068780ca0ae036069dbdc9af5b 100644 (file)
@@ -237,7 +237,7 @@ class ArvCwlRunner(object):
                 self.check_features(v)
         elif isinstance(obj, list):
             for i,v in enumerate(obj):
-                with SourceLine(obj, i, UnsupportedRequirement):
+                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                     self.check_features(v)
 
     def make_output_collection(self, name, tagsString, outputObj):
@@ -281,7 +281,7 @@ class ArvCwlRunner(object):
 
         def rewrite(fileobj):
             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
-            for k in ("basename", "listing", "contents"):
+            for k in ("basename", "listing", "contents", "nameext", "nameroot", "dirname"):
                 if k in fileobj:
                     del fileobj[k]
 
index 0513ca02ec68ca2d0fe5f58ced9ab1c98c124844..e59903f2dc33b3d69406ee1fbc92751bd78e9623 100644 (file)
@@ -33,7 +33,7 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
         if dockerRequirement["dockerImageId"] in cached_lookups:
             return dockerRequirement["dockerImageId"]
 
-    with SourceLine(dockerRequirement, "dockerImageId", WorkflowException):
+    with SourceLine(dockerRequirement, "dockerImageId", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
         sp = dockerRequirement["dockerImageId"].split(":")
         image_name = sp[0]
         image_tag = sp[1] if len(sp) > 1 else "latest"
index 737c9580fb8ceef04576168a15a53d991f9cf496..25f64ea23065f887517c2ddba5ac728f18e856b6 100644 (file)
@@ -299,6 +299,9 @@ class RunnerJob(Runner):
         if self.on_error:
             self.job_order["arv:on_error"] = self.on_error
 
+        if kwargs.get("debug"):
+            self.job_order["arv:debug"] = True
+
         return {
             "script": "cwl-runner",
             "script_version": "master",
index 20cb4677c5935b5011a66b0a958518ce42a285c3..fdf506effa68aeb8ab5b5267fef8f2ee937d51a9 100644 (file)
@@ -83,7 +83,7 @@ class ArvadosWorkflow(Workflow):
         kwargs["work_api"] = self.work_api
         req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
         if req:
-            with SourceLine(self.tool, None, WorkflowException):
+            with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
                 if "id" not in self.tool:
                     raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))
             document_loader, workflowobj, uri = (self.doc_loader, self.doc_loader.fetch(self.tool["id"]), self.tool["id"])
@@ -114,10 +114,10 @@ class ArvadosWorkflow(Workflow):
 
                 def keepmount(obj):
                     remove_redundant_fields(obj)
-                    with SourceLine(obj, None, WorkflowException):
+                    with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
                         if "location" not in obj:
                             raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
-                    with SourceLine(obj, "location", WorkflowException):
+                    with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
                         if obj["location"].startswith("keep:"):
                             obj["location"] = "/keep/" + obj["location"][5:]
                             if "listing" in obj:
index 7fbbd29d50d72f385b5e1ea95f2b093405518baf..12d74a05c6aa855df8086d216f1e706df82ed680 100644 (file)
@@ -75,6 +75,8 @@ def run():
         output_tags = None
         enable_reuse = True
         on_error = "continue"
+        debug = False
+
         if "arv:output_name" in job_order_object:
             output_name = job_order_object["arv:output_name"]
             del job_order_object["arv:output_name"]
@@ -91,6 +93,10 @@ def run():
             on_error = job_order_object["arv:on_error"]
             del job_order_object["arv:on_error"]
 
+        if "arv:debug" in job_order_object:
+            debug = job_order_object["arv:debug"]
+            del job_order_object["arv:debug"]
+
         runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
                                           output_name=output_name, output_tags=output_tags)
 
@@ -103,12 +109,17 @@ def run():
                                                   fs_access=make_fs_access(""),
                                                   num_retries=runner.num_retries))
 
+        if debug:
+            logger.setLevel(logging.DEBUG)
+            logging.getLogger('arvados').setLevel(logging.DEBUG)
+            logging.getLogger("cwltool").setLevel(logging.DEBUG)
+
         args = argparse.Namespace()
         args.project_uuid = arvados.current_job()["owner_uuid"]
         args.enable_reuse = enable_reuse
         args.on_error = on_error
         args.submit = False
-        args.debug = False
+        args.debug = debug
         args.quiet = False
         args.ignore_docker_for_reuse = False
         args.basedir = os.getcwd()
index c8ae77a8b69013718c51b6b579774b9d95d985ef..914ccaa5a1049868cfe7f840f6bf7d56e957218c 100644 (file)
@@ -58,6 +58,8 @@ class ArvPathMapper(PathMapper):
         if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
             self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)
 
+        debug = logger.isEnabledFor(logging.DEBUG)
+
         if src not in self._pathmap:
             if src.startswith("file:"):
                 # Local FS ref, may need to be uploaded or may be on keep
@@ -67,7 +69,7 @@ class ArvPathMapper(PathMapper):
                                                    fnPattern="keep:%s/%s",
                                                    dirPattern="keep:%s/%s",
                                                    raiseOSError=True)
-                with SourceLine(srcobj, "location", WorkflowException):
+                with SourceLine(srcobj, "location", WorkflowException, debug):
                     if isinstance(st, arvados.commands.run.UploadFile):
                         uploadfiles.add((src, ab, st))
                     elif isinstance(st, arvados.commands.run.ArvFile):
@@ -82,25 +84,26 @@ class ArvPathMapper(PathMapper):
             else:
                 self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
 
-        with SourceLine(srcobj, "secondaryFiles", WorkflowException):
+        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
             for l in srcobj.get("secondaryFiles", []):
                 self.visit(l, uploadfiles)
-        with SourceLine(srcobj, "listing", WorkflowException):
+        with SourceLine(srcobj, "listing", WorkflowException, debug):
             for l in srcobj.get("listing", []):
                 self.visit(l, uploadfiles)
 
-    def addentry(self, obj, c, path, subdirs):
+    def addentry(self, obj, c, path, remap):
         if obj["location"] in self._pathmap:
             src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
             if srcpath == "":
                 srcpath = "."
             c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
+            remap.append((obj["location"], path + "/" + obj["basename"]))
             for l in obj.get("secondaryFiles", []):
-                self.addentry(l, c, path, subdirs)
+                self.addentry(l, c, path, remap)
         elif obj["class"] == "Directory":
             for l in obj.get("listing", []):
-                self.addentry(l, c, path + "/" + obj["basename"], subdirs)
-            subdirs.append((obj["location"], path + "/" + obj["basename"]))
+                self.addentry(l, c, path + "/" + obj["basename"], remap)
+            remap.append((obj["location"], path + "/" + obj["basename"]))
         elif obj["location"].startswith("_:") and "contents" in obj:
             with c.open(path + "/" + obj["basename"], "w") as f:
                 f.write(obj["contents"].encode("utf-8"))
@@ -152,13 +155,13 @@ class ArvPathMapper(PathMapper):
             self._pathmap[loc] = MapperEnt(urllib.quote(fn, "/:+@"), self.collection_pattern % fn[5:], cls, True)
 
         for srcobj in referenced_files:
-            subdirs = []
+            remap = []
             if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                 c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                   keep_client=self.arvrunner.keep_client,
                                                   num_retries=self.arvrunner.num_retries)
                 for l in srcobj.get("listing", []):
-                    self.addentry(l, c, ".", subdirs)
+                    self.addentry(l, c, ".", remap)
 
                 check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
                 if not check["items"]:
@@ -172,7 +175,7 @@ class ArvPathMapper(PathMapper):
                 c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                   keep_client=self.arvrunner.keep_client,
                                                   num_retries=self.arvrunner.num_retries                                                  )
-                self.addentry(srcobj, c, ".", subdirs)
+                self.addentry(srcobj, c, ".", remap)
 
                 check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
                 if not check["items"]:
@@ -185,10 +188,13 @@ class ArvPathMapper(PathMapper):
                     ab = self.collection_pattern % c.portable_data_hash()
                     self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
 
-            if subdirs:
-                for loc, sub in subdirs:
-                    # subdirs will all start with "./", strip it off
-                    ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
+            if remap:
+                for loc, sub in remap:
+                    # subdirs start with "./", strip it off
+                    if sub.startswith("./"):
+                        ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
+                    else:
+                        ab = self.file_pattern % (c.portable_data_hash(), sub)
                     self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
                                                    ab, "Directory", True)
 
index 50f9cf4220d8064b98099c83b93462592d11ba5e..762c6fda3996e69091b9a0acb320d0abccc6d21f 100644 (file)
@@ -51,8 +51,8 @@ setup(name='arvados-cwl-runner',
       # Note that arvados/build/run-build-packages.sh looks at this
       # file to determine what version of cwltool and schema-salad to build.
       install_requires=[
-          'cwltool==1.0.20170828135420',
-          'schema-salad==2.6.20170712194300',
+          'cwltool==1.0.20170928192020',
+          'schema-salad==2.6.20170927145003',
           'typing==3.5.3.0',
           'ruamel.yaml==0.13.7',
           'arvados-python-client>=0.1.20170526013812',
diff --git a/sdk/cwl/tests/12213-keepref-expr.cwl b/sdk/cwl/tests/12213-keepref-expr.cwl
new file mode 100644 (file)
index 0000000..ddc7ff9
--- /dev/null
@@ -0,0 +1,31 @@
+cwlVersion: v1.0
+class: ExpressionTool
+requirements:
+  InlineJavascriptRequirement: {}
+inputs:
+  dir: Directory
+outputs:
+  out: Directory[]
+expression: |
+  ${
+    var samples = {};
+    var pattern = /^(.+)(_S[0-9]{1,3}_)(.+)$/;
+    for (var i = 0; i < inputs.dir.listing.length; i++) {
+      var file = inputs.dir.listing[i];
+      var groups = file.basename.match(pattern);
+      if (groups) {
+        var sampleid = groups[1];
+        if (!samples[sampleid]) {
+          samples[sampleid] = [];
+        }
+        samples[sampleid].push(file);
+      }
+    }
+    var dirs = [];
+    Object.keys(samples).sort().forEach(function(sampleid, _) {
+      dirs.push({"class": "Directory",
+                 "basename": sampleid,
+                 "listing": samples[sampleid]});
+    });
+    return {"out": dirs};
+  }
\ No newline at end of file
diff --git a/sdk/cwl/tests/12213-keepref-job.yml b/sdk/cwl/tests/12213-keepref-job.yml
new file mode 100644 (file)
index 0000000..5c5571a
--- /dev/null
@@ -0,0 +1,3 @@
+dir:
+  class: Directory
+  location: samples
\ No newline at end of file
diff --git a/sdk/cwl/tests/12213-keepref-tool.cwl b/sdk/cwl/tests/12213-keepref-tool.cwl
new file mode 100644 (file)
index 0000000..8c28cc2
--- /dev/null
@@ -0,0 +1,13 @@
+cwlVersion: v1.0
+class: CommandLineTool
+requirements:
+  InlineJavascriptRequirement: {}
+inputs:
+  fastqsdir: Directory
+outputs:
+  out: stdout
+baseCommand: [zcat]
+stdout: $(inputs.fastqsdir.listing[0].nameroot).txt
+arguments:
+  - $(inputs.fastqsdir.listing[0].path)
+  - $(inputs.fastqsdir.listing[1].path)
diff --git a/sdk/cwl/tests/12213-keepref-wf.cwl b/sdk/cwl/tests/12213-keepref-wf.cwl
new file mode 100644 (file)
index 0000000..3f1e890
--- /dev/null
@@ -0,0 +1,22 @@
+cwlVersion: v1.0
+class: Workflow
+requirements:
+  ScatterFeatureRequirement: {}
+inputs:
+  dir: Directory
+outputs:
+  out:
+    type: File[]
+    outputSource: tool/out
+steps:
+  ex:
+    in:
+      dir: dir
+    out: [out]
+    run: 12213-keepref-expr.cwl
+  tool:
+    in:
+      fastqsdir: ex/out
+    out: [out]
+    scatter: fastqsdir
+    run: 12213-keepref-tool.cwl
\ No newline at end of file
index f069251a2aca4174221483f7486410affc55eec4..f7cebd4249acf5e5af7a7033100896a760720fbb 100755 (executable)
@@ -6,4 +6,4 @@
 if ! arv-get d7514270f356df848477718d58308cc4+94 > /dev/null ; then
     arv-put --portable-data-hash testdir
 fi
-exec cwltest --test arvados-tests.yml --tool $PWD/runner.sh $@
+exec cwltest --test arvados-tests.yml --tool arvados-cwl-runner $@ -- --disable-reuse --compute-checksum
index d3bdefcd03f3d95fe585ae555ae8703e5588f312..2f1429efba2b63f7e8c4bdbea555f17c56d0a961 100644 (file)
   output: {}
   tool: noreuse.cwl
   doc: "Test arv:ReuseRequirement"
+
+- job: 12213-keepref-job.yml
+  output: {
+    "out": [
+        {
+            "checksum": "sha1$1c78028c0d69163391eef89316b44a57bde3fead",
+            "location": "sample1_S01_R1_001.fastq.txt",
+            "class": "File",
+            "size": 32
+        },
+        {
+            "checksum": "sha1$83483b9c65d99967aecc794c14f9f4743314d186",
+            "location": "sample2_S01_R3_001.fastq.txt",
+            "class": "File",
+            "size": 32
+        }
+    ]
+  }
+  tool: 12213-keepref-wf.cwl
+  doc: "Test manipulating keep references with expression tools"
diff --git a/sdk/cwl/tests/runner.sh b/sdk/cwl/tests/runner.sh
deleted file mode 100755 (executable)
index 20bbb26..0000000
+++ /dev/null
@@ -1,6 +0,0 @@
-#!/bin/sh
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-exec arvados-cwl-runner --disable-reuse --compute-checksum "$@"
index cc08ad7c5ccd9e63fedc46aacec61b77c71a1526..f9f1e967b94f7e589a60888261eae4a7916a88c1 100644 (file)
@@ -24,13 +24,16 @@ RUN pip install -U setuptools
 
 ARG sdk
 ARG runner
+ARG salad
 ARG cwltool
 
 ADD python/dist/$sdk /tmp/
+ADD cwl/salad_dist/$salad /tmp/
 ADD cwl/cwltool_dist/$cwltool /tmp/
 ADD cwl/dist/$runner /tmp/
 
 RUN cd /tmp/arvados-python-client-* && python setup.py install
+RUN if test -d /tmp/schema-salad-* ; then cd /tmp/schema-salad-* && python setup.py install ; fi
 RUN if test -d /tmp/cwltool-* ; then cd /tmp/cwltool-* && python setup.py install ; fi
 RUN cd /tmp/arvados-cwl-runner-* && python setup.py install
 
index fd24908ad6a7e9e49721c0ed33980ce096c7e34a..2848d1087c52684fe9ca5a79ec1f6f5c87b28b95 100644 (file)
@@ -9,14 +9,15 @@ import (
        "crypto/md5"
        "errors"
        "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/sdk/go/manifest"
        "io"
        "log"
        "os"
        "path/filepath"
        "sort"
        "strings"
+
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "git.curoverse.com/arvados.git/sdk/go/manifest"
 )
 
 type Block struct {
@@ -90,7 +91,26 @@ type ManifestWriter struct {
 }
 
 func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) error {
-       if info.IsDir() {
+       if err != nil {
+               return err
+       }
+
+       targetPath, targetInfo := path, info
+       if info.Mode()&os.ModeSymlink != 0 {
+               // Update targetpath/info to reflect the symlink
+               // target, not the symlink itself
+               targetPath, err = filepath.EvalSymlinks(path)
+               if err != nil {
+                       return err
+               }
+               targetInfo, err = os.Stat(targetPath)
+               if err != nil {
+                       return fmt.Errorf("stat symlink %q target %q: %s", path, targetPath, err)
+               }
+       }
+
+       if targetInfo.Mode()&os.ModeType != 0 {
+               // Skip directories, pipes, other non-regular files
                return nil
        }
 
index ceb89dc26d767721ea65887a6789fcf8bb0d8983..5bc749258dea922bcc89f07d00b454ba89f05f72 100644 (file)
@@ -8,9 +8,11 @@ import (
        "crypto/md5"
        "errors"
        "fmt"
-       . "gopkg.in/check.v1"
        "io/ioutil"
        "os"
+       "syscall"
+
+       . "gopkg.in/check.v1"
 )
 
 type UploadTestSuite struct{}
@@ -38,18 +40,24 @@ func (s *TestSuite) TestSimpleUpload(c *C) {
        c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
 }
 
-func (s *TestSuite) TestSimpleUploadTwofiles(c *C) {
+func (s *TestSuite) TestSimpleUploadThreeFiles(c *C) {
        tmpdir, _ := ioutil.TempDir("", "")
        defer func() {
                os.RemoveAll(tmpdir)
        }()
 
-       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
-       ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
+       for _, err := range []error{
+               ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600),
+               ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600),
+               os.Symlink("./file2.txt", tmpdir+"/file3.txt"),
+               syscall.Mkfifo(tmpdir+"/ignore.fifo", 0600),
+       } {
+               c.Assert(err, IsNil)
+       }
 
        str, err := WriteTree(KeepTestClient{}, tmpdir)
        c.Check(err, IsNil)
-       c.Check(str, Equals, ". 3858f62230ac3c915f300c664312c63f+6 0:3:file1.txt 3:3:file2.txt\n")
+       c.Check(str, Equals, ". aa65a413921163458c52fea478d5d3ee+9 0:3:file1.txt 3:3:file2.txt 6:3:file3.txt\n")
 }
 
 func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
index 3fdd37402fada76813dc22e69251f6144da6e358..8af12100dbe91d0cab45dca171d65b8b915f9229 100644 (file)
@@ -181,9 +181,9 @@ type ContainerRunner struct {
        networkMode   string // passed through to HostConfig.NetworkMode
 }
 
-// SetupSignals sets up signal handling to gracefully terminate the underlying
+// setupSignals sets up signal handling to gracefully terminate the underlying
 // Docker container and update state when receiving a TERM, INT or QUIT signal.
-func (runner *ContainerRunner) SetupSignals() {
+func (runner *ContainerRunner) setupSignals() {
        runner.SigChan = make(chan os.Signal, 1)
        signal.Notify(runner.SigChan, syscall.SIGTERM)
        signal.Notify(runner.SigChan, syscall.SIGINT)
@@ -192,7 +192,6 @@ func (runner *ContainerRunner) SetupSignals() {
        go func(sig chan os.Signal) {
                <-sig
                runner.stop()
-               signal.Stop(sig)
        }(runner.SigChan)
 }
 
@@ -213,6 +212,13 @@ func (runner *ContainerRunner) stop() {
        }
 }
 
+func (runner *ContainerRunner) teardown() {
+       if runner.SigChan != nil {
+               signal.Stop(runner.SigChan)
+               close(runner.SigChan)
+       }
+}
+
 // LoadImage determines the docker image id from the container record and
 // checks if it is available in the local Docker image store.  If not, it loads
 // the image from Keep.
@@ -1303,6 +1309,8 @@ func (runner *ContainerRunner) Run() (err error) {
                // a new one in case we needed to log anything while
                // finalizing.
                runner.CrunchLog.Close()
+
+               runner.teardown()
        }()
 
        err = runner.fetchContainerRecord()
@@ -1311,7 +1319,7 @@ func (runner *ContainerRunner) Run() (err error) {
        }
 
        // setup signal handling
-       runner.SetupSignals()
+       runner.setupSignals()
 
        // check for and/or load image
        err = runner.LoadImage()
index 935c61a1100f6400af82b39e57b0a28c1b1ae41b..474ba5d7db7b85de773443736f39dd6914bc94d2 100644 (file)
@@ -99,7 +99,7 @@ func NewTestDockerClient(exitCode int) *TestDockerClient {
        t := &TestDockerClient{}
        t.logReader, t.logWriter = io.Pipe()
        t.finish = exitCode
-       t.stop = make(chan bool)
+       t.stop = make(chan bool, 1)
        t.cwd = "/"
        return t
 }
index fa74eb05572ce25623b197419999310b9f402401..bb2776a4266342568a3eb05c755ed64d700659ec 100644 (file)
@@ -18,14 +18,15 @@ import (
        "crypto/md5"
        "errors"
        "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/sdk/go/manifest"
        "io"
        "log"
        "os"
        "path/filepath"
        "strings"
        "sync"
+
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "git.curoverse.com/arvados.git/sdk/go/manifest"
 )
 
 // Block is a data block in a manifest stream
@@ -265,8 +266,26 @@ type WalkUpload struct {
 // WalkFunc walks a directory tree, uploads each file found and adds it to the
 // CollectionWriter.
 func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
+       if err != nil {
+               return err
+       }
+
+       targetPath, targetInfo := path, info
+       if info.Mode()&os.ModeSymlink != 0 {
+               // Update targetpath/info to reflect the symlink
+               // target, not the symlink itself
+               targetPath, err = filepath.EvalSymlinks(path)
+               if err != nil {
+                       return err
+               }
+               targetInfo, err = os.Stat(targetPath)
+               if err != nil {
+                       return fmt.Errorf("stat symlink %q target %q: %s", path, targetPath, err)
+               }
+       }
 
-       if info.IsDir() {
+       if targetInfo.Mode()&os.ModeType != 0 {
+               // Skip directories, pipes, other non-regular files
                return nil
        }
 
index 86ad1b32ae074b3acfd577e164c174dd099099cd..96ea2b119094f37b36c5711280abe2d975171d4e 100644 (file)
@@ -5,11 +5,13 @@
 package main
 
 import (
-       . "gopkg.in/check.v1"
        "io/ioutil"
        "log"
        "os"
        "sync"
+       "syscall"
+
+       . "gopkg.in/check.v1"
 )
 
 type UploadTestSuite struct{}
@@ -31,20 +33,26 @@ func (s *TestSuite) TestSimpleUpload(c *C) {
        c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
 }
 
-func (s *TestSuite) TestSimpleUploadTwofiles(c *C) {
+func (s *TestSuite) TestSimpleUploadThreefiles(c *C) {
        tmpdir, _ := ioutil.TempDir("", "")
        defer func() {
                os.RemoveAll(tmpdir)
        }()
 
-       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
-       ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
+       for _, err := range []error{
+               ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600),
+               ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600),
+               os.Symlink("./file2.txt", tmpdir+"/file3.txt"),
+               syscall.Mkfifo(tmpdir+"/ignore.fifo", 0600),
+       } {
+               c.Assert(err, IsNil)
+       }
 
        cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
        str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
 
        c.Check(err, IsNil)
-       c.Check(str, Equals, ". 3858f62230ac3c915f300c664312c63f+6 0:3:file1.txt 3:3:file2.txt\n")
+       c.Check(str, Equals, ". aa65a413921163458c52fea478d5d3ee+9 0:3:file1.txt 3:3:file2.txt 6:3:file3.txt\n")
 }
 
 func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
index e476e5e3e21c07205144deb6226e54c9762f8c58..ca3029d9e1bc3c376b119cca367b3767f3a8bb45 100644 (file)
@@ -78,7 +78,10 @@ class _ArvadosNodeTracker(_BaseNodeTracker):
     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
 
     def find_stale_node(self, stale_time):
-        for record in self.nodes.itervalues():
+        # Try to select a stale node record that has an assigned slot first
+        for record in sorted(self.nodes.itervalues(),
+                             key=lambda r: r.arvados_node['slot_number'],
+                             reverse=True):
             node = record.arvados_node
             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                           stale_time) and
@@ -498,8 +501,19 @@ class NodeManagerDaemonActor(actor_class):
         except pykka.ActorDeadError:
             return
         cloud_node_id = cloud_node.id
-        record = self.cloud_nodes[cloud_node_id]
-        shutdown_actor.stop()
+
+        try:
+            shutdown_actor.stop()
+        except pykka.ActorDeadError:
+            pass
+
+        try:
+            record = self.cloud_nodes[cloud_node_id]
+        except KeyError:
+            # Cloud node was already removed from the cloud node list
+            # supposedly while the destroy_node call was finishing its
+            # job.
+            return
         record.shutdown_actor = None
 
         if not success:
index 1efa1ffeb35199c251d13e217f2cb37c146c4622..ebe7408e705b02e2d55b2d757ef5367953f23242 100644 (file)
@@ -77,8 +77,9 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
         self.arv_factory = mock.MagicMock(name='arvados_mock')
         api_client = mock.MagicMock(name='api_client')
-        api_client.nodes().create().execute.side_effect = [testutil.arvados_node_mock(1),
-                                                           testutil.arvados_node_mock(2)]
+        api_client.nodes().create().execute.side_effect = \
+            [testutil.arvados_node_mock(1),
+             testutil.arvados_node_mock(2)]
         self.arv_factory.return_value = api_client
 
         self.cloud_factory = mock.MagicMock(name='cloud_mock')
@@ -192,6 +193,39 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                          want_sizes=[testutil.MockSize(1)])
         self.busywait(lambda: not self.node_setup.start.called)
 
+    def test_select_stale_node_records_with_slot_numbers_first(self):
+        """
+        Stale node records with slot_number assigned can exist when
+        clean_arvados_node() isn't executed after a node shutdown, for
+        various reasons.
+        NodeManagerDaemonActor should use these stale node records first, so
+        that they don't accumulate unused, reducing the slots available.
+        """
+        size = testutil.MockSize(1)
+        a_long_time_ago = '1970-01-01T01:02:03.04050607Z'
+        arvados_nodes = []
+        for n in range(9):
+            # Add several stale node records without slot_number assigned
+            arvados_nodes.append(
+                testutil.arvados_node_mock(
+                    n+1,
+                    slot_number=None,
+                    modified_at=a_long_time_ago))
+        # Add one record with slot_number assigned; it should be the
+        # first one selected
+        arv_node = testutil.arvados_node_mock(
+            123,
+            modified_at=a_long_time_ago)
+        arvados_nodes.append(arv_node)
+        cloud_node = testutil.cloud_node_mock(125, size=size)
+        self.make_daemon(cloud_nodes=[cloud_node],
+                         arvados_nodes=arvados_nodes)
+        arvados_nodes_tracker = self.daemon.arvados_nodes.get()
+        # Here, find_stale_node() should return the node record with
+        # the slot_number assigned.
+        self.assertEqual(arv_node,
+                         arvados_nodes_tracker.find_stale_node(3601))
+
     def test_dont_count_missing_as_busy(self):
         size = testutil.MockSize(1)
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, size=size),
@@ -400,6 +434,27 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertTrue(self.node_setup.start.called,
                         "second node not started after booted node stopped")
 
+    def test_node_disappearing_during_shutdown(self):
+        cloud_node = testutil.cloud_node_mock(6)
+        setup = self.start_node_boot(cloud_node, id_num=6)
+        self.daemon.node_setup_finished(setup).get(self.TIMEOUT)
+        self.assertEqual(1, self.alive_monitor_count())
+        monitor = self.monitor_list()[0].proxy()
+        self.daemon.update_server_wishlist([])
+        self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+        self.assertShutdownCancellable(True)
+        shutdown = self.node_shutdown.start().proxy()
+        shutdown.cloud_node.get.return_value = cloud_node
+        # Simulate a successful but slow node destroy call: the cloud node
+        # list gets updated before the ShutdownActor finishes.
+        record = self.daemon.cloud_nodes.get().nodes.values()[0]
+        self.assertTrue(record.shutdown_actor is not None)
+        self.daemon.cloud_nodes.get().nodes.clear()
+        self.daemon.node_finished_shutdown(shutdown).get(self.TIMEOUT)
+        self.assertTrue(
+            record.shutdown_actor is not None,
+            "test was ineffective -- failed to simulate the race condition")
+
     def test_booted_node_shut_down_when_never_listed(self):
         setup = self.start_node_boot()
         self.cloud_factory().node_start_time.return_value = time.time() - 3601