11308: Merge branch 'master' into 11308-python3
author Tom Clegg <tom@curoverse.com>
Wed, 12 Apr 2017 19:01:07 +0000 (15:01 -0400)
committer Tom Clegg <tom@curoverse.com>
Wed, 12 Apr 2017 19:01:07 +0000 (15:01 -0400)
Conflicts:
sdk/python/arvados/__init__.py
sdk/python/arvados/api.py
sdk/python/arvados/commands/ls.py
sdk/python/arvados/keep.py
sdk/python/setup.py
sdk/python/tests/test_arv_ls.py

59 files changed:
build/build.list
build/rails-package-scripts/postinst.sh
build/run-build-packages-sso.sh
build/run-tests.sh
doc/_includes/_navbar_top.liquid
doc/api/methods/container_requests.html.textile.liquid
doc/install/install-nodemanager.html.textile.liquid
sdk/cli/test/test_arv-keep-get.rb
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/fsaccess.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/test_with_arvbox.sh
sdk/cwl/tests/arvados-tests.sh
sdk/cwl/tests/arvados-tests.yml
sdk/cwl/tests/cat.cwl [new file with mode: 0644]
sdk/cwl/tests/dir-job2.yml [new file with mode: 0644]
sdk/cwl/tests/keep-dir-test-input2.cwl [new file with mode: 0644]
sdk/cwl/tests/keep-dir-test-input3.cwl [new file with mode: 0644]
sdk/cwl/tests/octo.yml [new file with mode: 0644]
sdk/cwl/tests/octothorpe/item #1.txt [new file with mode: 0644]
sdk/go/keepclient/support.go
sdk/python/arvados/api.py
sdk/python/arvados/commands/get.py [new file with mode: 0755]
sdk/python/arvados/commands/ls.py
sdk/python/arvados/keep.py
sdk/python/bin/arv-get
sdk/python/setup.py
sdk/python/tests/test_arv_get.py [new file with mode: 0644]
sdk/python/tests/test_arv_ls.py
services/api/app/models/arvados_model.rb
services/api/app/models/collection.rb
services/api/app/models/container_request.rb
services/api/app/models/node.rb
services/api/db/migrate/20170330012505_add_output_ttl_to_container_requests.rb [new file with mode: 0644]
services/api/db/structure.sql
services/api/test/unit/collection_test.rb
services/api/test/unit/container_request_test.rb
services/api/test/unit/node_test.rb
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go
services/fuse/arvados_fuse/command.py
services/fuse/arvados_fuse/unmount.py
services/fuse/tests/test_unmount.py
services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
services/nodemanager/arvnodeman/computenode/driver/__init__.py
services/nodemanager/arvnodeman/computenode/driver/gce.py
services/nodemanager/arvnodeman/config.py
services/nodemanager/arvnodeman/daemon.py
services/nodemanager/arvnodeman/launcher.py
services/nodemanager/arvnodeman/status.py [new file with mode: 0644]
services/nodemanager/doc/azure.example.cfg
services/nodemanager/doc/ec2.example.cfg
services/nodemanager/doc/gce.example.cfg
services/nodemanager/doc/local.example.cfg
services/nodemanager/setup.py
services/nodemanager/tests/test_computenode_driver_gce.py
services/nodemanager/tests/test_daemon.py
services/nodemanager/tests/test_failure.py
services/nodemanager/tests/test_status.py [new file with mode: 0644]

index 98f6a3b773852061659a58c255907bf7cd413c20..bb662dbbd52ad97aecf092af0aa8cbf973c69859 100644 (file)
@@ -1,7 +1,7 @@
 #distribution(s)|name|version|iteration|type|architecture|extra fpm arguments
 debian8,ubuntu1204,centos7|python-gflags|2.0|2|python|all
-debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|google-api-python-client|1.4.2|2|python|all
-debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|oauth2client|1.5.2|2|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|google-api-python-client|1.6.2|2|python|all
+debian8,ubuntu1204,ubuntu1404,centos7|oauth2client|1.5.2|2|python|all
 debian8,ubuntu1204,ubuntu1404,centos7|pyasn1|0.1.7|2|python|all
 debian8,ubuntu1204,ubuntu1404,centos7|pyasn1-modules|0.0.5|2|python|all
 debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|rsa|3.4.2|2|python|all
@@ -14,7 +14,7 @@ debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|ciso8601|1.0.3|3|python|amd64
 debian8,ubuntu1204,centos7|pycrypto|2.6.1|3|python|amd64
 debian8,ubuntu1204,ubuntu1404,ubuntu1604|backports.ssl_match_hostname|3.5.0.1|2|python|all
 debian8,ubuntu1204,ubuntu1404,centos7|llfuse|0.41.1|3|python|amd64
-debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|pycurl|7.19.5.3|3|python|amd64
+debian8,ubuntu1204,ubuntu1404,centos7|pycurl|7.19.5.3|3|python|amd64
 debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|pyyaml|3.12|2|python|amd64
 debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|rdflib|4.2.2|2|python|all
 debian8,ubuntu1204,ubuntu1404,centos7|shellescape|3.4.1|2|python|all
@@ -40,3 +40,5 @@ all|ruamel.yaml|0.13.7|2|python|amd64|--python-setup-py-arguments --single-versi
 all|cwltest|1.0.20160907111242|3|python|all|--depends 'python-futures >= 3.0.5'
 all|rdflib-jsonld|0.4.0|2|python|all
 all|futures|3.0.5|2|python|all
+all|future|0.16.0|2|python|all
+all|future|0.16.0|2|python3|all
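
Each build.list row is a pipe-separated package spec; the header comment on the first line names the fields. A minimal illustrative sketch of splitting one row into those fields (the parsing below is an aid to reading the format, not part of the build scripts):

# Field names follow the build.list header:
# distribution(s)|name|version|iteration|type|architecture|extra fpm arguments
row = "all|future|0.16.0|2|python3|all"
names = ["distributions", "name", "version", "iteration", "type", "architecture"]
fields = row.split("|")
spec = dict(zip(names, fields))
spec["extra_fpm_arguments"] = fields[len(names):]  # optional trailing fields
print(spec["name"], spec["type"])  # -> future python3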
index e019170d71d24a6323e3b4eaee856c30cebb5768..915958ce97b547d74832e59a9ef6d154e8551ab0 100644 (file)
@@ -210,14 +210,14 @@ configure_version() {
   chown "$WWW_OWNER:" $RELEASE_PATH/config/environment.rb
   chown "$WWW_OWNER:" $RELEASE_PATH/config.ru
   chown "$WWW_OWNER:" $RELEASE_PATH/Gemfile.lock
-  chown -R "$WWW_OWNER:" $RELEASE_PATH/tmp
+  chown -R "$WWW_OWNER:" $RELEASE_PATH/tmp || true
   chown -R "$WWW_OWNER:" $SHARED_PATH/log
   case "$RAILSPKG_DATABASE_LOAD_TASK" in
       db:schema:load) chown "$WWW_OWNER:" $RELEASE_PATH/db/schema.rb ;;
       db:structure:load) chown "$WWW_OWNER:" $RELEASE_PATH/db/structure.sql ;;
   esac
   chmod 644 $SHARED_PATH/log/*
-  chmod -R 2775 $RELEASE_PATH/tmp
+  chmod -R 2775 $RELEASE_PATH/tmp || true
   echo "... done."
 
   if [ -n "$RAILSPKG_DATABASE_LOAD_TASK" ]; then
index 264f27d12b0202a9267a548a73e684460f8f5aa3..053a6dfb30de1782b81ec0a545fcd1844c1b4673 100755 (executable)
@@ -77,6 +77,9 @@ case "$TARGET" in
     ubuntu1404)
         FORMAT=deb
         ;;
+    ubuntu1604)
+        FORMAT=deb
+        ;;
     centos7)
         FORMAT=rpm
         ;;
index cb4c82b6352a3bf8cd1573b6c45010b00f6849bc..9649d7bb838b04d5351d75f20b8de0e4e28b319d 100755 (executable)
@@ -179,6 +179,9 @@ sanity_checks() {
     echo -n 'fuse.h: '
     find /usr/include -wholename '*fuse/fuse.h' \
         || fatal "No fuse/fuse.h. Try: apt-get install libfuse-dev"
+    echo -n 'gnutls.h: '
+    find /usr/include -wholename '*gnutls/gnutls.h' \
+        || fatal "No gnutls/gnutls.h. Try: apt-get install libgnutls28-dev"
     echo -n 'pyconfig.h: '
     find /usr/include -name pyconfig.h | egrep --max-count=1 . \
         || fatal "No pyconfig.h. Try: apt-get install python-dev"
index 6caf36a18882115027c288717a74146b8281dd49..4fd1edefe455155fcc11b2b094c7b7a6a4fcb3bd 100644 (file)
@@ -20,7 +20,7 @@
       </ul>
 
       <div class="pull-right" style="padding-top: 6px">
-        <form method="get" action="http://www.google.com/search">
+        <form method="get" action="https://www.google.com/search">
           <div class="input-group" style="width: 220px">
             <input type="text" class="form-control" name="q" placeholder="search">
             <div class="input-group-addon">
index 3809b2f1eb65c254ae400b7d661ac0869ce2d833..75bf3d1ccca0c6845afc9f2d81deac5736710a0d 100644 (file)
@@ -43,6 +43,8 @@ table(table table-bordered table-condensed).
 |cwd|string|Initial working directory, given as an absolute path (in the container) or a path relative to the WORKDIR given in the image's Dockerfile.|Required.|
 |command|array of strings|Command to execute in the container.|Required. e.g., @["echo","hello"]@|
 |output_path|string|Path to a directory or file inside the container that should be preserved as container's output when it finishes. This path must be, or be inside, one of the mount targets. For best performance, point output_path to a writable collection mount. Also, see "Pre-populate output using Mount points":#pre-populate-output for details regarding optional output pre-population using mount points.|Required.|
+|output_name|string|Desired name for the output collection. If null, a name will be assigned automatically.||
+|output_ttl|integer|Desired lifetime for the output collection, in seconds. If zero, the output collection will not be deleted automatically.||
 |priority|integer|Higher value means spend more resources on this container_request, i.e., go ahead of other queued containers, bring up more nodes etc.|Priority 0 means a container should not be run on behalf of this request. Clients are expected to submit container requests with zero priority in order to preview the container that will be used to satisfy it. Priority can be null if and only if state!="Committed".|
 |expires_at|datetime|After this time, priority is considered to be zero.|Not yet implemented.|
 |use_existing|boolean|If possible, use an existing (non-failed) container to satisfy the request instead of creating a new one.|Default is true|
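
A hedged sketch of using the two new fields from the Python SDK (the name, image, priority, and mounts below are placeholders, and the exact request body shape may vary by SDK version, but the attribute names match the table above):

import arvados

# Create a container request whose output collection gets a readable
# name and is deleted automatically one week after it is produced.
api = arvados.api('v1')
cr = api.container_requests().create(body={"container_request": {
    "name": "example request",              # placeholder
    "state": "Committed",
    "priority": 1,
    "container_image": "arvados/jobs",      # placeholder image
    "command": ["echo", "hello"],
    "cwd": "/tmp",
    "output_path": "/tmp",
    "mounts": {"/tmp": {"kind": "tmp", "capacity": 1 << 20}},
    "output_name": "hello output",          # new: name for the output collection
    "output_ttl": 7 * 24 * 3600,            # new: lifetime in seconds
}}).execute()
print(cr["uuid"])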
index baf7c2fc7c8e0c1b408d85e43cd36d3102147ac6..0cad10c5a92229a02a7263e0ae8e36fdcac7adca 100644 (file)
@@ -48,6 +48,16 @@ h3(#aws). Amazon Web Services
 # EC2 configuration for Arvados Node Manager.
 # All times are in seconds unless specified otherwise.
 
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
 [Daemon]
 # The dispatcher can customize the start and stop procedure for
 # cloud nodes.  For example, the SLURM dispatcher drains nodes
@@ -213,6 +223,16 @@ h3(#gcp). Google Cloud Platform
 # Google Compute Engine configuration for Arvados Node Manager.
 # All times are in seconds unless specified otherwise.
 
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
 [Daemon]
 # Node Manager will ensure that there are at least this many nodes running at
 # all times.  If node manager needs to start new idle nodes for the purpose of
@@ -380,6 +400,16 @@ h3(#azure). Microsoft Azure
 # Azure configuration for Arvados Node Manager.
 # All times are in seconds unless specified otherwise.
 
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
 [Daemon]
 # The dispatcher can customize the start and stop procedure for
 # cloud nodes.  For example, the SLURM dispatcher drains nodes
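
Since the management server only serves a JSON snapshot over plain HTTP, any client can poll it. A minimal sketch against the example settings above (Python 2 stdlib; no error handling):

import json
import urllib2  # on Python 3, use urllib.request instead

# Fetch Node Manager's internal-state snapshot from the [Manage] server.
status = json.load(urllib2.urlopen("http://127.0.0.1:8989/status.json"))
print(status)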
index 04f454369cc6541be477fe3f585c637f45c7aee9..b1f6bdf857a0fcda9b4f44d4db4778863093067d 100644 (file)
@@ -140,7 +140,7 @@ class TestArvKeepGet < Minitest::Test
       assert_arv_get false, 'e796ab2294f3e48ec709ffa8d6daf58c'
     end
     assert_equal '', out
-    assert_match /Error:/, err
+    assert_match /ERROR:/, err
   end
 
   def test_nonexistent_manifest
@@ -148,7 +148,7 @@ class TestArvKeepGet < Minitest::Test
       assert_arv_get false, 'acbd18db4cc2f85cedef654fccc4a4d8/', 'tmp/'
     end
     assert_equal '', out
-    assert_match /Error:/, err
+    assert_match /ERROR:/, err
   end
 
   def test_manifest_root_to_dir
index 3090936121daf32d9ffa61744d1f5b932b73590a..28107b491ca45f760813c03cefa63bb9c693b331 100644 (file)
@@ -1,6 +1,7 @@
 import logging
 import json
 import os
+import urllib
 
 import ruamel.yaml as yaml
 
@@ -77,7 +78,7 @@ class ArvadosContainer(object):
                     "portable_data_hash": pdh
                 }
                 if len(sp) == 2:
-                    mounts[p]["path"] = sp[1]
+                    mounts[p]["path"] = urllib.unquote(sp[1])
 
         with Perf(metrics, "generatefiles %s" % self.name):
             if self.generatefiles["listing"]:
index 70aa69f669be6d6e8c0a7210ea3da0808af3e6bc..3a3d16073833a6367876b5833b54cbb8f35584e7 100644 (file)
@@ -80,7 +80,10 @@ class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
     def exists(self, fn):
         collection, rest = self.get_collection(fn)
         if collection:
-            return collection.exists(rest)
+            if rest:
+                return collection.exists(rest)
+            else:
+                return True
         else:
             return super(CollectionFsAccess, self).exists(fn)
 
index 1f6aa577c180c65a9f8a31b0155c863f17e59b67..a8619a8598a538d5ba7353390bc63e316a76a648 100644 (file)
@@ -2,6 +2,7 @@ import re
 import logging
 import uuid
 import os
+import urllib
 
 import arvados.commands.run
 import arvados.collection
@@ -17,7 +18,7 @@ class ArvPathMapper(PathMapper):
     """Convert container-local paths to and from Keep collection ids."""
 
     pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
-    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.+)?$')
+    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')
 
     def __init__(self, arvrunner, referenced_files, input_basedir,
                  collection_pattern, file_pattern, name=None, **kwargs):
@@ -34,7 +35,7 @@ class ArvPathMapper(PathMapper):
             if "#" in src:
                 src = src[:src.index("#")]
             if isinstance(src, basestring) and ArvPathMapper.pdh_path.match(src):
-                self._pathmap[src] = MapperEnt(src, self.collection_pattern % src[5:], "File")
+                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), "File")
             if src not in self._pathmap:
                 # Local FS ref, may need to be uploaded or may be on keep
                 # mount.
@@ -44,7 +45,7 @@ class ArvPathMapper(PathMapper):
                     if isinstance(st, arvados.commands.run.UploadFile):
                         uploadfiles.add((src, ab, st))
                     elif isinstance(st, arvados.commands.run.ArvFile):
-                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % st.fn[5:], "File")
+                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File")
                     elif src.startswith("_:"):
                         if "contents" in srcobj:
                             pass
@@ -59,7 +60,7 @@ class ArvPathMapper(PathMapper):
                     self.visit(l, uploadfiles)
         elif srcobj["class"] == "Directory":
             if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
-                self._pathmap[src] = MapperEnt(src, self.collection_pattern % src[5:], "Directory")
+                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), "Directory")
             for l in srcobj.get("listing", []):
                 self.visit(l, uploadfiles)
 
@@ -90,7 +91,7 @@ class ArvPathMapper(PathMapper):
             loc = k["location"]
             if loc in already_uploaded:
                 v = already_uploaded[loc]
-                self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % v.resolved[5:], "File")
+                self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), "File")
 
         for srcobj in referenced_files:
             self.visit(srcobj, uploadfiles)
@@ -105,7 +106,7 @@ class ArvPathMapper(PathMapper):
                                              project=self.arvrunner.project_uuid)
 
         for src, ab, st in uploadfiles:
-            self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % st.fn[5:], "File")
+            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:], "File")
             self.arvrunner.add_uploaded(src, self._pathmap[src])
 
         for srcobj in referenced_files:
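
The pathmapper and arvcontainer changes above are two halves of one fix: file names may contain characters such as '#' (see the octothorpe test fixture below), so they travel percent-encoded inside keep: location URIs and must be decoded again before being used as Keep paths. A small sketch of the round-trip, using the same Python 2 urllib calls and safe-character set as the diff:

import urllib

# A Keep path whose basename contains a space and a '#':
decoded = "keep:d7514270f356df848477718d58308cc4+94/item #1.txt"
encoded = urllib.quote(decoded, "/:+@")   # safe chars mirror the diff above
assert encoded.endswith("item%20%231.txt")
assert urllib.unquote(encoded) == decoded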
index b5a3e9930de668f4a7d64e6d40fa364dc5e75fa3..8d076093abfe6b6fcbb52ccea421763d76aa38b6 100755 (executable)
@@ -74,6 +74,8 @@ export ARVADOS_API_HOST=localhost:8000
 export ARVADOS_API_HOST_INSECURE=1
 export ARVADOS_API_TOKEN=\$(cat /var/lib/arvados/superuser_token)
 
+arv-keepdocker --pull arvados/jobs latest
+
 cat >/tmp/cwltest/arv-cwl-jobs <<EOF2
 #!/bin/sh
 exec arvados-cwl-runner --api=jobs --compute-checksum \\\$@
index 86467040a5c7440ae6aed42b506fbae935ab45eb..2c03812ed3e38e339f1323abd25e2bc4ec61c8dc 100755 (executable)
@@ -2,4 +2,4 @@
 if ! arv-get d7514270f356df848477718d58308cc4+94 > /dev/null ; then
     arv-put --portable-data-hash testdir
 fi
-exec cwltest --test arvados-tests.yml --tool $PWD/runner.sh
+exec cwltest --test arvados-tests.yml --tool $PWD/runner.sh $@
index 1187962a8eb5935d93a47308a8326a8de658eb3a..1beadb9de4e4e4a3897f34f57f704caff75a30ac 100644 (file)
@@ -8,3 +8,41 @@
     }
   tool: keep-dir-test-input.cwl
   doc: Test directory in keep
+
+- job: dir-job2.yml
+  output:
+    "outlist": {
+        "size": 20,
+        "location": "output.txt",
+        "class": "File",
+        "checksum": "sha1$13cda8661796ae241da3a18668fb552161a72592"
+    }
+  tool: keep-dir-test-input.cwl
+  doc: Test directory in keep
+
+- job: null
+  output:
+    "outlist": {
+        "size": 20,
+        "location": "output.txt",
+        "class": "File",
+        "checksum": "sha1$13cda8661796ae241da3a18668fb552161a72592"
+    }
+  tool: keep-dir-test-input2.cwl
+  doc: Test default directory in keep
+
+- job: null
+  output:
+    "outlist": {
+        "size": 20,
+        "location": "output.txt",
+        "class": "File",
+        "checksum": "sha1$13cda8661796ae241da3a18668fb552161a72592"
+    }
+  tool: keep-dir-test-input3.cwl
+  doc: Test default directory in keep
+
+- job: octo.yml
+  output: {}
+  tool: cat.cwl
+  doc: Test hashes in filenames
diff --git a/sdk/cwl/tests/cat.cwl b/sdk/cwl/tests/cat.cwl
new file mode 100644 (file)
index 0000000..93af517
--- /dev/null
@@ -0,0 +1,8 @@
+cwlVersion: v1.0
+class: CommandLineTool
+inputs:
+  - id: inp
+    type: File
+    inputBinding: {}
+outputs: []
+baseCommand: cat
diff --git a/sdk/cwl/tests/dir-job2.yml b/sdk/cwl/tests/dir-job2.yml
new file mode 100644 (file)
index 0000000..5c654c9
--- /dev/null
@@ -0,0 +1,3 @@
+indir:
+  class: Directory
+  location: keep:d7514270f356df848477718d58308cc4+94/
diff --git a/sdk/cwl/tests/keep-dir-test-input2.cwl b/sdk/cwl/tests/keep-dir-test-input2.cwl
new file mode 100644 (file)
index 0000000..7a355ab
--- /dev/null
@@ -0,0 +1,24 @@
+class: CommandLineTool
+cwlVersion: v1.0
+requirements:
+  - class: ShellCommandRequirement
+inputs:
+  indir:
+    type: Directory
+    inputBinding:
+      prefix: cd
+      position: -1
+    default:
+      class: Directory
+      location: keep:d7514270f356df848477718d58308cc4+94
+outputs:
+  outlist:
+    type: File
+    outputBinding:
+      glob: output.txt
+arguments: [
+  {shellQuote: false, valueFrom: "&&"},
+  "find", ".",
+  {shellQuote: false, valueFrom: "|"},
+  "sort"]
+stdout: output.txt
\ No newline at end of file
diff --git a/sdk/cwl/tests/keep-dir-test-input3.cwl b/sdk/cwl/tests/keep-dir-test-input3.cwl
new file mode 100644 (file)
index 0000000..f7321c8
--- /dev/null
@@ -0,0 +1,24 @@
+class: CommandLineTool
+cwlVersion: v1.0
+requirements:
+  - class: ShellCommandRequirement
+inputs:
+  indir:
+    type: Directory
+    inputBinding:
+      prefix: cd
+      position: -1
+    default:
+      class: Directory
+      location: keep:d7514270f356df848477718d58308cc4+94/
+outputs:
+  outlist:
+    type: File
+    outputBinding:
+      glob: output.txt
+arguments: [
+  {shellQuote: false, valueFrom: "&&"},
+  "find", ".",
+  {shellQuote: false, valueFrom: "|"},
+  "sort"]
+stdout: output.txt
\ No newline at end of file
diff --git a/sdk/cwl/tests/octo.yml b/sdk/cwl/tests/octo.yml
new file mode 100644 (file)
index 0000000..f6530df
--- /dev/null
@@ -0,0 +1,3 @@
+inp:
+  class: File
+  location: "octothorpe/item %231.txt"
\ No newline at end of file
diff --git a/sdk/cwl/tests/octothorpe/item #1.txt b/sdk/cwl/tests/octothorpe/item #1.txt
new file mode 100644 (file)
index 0000000..e69de29
index 22447794f1c7f93c4eca6fcdec83d254cbc1740b..9adbb4878f40541eb13c0feed39bf22241f4c3f5 100644 (file)
@@ -7,9 +7,12 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/streamer"
        "io"
        "io/ioutil"
+       "log"
        "math/rand"
        "net"
        "net/http"
+       "os"
+       "regexp"
        "strings"
        "time"
 )
@@ -19,6 +22,13 @@ import (
 // log.Printf to DebugPrintf.
 var DebugPrintf = func(string, ...interface{}) {}
 
+func init() {
+       var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
+       if matchTrue.MatchString(os.Getenv("ARVADOS_DEBUG")) {
+               DebugPrintf = log.Printf
+       }
+}
+
 type keepService struct {
        Uuid     string `json:"uuid"`
        Hostname string `json:"service_host"`
index a06170d2e4916939f25b50dfcccfe5ec67d136f8..6581a8e9acb59bbd81eb57997550d82543887b53 100644 (file)
@@ -48,7 +48,7 @@ class OrderedJsonModel(apiclient.model.JsonModel):
         return body
 
 
-def _intercept_http_request(self, uri, **kwargs):
+def _intercept_http_request(self, uri, method="GET", **kwargs):
     if (self.max_request_size and
         kwargs.get('body') and
         self.max_request_size < len(kwargs['body'])):
@@ -62,7 +62,7 @@ def _intercept_http_request(self, uri, **kwargs):
 
     kwargs['headers']['Authorization'] = 'OAuth2 %s' % self.arvados_api_token
 
-    retryable = kwargs.get('method', 'GET') in [
+    retryable = method in [
         'DELETE', 'GET', 'HEAD', 'OPTIONS', 'PUT']
     retry_count = self._retry_count if retryable else 0
 
@@ -79,7 +79,7 @@ def _intercept_http_request(self, uri, **kwargs):
     for _ in range(retry_count):
         self._last_request_time = time.time()
         try:
-            return self.orig_http_request(uri, **kwargs)
+            return self.orig_http_request(uri, method, **kwargs)
         except http.client.HTTPException:
             _logger.debug("Retrying API request in %d s after HTTP error",
                           delay, exc_info=True)
@@ -97,7 +97,7 @@ def _intercept_http_request(self, uri, **kwargs):
         delay = delay * self._retry_delay_backoff
 
     self._last_request_time = time.time()
-    return self.orig_http_request(uri, **kwargs)
+    return self.orig_http_request(uri, method, **kwargs)
 
 def _patch_http_request(http, api_token):
     http.arvados_api_token = api_token
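
The signature change above matters because httplib2's Http.request is declared as request(uri, method="GET", ...), and the API client may pass method positionally; with the old wrapper signature, a positional method either raised a TypeError or never reached the retryable-method check. A minimal sketch of the difference (hypothetical function names):

def old_wrapper(uri, **kwargs):
    # Only sees the method when it is passed as a keyword argument.
    return kwargs.get('method', 'GET')

def new_wrapper(uri, method='GET', **kwargs):
    return method

# old_wrapper('https://example', 'POST')   # TypeError: too many positional args
print(new_wrapper('https://example', 'POST'))  # -> 'POST'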
diff --git a/sdk/python/arvados/commands/get.py b/sdk/python/arvados/commands/get.py
new file mode 100755 (executable)
index 0000000..f39e092
--- /dev/null
@@ -0,0 +1,276 @@
+#!/usr/bin/env python
+
+import argparse
+import hashlib
+import os
+import re
+import string
+import sys
+import logging
+
+import arvados
+import arvados.commands._util as arv_cmd
+
+from arvados._version import __version__
+
+api_client = None
+logger = logging.getLogger('arvados.arv-get')
+
+parser = argparse.ArgumentParser(
+    description='Copy data from Keep to a local file or pipe.',
+    parents=[arv_cmd.retry_opt])
+parser.add_argument('--version', action='version',
+                    version="%s %s" % (sys.argv[0], __version__),
+                    help='Print version and exit.')
+parser.add_argument('locator', type=str,
+                    help="""
+Collection locator, optionally with a file path or prefix.
+""")
+parser.add_argument('destination', type=str, nargs='?', default='-',
+                    help="""
+Local file or directory where the data is to be written. Default: stdout.
+""")
+group = parser.add_mutually_exclusive_group()
+group.add_argument('--progress', action='store_true',
+                   help="""
+Display human-readable progress on stderr (bytes and, if possible,
+percentage of total data size). This is the default behavior when it
+is not expected to interfere with the output: specifically, stderr is
+a tty _and_ either stdout is not a tty, or output is being written to
+named files rather than stdout.
+""")
+group.add_argument('--no-progress', action='store_true',
+                   help="""
+Do not display human-readable progress on stderr.
+""")
+group.add_argument('--batch-progress', action='store_true',
+                   help="""
+Display machine-readable progress on stderr (bytes and, if known,
+total data size).
+""")
+group = parser.add_mutually_exclusive_group()
+group.add_argument('--hash',
+                    help="""
+Display the hash of each file as it is read from Keep, using the given
+hash algorithm. Supported algorithms include md5, sha1, sha224,
+sha256, sha384, and sha512.
+""")
+group.add_argument('--md5sum', action='store_const',
+                    dest='hash', const='md5',
+                    help="""
+Display the MD5 hash of each file as it is read from Keep.
+""")
+parser.add_argument('-n', action='store_true',
+                    help="""
+Do not write any data -- just read from Keep, and report md5sums if
+requested.
+""")
+parser.add_argument('-r', action='store_true',
+                    help="""
+Retrieve all files in the specified collection/prefix. This is the
+default behavior if the "locator" argument ends with a forward slash.
+""")
+group = parser.add_mutually_exclusive_group()
+group.add_argument('-f', action='store_true',
+                   help="""
+Overwrite existing files while writing. The default behavior is to
+refuse to write *anything* if any of the output files already
+exist. As a special case, -f is not needed to write to stdout.
+""")
+group.add_argument('--skip-existing', action='store_true',
+                   help="""
+Skip files that already exist. The default behavior is to refuse to
+write *anything* if any files exist that would have to be
+overwritten. This option causes even devices, sockets, and fifos to be
+skipped.
+""")
+
+def parse_arguments(arguments, stdout, stderr):
+    args = parser.parse_args(arguments)
+
+    if args.locator[-1] == os.sep:
+        args.r = True
+    if (args.r and
+        not args.n and
+        not (args.destination and
+             os.path.isdir(args.destination))):
+        parser.error('Destination is not a directory.')
+    if not args.r and (os.path.isdir(args.destination) or
+                       args.destination[-1] == os.path.sep):
+        args.destination = os.path.join(args.destination,
+                                        os.path.basename(args.locator))
+        logger.debug("Appended source file name to destination directory: %s",
+                     args.destination)
+
+    if args.destination == '/dev/stdout':
+        args.destination = "-"
+
+    if args.destination == '-':
+        # Normally you have to use -f to write to a file (or device) that
+        # already exists, but "-" and "/dev/stdout" are common enough to
+        # merit a special exception.
+        args.f = True
+    else:
+        args.destination = args.destination.rstrip(os.sep)
+
+    # Turn on --progress by default if stderr is a tty and output is
+    # either going to a named file, or going (via stdout) to something
+    # that isn't a tty.
+    if (not (args.batch_progress or args.no_progress)
+        and stderr.isatty()
+        and (args.destination != '-'
+             or not stdout.isatty())):
+        args.progress = True
+    return args
+
+def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
+    global api_client
+    
+    args = parse_arguments(arguments, stdout, stderr)
+    if api_client is None:
+        api_client = arvados.api('v1')
+
+    r = re.search(r'^(.*?)(/.*)?$', args.locator)
+    collection = r.group(1)
+    get_prefix = r.group(2)
+    if args.r and not get_prefix:
+        get_prefix = os.sep
+    try:
+        reader = arvados.CollectionReader(collection, num_retries=args.retries)
+    except Exception as error:
+        logger.error("failed to read collection: {}".format(error))
+        return 1
+
+    # User asked to download the collection's manifest
+    if not get_prefix:
+        if not args.n:
+            open_flags = os.O_CREAT | os.O_WRONLY
+            if not args.f:
+                open_flags |= os.O_EXCL
+            try:
+                if args.destination == "-":
+                    stdout.write(reader.manifest_text())
+                else:
+                    out_fd = os.open(args.destination, open_flags)
+                    with os.fdopen(out_fd, 'wb') as out_file:
+                        out_file.write(reader.manifest_text())
+            except (IOError, OSError) as error:
+                logger.error("can't write to '{}': {}".format(args.destination, error))
+                return 1
+            except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
+                logger.error("failed to download '{}': {}".format(collection, error))
+                return 1
+        return 0
+
+    # Scan the collection. Make an array of (stream, file, local
+    # destination filename) tuples, and add up total size to extract.
+    todo = []
+    todo_bytes = 0
+    try:
+        if get_prefix == os.sep:
+            item = reader
+        else:
+            item = reader.find('.' + get_prefix)
+
+        if isinstance(item, arvados.collection.Subcollection) or isinstance(item, arvados.collection.CollectionReader):
+            # If the user asked for a file and we got a subcollection, error out.
+            if get_prefix[-1] != os.sep:
+                logger.error("requested file '{}' is in fact a subcollection. Append a trailing '/' to download it.".format('.' + get_prefix))
+                return 1
+            # If the user asked stdout as a destination, error out.
+            elif args.destination == '-':
+                logger.error("cannot use 'stdout' as destination when downloading multiple files.")
+                return 1
+            # User asked for a subcollection, and that's what was found. Add up total size
+            # to download.
+            for s, f in files_in_collection(item):
+                dest_path = os.path.join(
+                    args.destination,
+                    os.path.join(s.stream_name(), f.name)[len(get_prefix)+1:])
+                if (not (args.n or args.f or args.skip_existing) and
+                    os.path.exists(dest_path)):
+                    logger.error('Local file %s already exists.' % (dest_path,))
+                    return 1
+                todo += [(s, f, dest_path)]
+                todo_bytes += f.size()
+        elif isinstance(item, arvados.arvfile.ArvadosFile):
+            todo += [(item.parent, item, args.destination)]
+            todo_bytes += item.size()
+        else:
+            logger.error("'{}' not found.".format('.' + get_prefix))
+            return 1
+    except (IOError, arvados.errors.NotFoundError) as e:
+        logger.error(e)
+        return 1
+
+    out_bytes = 0
+    for s, f, outfilename in todo:
+        outfile = None
+        digestor = None
+        if not args.n:
+            if outfilename == "-":
+                outfile = stdout
+            else:
+                if args.skip_existing and os.path.exists(outfilename):
+                    logger.debug('Local file %s exists. Skipping.', outfilename)
+                    continue
+                elif not args.f and (os.path.isfile(outfilename) or
+                                   os.path.isdir(outfilename)):
+                    # Good thing we looked again: apparently this file wasn't
+                    # here yet when we checked earlier.
+                    logger.error('Local file %s already exists.' % (outfilename,))
+                    return 1
+                if args.r:
+                    arvados.util.mkdir_dash_p(os.path.dirname(outfilename))
+                try:
+                    outfile = open(outfilename, 'wb')
+                except Exception as error:
+                    logger.error('Open(%s) failed: %s' % (outfilename, error))
+                    return 1
+        if args.hash:
+            digestor = hashlib.new(args.hash)
+        try:
+            with s.open(f.name, 'r') as file_reader:
+                for data in file_reader.readall():
+                    if outfile:
+                        outfile.write(data)
+                    if digestor:
+                        digestor.update(data)
+                    out_bytes += len(data)
+                    if args.progress:
+                        stderr.write('\r%d MiB / %d MiB %.1f%%' %
+                                     (out_bytes >> 20,
+                                      todo_bytes >> 20,
+                                      (100
+                                       if todo_bytes==0
+                                       else 100.0*out_bytes/todo_bytes)))
+                    elif args.batch_progress:
+                        stderr.write('%s %d read %d total\n' %
+                                     (sys.argv[0], os.getpid(),
+                                      out_bytes, todo_bytes))
+            if digestor:
+                stderr.write("%s  %s/%s\n"
+                             % (digestor.hexdigest(), s.stream_name(), f.name))
+        except KeyboardInterrupt:
+            if outfile and (outfile.fileno() > 2) and not outfile.closed:
+                os.unlink(outfile.name)
+            break
+        finally:
+            if outfile != None and outfile != stdout:
+                outfile.close()
+
+    if args.progress:
+        stderr.write('\n')
+    return 0
+
+def files_in_collection(c):
+    # Sort first by file type, then alphabetically by file path.
+    for i in sorted(c.keys(),
+                    key=lambda k: (
+                        isinstance(c[k], arvados.collection.Subcollection),
+                        k.upper())):
+        if isinstance(c[i], arvados.arvfile.ArvadosFile):
+            yield (c, c[i])
+        elif isinstance(c[i], arvados.collection.Subcollection):
+            for s, f in files_in_collection(c[i]):
+                yield (s, f)
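
With the logic moved into arvados.commands.get, the downloader can be driven programmatically as well as through the arv-get entry point. A short sketch (the collection locator is a placeholder):

import sys
from arvados.commands.get import main

# Stream one file from a collection to stdout; a non-zero return code
# signals an error, mirroring the CLI behavior.
rc = main(['--no-progress', 'zzzzz-4zz18-zzzzzzzzzzzzzzz/foo.txt', '-'],
          sys.stdout, sys.stderr)
sys.exit(rc)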
index 250bda8f24ac1d46d829fbb42ecbfeeb5343d80b..3e3f67142b579de674089de49563ea8f243ec5a4 100644 (file)
@@ -2,6 +2,9 @@ from __future__ import print_function
 from __future__ import division
 
 import argparse
+import collections
+import logging
+import re
 import sys
 
 import arvados
@@ -9,13 +12,15 @@ import arvados.commands._util as arv_cmd
 
 from arvados._version import __version__
 
+FileInfo = collections.namedtuple('FileInfo', ['stream_name', 'name', 'size'])
+
 def parse_args(args):
     parser = argparse.ArgumentParser(
         description='List contents of a manifest',
         parents=[arv_cmd.retry_opt])
 
     parser.add_argument('locator', type=str,
-                        help="""Collection UUID or locator""")
+                        help="""Collection UUID or locator, optionally with a subdir path.""")
     parser.add_argument('-s', action='store_true',
                         help="""List file sizes, in KiB.""")
     parser.add_argument('--version', action='version',
@@ -25,25 +30,43 @@ def parse_args(args):
     return parser.parse_args(args)
 
 def size_formatter(coll_file):
-    return "{:>10}".format((coll_file.size() + 1023) // 1024)
+    return "{:>10}".format((coll_file.size + 1023) // 1024)
 
 def name_formatter(coll_file):
-    return "{}/{}".format(coll_file.stream_name(), coll_file.name)
+    return "{}/{}".format(coll_file.stream_name, coll_file.name)
 
-def main(args, stdout, stderr, api_client=None):
+def main(args, stdout, stderr, api_client=None, logger=None):
     args = parse_args(args)
 
     if api_client is None:
         api_client = arvados.api('v1')
 
+    if logger is None:
+        logger = logging.getLogger('arvados.arv-ls')
+
     try:
-        cr = arvados.CollectionReader(args.locator, api_client=api_client,
+        r = re.search(r'^(.*?)(/.*)?$', args.locator)
+        collection = r.group(1)
+        get_prefix = r.group(2)
+
+        cr = arvados.CollectionReader(collection, api_client=api_client,
                                       num_retries=args.retries)
-        cr.normalize()
-    except (arvados.errors.ArgumentError,
+        if get_prefix:
+            if get_prefix[-1] == '/':
+                get_prefix = get_prefix[:-1]
+            stream_name = '.' + get_prefix
+            reader = cr.find(stream_name)
+            if not (isinstance(reader, arvados.CollectionReader) or
+                    isinstance(reader, arvados.collection.Subcollection)):
+                logger.error("'{}' is not a subdirectory".format(get_prefix))
+                return 1
+        else:
+            stream_name = '.'
+            reader = cr
+    except (arvados.errors.ApiError,
+            arvados.errors.ArgumentError,
             arvados.errors.NotFoundError) as error:
-        print("arv-ls: error fetching collection: {}".format(error),
-              file=stderr)
+        logger.error("error fetching collection: {}".format(error))
         return 1
 
     formatters = []
@@ -51,7 +74,21 @@ def main(args, stdout, stderr, api_client=None):
         formatters.append(size_formatter)
     formatters.append(name_formatter)
 
-    for f in cr.all_files():
+    for f in files_in_collection(reader, stream_name):
         print(*(info_func(f) for info_func in formatters), file=stdout)
 
     return 0
+
+def files_in_collection(c, stream_name='.'):
+    # Sort first by file type, then alphabetically by file path.
+    for i in sorted(c.keys(),
+                    key=lambda k: (
+                        isinstance(c[k], arvados.collection.Subcollection),
+                        k.upper())):
+        if isinstance(c[i], arvados.arvfile.ArvadosFile):
+            yield FileInfo(stream_name=stream_name,
+                           name=i,
+                           size=c[i].size())
+        elif isinstance(c[i], arvados.collection.Subcollection):
+            for f in files_in_collection(c[i], "{}/{}".format(stream_name, i)):
+                yield f
index ce34e180c12dbb3885e91a4fa7365b000a5c07cf..b0413ebf92a06985591685c54e567b890f6827b1 100644 (file)
@@ -6,9 +6,10 @@ from builtins import next
 from builtins import str
 from builtins import range
 from builtins import object
-import io
+import collections
 import datetime
 import hashlib
+import io
 import logging
 import math
 import os
@@ -321,9 +322,22 @@ class KeepClient(object):
             except:
                 ua.close()
 
-        def _socket_open(self, family, socktype, protocol, address=None):
+        def _socket_open(self, *args, **kwargs):
+            if len(args) + len(kwargs) == 2:
+                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
+            else:
+                return self._socket_open_pycurl_7_19_3(*args, **kwargs)
+
+        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
+            return self._socket_open_pycurl_7_21_5(
+                purpose=None,
+                address=collections.namedtuple(
+                    'Address', ['family', 'socktype', 'protocol', 'addr'],
+                )(family, socktype, protocol, address))
+
+        def _socket_open_pycurl_7_21_5(self, purpose, address):
             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
-            s = socket.socket(family, socktype, protocol)
+            s = socket.socket(address.family, address.socktype, address.protocol)
             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
             # Will throw invalid protocol error on mac. This test prevents that.
             if hasattr(socket, 'TCP_KEEPIDLE'):
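
The arity-dispatching shim above exists because pycurl changed the OPENSOCKETFUNCTION callback signature: before 7.21.5 the callback received (family, socktype, protocol, address), while 7.21.5 and later pass (purpose, address) with the socket parameters as attributes of the address object. A sketch of how either variant ends up registered (keep_session stands in for the KeepClient session object defined in this file):

import pycurl

curl = pycurl.Curl()
# The wrapper counts its arguments and forwards to the 7.19.3-style or
# 7.21.5-style handler, so one registration works on both pycurl versions.
curl.setopt(pycurl.OPENSOCKETFUNCTION, keep_session._socket_open)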
index f91b3977090da7c6f8b30844635174d122e67ba2..1c2e552490dcd49ba3fe1d9b893b20329f585fac 100755 (executable)
@@ -1,238 +1,7 @@
 #!/usr/bin/env python
 
-import argparse
-import hashlib
-import os
-import re
-import string
 import sys
-import logging
 
-import arvados
-import arvados.commands._util as arv_cmd
+from arvados.commands.get import main
 
-from arvados._version import __version__
-
-logger = logging.getLogger('arvados.arv-get')
-
-def abort(msg, code=1):
-    print >>sys.stderr, "arv-get:", msg
-    exit(code)
-
-parser = argparse.ArgumentParser(
-    description='Copy data from Keep to a local file or pipe.',
-    parents=[arv_cmd.retry_opt])
-parser.add_argument('--version', action='version',
-                    version="%s %s" % (sys.argv[0], __version__),
-                    help='Print version and exit.')
-parser.add_argument('locator', type=str,
-                    help="""
-Collection locator, optionally with a file path or prefix.
-""")
-parser.add_argument('destination', type=str, nargs='?', default='-',
-                    help="""
-Local file or directory where the data is to be written. Default: stdout.
-""")
-group = parser.add_mutually_exclusive_group()
-group.add_argument('--progress', action='store_true',
-                   help="""
-Display human-readable progress on stderr (bytes and, if possible,
-percentage of total data size). This is the default behavior when it
-is not expected to interfere with the output: specifically, stderr is
-a tty _and_ either stdout is not a tty, or output is being written to
-named files rather than stdout.
-""")
-group.add_argument('--no-progress', action='store_true',
-                   help="""
-Do not display human-readable progress on stderr.
-""")
-group.add_argument('--batch-progress', action='store_true',
-                   help="""
-Display machine-readable progress on stderr (bytes and, if known,
-total data size).
-""")
-group = parser.add_mutually_exclusive_group()
-group.add_argument('--hash',
-                    help="""
-Display the hash of each file as it is read from Keep, using the given
-hash algorithm. Supported algorithms include md5, sha1, sha224,
-sha256, sha384, and sha512.
-""")
-group.add_argument('--md5sum', action='store_const',
-                    dest='hash', const='md5',
-                    help="""
-Display the MD5 hash of each file as it is read from Keep.
-""")
-parser.add_argument('-n', action='store_true',
-                    help="""
-Do not write any data -- just read from Keep, and report md5sums if
-requested.
-""")
-parser.add_argument('-r', action='store_true',
-                    help="""
-Retrieve all files in the specified collection/prefix. This is the
-default behavior if the "locator" argument ends with a forward slash.
-""")
-group = parser.add_mutually_exclusive_group()
-group.add_argument('-f', action='store_true',
-                   help="""
-Overwrite existing files while writing. The default behavior is to
-refuse to write *anything* if any of the output files already
-exist. As a special case, -f is not needed to write to stdout.
-""")
-group.add_argument('--skip-existing', action='store_true',
-                   help="""
-Skip files that already exist. The default behavior is to refuse to
-write *anything* if any files exist that would have to be
-overwritten. This option causes even devices, sockets, and fifos to be
-skipped.
-""")
-
-args = parser.parse_args()
-
-if args.locator[-1] == os.sep:
-    args.r = True
-if (args.r and
-    not args.n and
-    not (args.destination and
-         os.path.isdir(args.destination))):
-    parser.error('Destination is not a directory.')
-if not args.r and (os.path.isdir(args.destination) or
-                   args.destination[-1] == os.path.sep):
-    args.destination = os.path.join(args.destination,
-                                    os.path.basename(args.locator))
-    logger.debug("Appended source file name to destination directory: %s",
-                 args.destination)
-
-if args.destination == '/dev/stdout':
-    args.destination = "-"
-
-if args.destination == '-':
-    # Normally you have to use -f to write to a file (or device) that
-    # already exists, but "-" and "/dev/stdout" are common enough to
-    # merit a special exception.
-    args.f = True
-else:
-    args.destination = args.destination.rstrip(os.sep)
-
-# Turn on --progress by default if stderr is a tty and output is
-# either going to a named file, or going (via stdout) to something
-# that isn't a tty.
-if (not (args.batch_progress or args.no_progress)
-    and sys.stderr.isatty()
-    and (args.destination != '-'
-         or not sys.stdout.isatty())):
-    args.progress = True
-
-
-r = re.search(r'^(.*?)(/.*)?$', args.locator)
-collection = r.group(1)
-get_prefix = r.group(2)
-if args.r and not get_prefix:
-    get_prefix = os.sep
-api_client = arvados.api('v1')
-reader = arvados.CollectionReader(collection, num_retries=args.retries)
-
-if not get_prefix:
-    if not args.n:
-        open_flags = os.O_CREAT | os.O_WRONLY
-        if not args.f:
-            open_flags |= os.O_EXCL
-        try:
-            if args.destination == "-":
-                sys.stdout.write(reader.manifest_text())
-            else:
-                out_fd = os.open(args.destination, open_flags)
-                with os.fdopen(out_fd, 'wb') as out_file:
-                    out_file.write(reader.manifest_text())
-        except (IOError, OSError) as error:
-            abort("can't write to '{}': {}".format(args.destination, error))
-        except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
-            abort("failed to download '{}': {}".format(collection, error))
-    sys.exit(0)
-
-reader.normalize()
-
-# Scan the collection. Make an array of (stream, file, local
-# destination filename) tuples, and add up total size to extract.
-todo = []
-todo_bytes = 0
-try:
-    for s in reader.all_streams():
-        for f in s.all_files():
-            if get_prefix and get_prefix[-1] == os.sep:
-                if 0 != string.find(os.path.join(s.name(), f.name()),
-                                    '.' + get_prefix):
-                    continue
-                if args.destination == "-":
-                    dest_path = "-"
-                else:
-                    dest_path = os.path.join(
-                        args.destination,
-                        os.path.join(s.name(), f.name())[len(get_prefix)+1:])
-                    if (not (args.n or args.f or args.skip_existing) and
-                        os.path.exists(dest_path)):
-                        abort('Local file %s already exists.' % (dest_path,))
-            else:
-                if os.path.join(s.name(), f.name()) != '.' + get_prefix:
-                    continue
-                dest_path = args.destination
-            todo += [(s, f, dest_path)]
-            todo_bytes += f.size()
-except arvados.errors.NotFoundError as e:
-    abort(e)
-
-# Read data, and (if not -n) write to local file(s) or pipe.
-
-out_bytes = 0
-for s,f,outfilename in todo:
-    outfile = None
-    digestor = None
-    if not args.n:
-        if outfilename == "-":
-            outfile = sys.stdout
-        else:
-            if args.skip_existing and os.path.exists(outfilename):
-                logger.debug('Local file %s exists. Skipping.', outfilename)
-                continue
-            elif not args.f and (os.path.isfile(outfilename) or
-                               os.path.isdir(outfilename)):
-                # Good thing we looked again: apparently this file wasn't
-                # here yet when we checked earlier.
-                abort('Local file %s already exists.' % (outfilename,))
-            if args.r:
-                arvados.util.mkdir_dash_p(os.path.dirname(outfilename))
-            try:
-                outfile = open(outfilename, 'wb')
-            except Exception as error:
-                abort('Open(%s) failed: %s' % (outfilename, error))
-    if args.hash:
-        digestor = hashlib.new(args.hash)
-    try:
-        for data in f.readall():
-            if outfile:
-                outfile.write(data)
-            if digestor:
-                digestor.update(data)
-            out_bytes += len(data)
-            if args.progress:
-                sys.stderr.write('\r%d MiB / %d MiB %.1f%%' %
-                                 (out_bytes >> 20,
-                                  todo_bytes >> 20,
-                                  (100
-                                   if todo_bytes==0
-                                   else 100.0*out_bytes/todo_bytes)))
-            elif args.batch_progress:
-                sys.stderr.write('%s %d read %d total\n' %
-                                 (sys.argv[0], os.getpid(),
-                                  out_bytes, todo_bytes))
-        if digestor:
-            sys.stderr.write("%s  %s/%s\n"
-                             % (digestor.hexdigest(), s.name(), f.name()))
-    except KeyboardInterrupt:
-        if outfile and (outfile.fileno() > 2) and not outfile.closed:
-            os.unlink(outfile.name)
-        break
-
-if args.progress:
-    sys.stderr.write('\n')
+sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))
index 9380a03caad23c274173d3fc33761489eee04de8..a07155ece332e35ad0315ea1a45095280a485efb 100644 (file)
@@ -45,16 +45,13 @@ setup(name='arvados-python-client',
           ('share/doc/arvados-python-client', ['LICENSE-2.0.txt', 'README.rst']),
       ],
       install_requires=[
-          'google-api-python-client==1.4.2',
-          'oauth2client >=1.4.6, <2',
+          'google-api-python-client >=1.6.2, <1.7',
           'ciso8601',
-          'httplib2',
-          'pycurl >=7.19.5.1, <7.21.5',
-          'python-gflags >=3',
+          'httplib2 >=0.9.2',
+          'pycurl >=7.19.5.1',
           'setuptools',
           'ws4py <0.4',
-          'ruamel.yaml >=0.13.7',
-          'future',
+          'ruamel.yaml >=0.13.7'
       ],
       test_suite='tests',
       tests_require=['pbr<1.7.0', 'mock>=1.0', 'PyYAML'],
diff --git a/sdk/python/tests/test_arv_get.py b/sdk/python/tests/test_arv_get.py
new file mode 100644 (file)
index 0000000..907c671
--- /dev/null
@@ -0,0 +1,108 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import io
+import shutil
+import tempfile
+
+import arvados
+import arvados.collection as collection
+import arvados.commands.get as arv_get
+import run_test_server
+
+from arvados_testutil import redirected_streams
+
+class ArvadosGetTestCase(run_test_server.TestCaseWithServers):
+    MAIN_SERVER = {}
+    KEEP_SERVER = {}
+
+    def setUp(self):
+        super(ArvadosGetTestCase, self).setUp()
+        self.tempdir = tempfile.mkdtemp()
+        self.col_loc, self.col_pdh, self.col_manifest = self.write_test_collection()
+
+    def tearDown(self):
+        super(ArvadosGetTestCase, self).tearDown()
+        shutil.rmtree(self.tempdir)
+
+    def write_test_collection(self,
+                              contents = {
+                                  'foo.txt' : 'foo',
+                                  'bar.txt' : 'bar',
+                                  'subdir/baz.txt' : 'baz',
+                              }):
+        c = collection.Collection()
+        for path, data in contents.items():
+            with c.open(path, 'w') as f:
+                f.write(data)
+        c.save_new()
+        return (c.manifest_locator(), c.portable_data_hash(), c.manifest_text())
+    
+    def run_get(self, args):
+        self.stdout = io.BytesIO()
+        self.stderr = io.BytesIO()
+        return arv_get.main(args, self.stdout, self.stderr)
+
+    def test_version_argument(self):
+        err = io.BytesIO()
+        out = io.BytesIO()
+        with redirected_streams(stdout=out, stderr=err):
+            with self.assertRaises(SystemExit):
+                self.run_get(['--version'])
+        self.assertEqual(out.getvalue(), '')
+        self.assertRegexpMatches(err.getvalue(), "[0-9]+\.[0-9]+\.[0-9]+")
+
+    def test_get_single_file(self):
+        # Get the file using the collection's locator
+        r = self.run_get(["{}/subdir/baz.txt".format(self.col_loc), '-'])
+        self.assertEqual(0, r)
+        self.assertEqual('baz', self.stdout.getvalue())
+        # Then, try by PDH
+        r = self.run_get(["{}/subdir/baz.txt".format(self.col_pdh), '-'])
+        self.assertEqual(0, r)
+        self.assertEqual('baz', self.stdout.getvalue())        
+
+    def test_get_multiple_files(self):
+        # Download the entire collection to the temp directory
+        r = self.run_get(["{}/".format(self.col_loc), self.tempdir])
+        self.assertEqual(0, r)
+        with open("{}/foo.txt".format(self.tempdir), "r") as f:
+            self.assertEqual("foo", f.read())
+        with open("{}/bar.txt".format(self.tempdir), "r") as f:
+            self.assertEqual("bar", f.read())
+        with open("{}/subdir/baz.txt".format(self.tempdir), "r") as f:
+            self.assertEqual("baz", f.read())
+
+    def test_get_collection_manifest(self):
+        # Get the collection manifest
+        r = self.run_get([self.col_loc, self.tempdir])
+        self.assertEqual(0, r)
+        with open("{}/{}".format(self.tempdir, self.col_loc), "r") as f:
+            self.assertEqual(self.col_manifest, f.read())
+
+    def test_invalid_collection(self):
+        # Asking for an invalid collection should generate an error.
+        r = self.run_get(['this-uuid-seems-to-be-fake', self.tempdir])
+        self.assertNotEqual(0, r)
+
+    def test_invalid_file_request(self):
+        # Asking for a nonexistent file within a collection should generate an error.
+        r = self.run_get(["{}/im-not-here.txt".format(self.col_loc), self.tempdir])
+        self.assertNotEqual(0, r)
+
+    def test_invalid_destination(self):
+        # Asking to place the collection's files in a nonexistent directory
+        # should generate an error.
+        r = self.run_get([self.col_loc, "/fake/subdir/"])
+        self.assertNotEqual(0, r)
+
+    def test_preexistent_destination(self):
+        # Asking to place a file with the same path as a local one should
+        # generate an error and avoid overwrites.
+        with open("{}/foo.txt".format(self.tempdir), "w") as f:
+            f.write("another foo")
+        r = self.run_get(["{}/foo.txt".format(self.col_loc), self.tempdir])
+        self.assertNotEqual(0, r)
+        with open("{}/foo.txt".format(self.tempdir), "r") as f:
+            self.assertEqual("another foo", f.read())
+
index a7117d3d043d229da800f01048f868e1d0b18878..e3f6c128aab622dfc49945e50258007998bfeed6 100644 (file)
@@ -35,10 +35,10 @@ class ArvLsTestCase(run_test_server.TestCaseWithServers, tutil.VersionChecker):
         api_client.collections().get().execute.return_value = coll_info
         return coll_info, api_client
 
-    def run_ls(self, args, api_client):
+    def run_ls(self, args, api_client, logger=None):
         self.stdout = StringIO()
         self.stderr = StringIO()
-        return arv_ls.main(args, self.stdout, self.stderr, api_client)
+        return arv_ls.main(args, self.stdout, self.stderr, api_client, logger)
 
     def test_plain_listing(self):
         collection, api_client = self.mock_api_for_manifest(
@@ -76,10 +76,13 @@ class ArvLsTestCase(run_test_server.TestCaseWithServers, tutil.VersionChecker):
 
     def test_locator_failure(self):
         api_client = mock.MagicMock(name='mock_api_client')
+        error_mock = mock.MagicMock()
+        logger = mock.MagicMock()
+        logger.error = error_mock
         api_client.collections().get().execute.side_effect = (
             arv_error.NotFoundError)
-        self.assertNotEqual(0, self.run_ls([self.FAKE_UUID], api_client))
-        self.assertNotEqual('', self.stderr.getvalue())
+        self.assertNotEqual(0, self.run_ls([self.FAKE_UUID], api_client, logger))
+        self.assertEqual(1, error_mock.call_count)
 
     def test_version_argument(self):
         with redirected_streams(stdout=StringIO, stderr=StringIO) as (out, err):
index b77ba1cdf86593aa2eec723d69222ca6c2ebdb01..96dc85e1e44138485899942a084e8095d3eb9182 100644 (file)
@@ -386,36 +386,31 @@ class ArvadosModel < ActiveRecord::Base
       raise PermissionDeniedError
     end
 
-    # Verify "write" permission on old owner
-    # default fail unless one of:
-    # owner_uuid did not change
-    # previous owner_uuid is nil
-    # current user is the old owner
-    # current user is this object
-    # current user can_write old owner
-    unless !owner_uuid_changed? or
-        owner_uuid_was.nil? or
-        current_user.uuid == self.owner_uuid_was or
-        current_user.uuid == self.uuid or
-        current_user.can? write: self.owner_uuid_was
-      logger.warn "User #{current_user.uuid} tried to modify #{self.class.to_s} #{uuid} but does not have permission to write old owner_uuid #{owner_uuid_was}"
-      errors.add :owner_uuid, "cannot be changed without write permission on old owner"
-      raise PermissionDeniedError
-    end
-
-    # Verify "write" permission on new owner
-    # default fail unless one of:
-    # current_user is this object
-    # current user can_write new owner, or this object if owner unchanged
-    if new_record? or owner_uuid_changed? or is_a?(ApiClientAuthorization)
-      write_target = owner_uuid
+    if new_record? || owner_uuid_changed?
+      # Permission on owner_uuid_was is needed to move an existing
+      # object away from its previous owner (which implies permission
+      # to modify this object itself, so we don't need to check that
+      # separately). Permission on the new owner_uuid is also needed.
+      [['old', owner_uuid_was],
+       ['new', owner_uuid]
+      ].each do |which, check_uuid|
+        if check_uuid.nil?
+          # The old owner_uuid is nil on a brand-new record; nothing to check.
+        elsif !current_user.can?(write: check_uuid)
+          logger.warn "User #{current_user.uuid} tried to set ownership of #{self.class.to_s} #{self.uuid} but does not have permission to write #{which} owner_uuid #{check_uuid}"
+          errors.add :owner_uuid, "cannot be set or changed without write permission on #{which} owner"
+          raise PermissionDeniedError
+        end
+      end
     else
-      write_target = uuid
-    end
-    unless current_user == self or current_user.can? write: write_target
-      logger.warn "User #{current_user.uuid} tried to modify #{self.class.to_s} #{uuid} but does not have permission to write new owner_uuid #{owner_uuid}"
-      errors.add :owner_uuid, "cannot be changed without write permission on new owner"
-      raise PermissionDeniedError
+      # If the object already existed and we're not changing
+      # owner_uuid, we only need write permission on the object
+      # itself.
+      if !current_user.can?(write: self.uuid)
+        logger.warn "User #{current_user.uuid} tried to modify #{self.class.to_s} #{self.uuid} without write permission"
+        errors.add :uuid, "is not writable"
+        raise PermissionDeniedError
+      end
     end
 
     true
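The rewritten check collapses the old two-block logic into two cases: changing owner_uuid needs write permission on both the old and the new owner, while any other update only needs write permission on the object itself. A Python sketch of the same decision table (`can_write` stands in for `current_user.can?`):

    def check_ownership_change(user, obj, old_owner, new_owner, can_write):
        # Moving an object requires write permission on both ends of the
        # move; permission on the old owner implies permission to modify
        # the object itself, so that is not checked separately.
        if obj.get('new_record') or new_owner != old_owner:
            for which, uuid in [('old', old_owner), ('new', new_owner)]:
                if uuid is None:
                    continue  # nil old owner on a brand-new record
                if not can_write(user, uuid):
                    raise PermissionError(
                        "cannot set or change owner without write "
                        "permission on %s owner %s" % (which, uuid))
        # An ordinary update only needs write permission on the object.
        elif not can_write(user, obj['uuid']):
            raise PermissionError("%s is not writable" % obj['uuid'])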
index b45c178fb498af1eb588c55982ad38f2bfdeb70e..4a1e610303189a28ae3c029836f9ece92e50c4e0 100644 (file)
@@ -510,33 +510,49 @@ class Collection < ArvadosModel
     true
   end
 
-  # If trash_at is updated without touching delete_at, automatically
-  # update delete_at to a sensible value.
   def default_trash_interval
     if trash_at_changed? && !delete_at_changed?
+      # If trash_at is updated without touching delete_at,
+      # automatically update delete_at to a sensible value.
       if trash_at.nil?
         self.delete_at = nil
       else
         self.delete_at = trash_at + Rails.configuration.default_trash_lifetime.seconds
       end
+    elsif !trash_at || !delete_at || trash_at > delete_at
+      # Not trash, or bogus arguments? Just validate in
+      # validate_trash_and_delete_timing.
+    elsif delete_at_changed? && delete_at >= trash_at
+      # Fix delete_at if needed, so it's not earlier than the expiry
+      # time on any permission tokens that might have been given out.
+
+      # In any case there are no signatures expiring after now+TTL.
+      # Also, if the existing trash_at time has already passed, we
+      # know we haven't given out any signatures since then.
+      earliest_delete = [
+        @validation_timestamp,
+        trash_at_was,
+      ].compact.min + Rails.configuration.blob_signature_ttl.seconds
+
+      # The previous value of delete_at is also an upper bound on the
+      # longest-lived permission token. For example, if TTL=14,
+      # trash_at_was=now-7, delete_at_was=now+7, then it is safe to
+      # set trash_at=now+6, delete_at=now+8.
+      earliest_delete = [earliest_delete, delete_at_was].compact.min
+
+      # If delete_at is too soon, use the earliest possible time.
+      if delete_at < earliest_delete
+        self.delete_at = earliest_delete
+      end
     end
   end
 
   def validate_trash_and_delete_timing
     if trash_at.nil? != delete_at.nil?
       errors.add :delete_at, "must be set if trash_at is set, and must be nil otherwise"
-    end
-
-    earliest_delete = ([@validation_timestamp, trash_at_was].compact.min +
-                       Rails.configuration.blob_signature_ttl.seconds)
-    if delete_at && delete_at < earliest_delete
-      errors.add :delete_at, "#{delete_at} is too soon: earliest allowed is #{earliest_delete}"
-    end
-
-    if delete_at && delete_at < trash_at
+    elsif delete_at && delete_at < trash_at
       errors.add :delete_at, "must not be earlier than trash_at"
     end
-
     true
   end
 end
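Working the comment's numbers: with a 14-day blob_signature_ttl, trash_at_was = now-7 and delete_at_was = now+7, no outstanding signature can outlive min(now, now-7) + 14 = now+7, and taking the min with delete_at_was keeps that bound, so delete_at = now+8 is acceptable. A sketch of the same computation:

    from datetime import datetime, timedelta

    def earliest_delete(validation_ts, trash_at_was, delete_at_was,
                        ttl=timedelta(days=14)):
        # No signature issued before min(validation_ts, trash_at_was) can
        # outlive that moment plus the blob signature TTL.
        earliest = min(t for t in (validation_ts, trash_at_was)
                       if t is not None) + ttl
        # The previous delete_at also bounds signature lifetime.
        if delete_at_was is not None:
            earliest = min(earliest, delete_at_was)
        return earliest

    now = datetime.utcnow()
    print(earliest_delete(now, now - timedelta(days=7), now + timedelta(days=7)))
    # -> now + 7 days: delete_at may be moved up to that point, but no earlier.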
index 694c174812dc998996f5ec6aef18b63307ea14e2..628ef886be99f06cdad61e95de4719ef0201b1ee 100644 (file)
@@ -18,8 +18,9 @@ class ContainerRequest < ArvadosModel
   before_validation :validate_scheduling_parameters
   before_validation :set_container
   validates :command, :container_image, :output_path, :cwd, :presence => true
+  validates :output_ttl, numericality: { only_integer: true, greater_than_or_equal_to: 0 }
   validate :validate_state_change
-  validate :validate_change
+  validate :check_update_whitelist
   after_save :update_priority
   after_save :finalize_if_needed
   before_create :set_requesting_container_uuid
@@ -41,6 +42,7 @@ class ContainerRequest < ArvadosModel
     t.add :output_name
     t.add :output_path
     t.add :output_uuid
+    t.add :output_ttl
     t.add :priority
     t.add :properties
     t.add :requesting_container_uuid
@@ -64,6 +66,13 @@ class ContainerRequest < ArvadosModel
     Committed => [Final]
   }
 
+  AttrsPermittedAlways = [:owner_uuid, :state, :name, :description]
+  AttrsPermittedBeforeCommit = [:command, :container_count_max,
+                                :container_image, :cwd, :environment, :filters,
+                                :mounts, :output_path, :priority, :properties,
+                                :requesting_container_uuid, :runtime_constraints,
+                                :state, :container_uuid, :use_existing,
+                                :scheduling_parameters, :output_name, :output_ttl]
+
   def state_transitions
     State_transitions
   end
@@ -91,10 +100,15 @@ class ContainerRequest < ArvadosModel
     ['output', 'log'].each do |out_type|
       pdh = c.send(out_type)
       next if pdh.nil?
-      if self.output_name and out_type == 'output'
-        coll_name = self.output_name
-      else
-        coll_name = "Container #{out_type} for request #{uuid}"
+      coll_name = "Container #{out_type} for request #{uuid}"
+      trash_at = nil
+      if out_type == 'output'
+        if self.output_name
+          coll_name = self.output_name
+        end
+        if self.output_ttl > 0
+          trash_at = db_current_time + self.output_ttl
+        end
       end
       manifest = Collection.unscoped do
         Collection.where(portable_data_hash: pdh).first.manifest_text
@@ -104,6 +118,8 @@ class ContainerRequest < ArvadosModel
                             manifest_text: manifest,
                             portable_data_hash: pdh,
                             name: coll_name,
+                            trash_at: trash_at,
+                            delete_at: trash_at,
                             properties: {
                               'type' => out_type,
                               'container_request' => uuid,
@@ -132,6 +148,7 @@ class ContainerRequest < ArvadosModel
     self.cwd ||= "."
     self.container_count_max ||= Rails.configuration.container_count_max
     self.scheduling_parameters ||= {}
+    self.output_ttl ||= 0
   end
 
   def set_container
@@ -183,57 +200,45 @@ class ContainerRequest < ArvadosModel
     end
   end
 
-  def validate_change
-    permitted = [:owner_uuid]
+  def check_update_whitelist
+    permitted = AttrsPermittedAlways.dup
 
-    case self.state
-    when Uncommitted
-      # Permit updating most fields
-      permitted.push :command, :container_count_max,
-                     :container_image, :cwd, :description, :environment,
-                     :filters, :mounts, :name, :output_path, :priority,
-                     :properties, :requesting_container_uuid, :runtime_constraints,
-                     :state, :container_uuid, :use_existing, :scheduling_parameters,
-                     :output_name
+    if self.new_record? || self.state_was == Uncommitted
+      # Allow create-and-commit in a single operation.
+      permitted.push(*AttrsPermittedBeforeCommit)
+    end
 
+    case self.state
     when Committed
-      if container_uuid.nil?
-        errors.add :container_uuid, "has not been resolved to a container."
-      end
+      permitted.push :priority, :container_count_max, :container_uuid
 
-      if priority.nil?
-        errors.add :priority, "cannot be nil"
+      if self.container_uuid.nil?
+        self.errors.add :container_uuid, "has not been resolved to a container."
       end
 
-      # Can update priority, container count, name and description
-      permitted.push :priority, :container_count, :container_count_max, :container_uuid,
-                     :name, :description
+      if self.priority.nil?
+        self.errors.add :priority, "cannot be nil"
+      end
 
-      if self.state_changed?
-        # Allow create-and-commit in a single operation.
-        permitted.push :command, :container_image, :cwd, :description, :environment,
-                       :filters, :mounts, :name, :output_path, :properties,
-                       :requesting_container_uuid, :runtime_constraints,
-                       :state, :container_uuid, :use_existing, :scheduling_parameters,
-                       :output_name
+      # Allow container count to increment by 1
+      if (self.container_uuid &&
+          self.container_uuid != self.container_uuid_was &&
+          self.container_count == 1 + (self.container_count_was || 0))
+        permitted.push :container_count
       end
 
     when Final
-      if not current_user.andand.is_admin and not (self.name_changed? || self.description_changed?)
-        errors.add :state, "of container request can only be set to Final by system."
+      if self.state_changed? and not current_user.andand.is_admin
+        self.errors.add :state, "of container request can only be set to Final by system."
       end
 
-      if self.state_changed? || self.name_changed? || self.description_changed? || self.output_uuid_changed? || self.log_uuid_changed?
-          permitted.push :state, :name, :description, :output_uuid, :log_uuid
-      else
-        errors.add :state, "does not allow updates"
+      if self.state_was == Committed
+        permitted.push :output_uuid, :log_uuid
       end
 
-    else
-      errors.add :state, "invalid value"
     end
 
-    check_update_whitelist permitted
+    super(permitted)
   end
 
   def update_priority
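With output_ttl in the API, a client can request that the output collection be trashed automatically N seconds after the request reaches Final. A hedged sketch using the Python SDK; the command, image, and nested-body form are illustrative:

    import arvados

    api = arvados.api('v1')
    cr = api.container_requests().create(body={'container_request': {
        'command': ['echo', 'hello'],
        'container_image': 'arvados/jobs',
        'output_path': '/tmp',
        'mounts': {'/tmp': {'kind': 'tmp'}},
        'output_name': 'hello output',
        'output_ttl': 7 * 86400,  # trash the output a week after finalization
        'state': 'Committed',
        'priority': 1,
    }}).execute()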
index c7ac090adc3b9d7bc6b265707ac0c3b4973521e7..82ea0acbd63d9b12e2b31cd4d35fec783b869b59 100644 (file)
@@ -1,3 +1,5 @@
+require 'tempfile'
+
 class Node < ArvadosModel
   include HasUuid
   include KindAndEtag
@@ -171,22 +173,30 @@ class Node < ArvadosModel
     }
 
     if Rails.configuration.dns_server_conf_dir and Rails.configuration.dns_server_conf_template
+      tmpfile = nil
       begin
         begin
           template = IO.read(Rails.configuration.dns_server_conf_template)
-        rescue => e
+        rescue IOError, SystemCallError => e
           logger.error "Reading #{Rails.configuration.dns_server_conf_template}: #{e.message}"
           raise
         end
 
         hostfile = File.join Rails.configuration.dns_server_conf_dir, "#{hostname}.conf"
-        File.open hostfile+'.tmp', 'w' do |f|
+        Tempfile.open(["#{hostname}-", ".conf.tmp"],
+                      Rails.configuration.dns_server_conf_dir) do |f|
+          tmpfile = f.path
           f.puts template % template_vars
         end
-        File.rename hostfile+'.tmp', hostfile
-      rescue => e
+        File.rename tmpfile, hostfile
+      rescue IOError, SystemCallError => e
         logger.error "Writing #{hostfile}: #{e.message}"
         ok = false
+      ensure
+        if tmpfile and File.file? tmpfile
+          # Cleanup remaining temporary file.
+          File.unlink tmpfile
+        end
       end
     end
 
@@ -205,7 +215,7 @@ class Node < ArvadosModel
           # Typically, this is used to trigger a dns server restart
           f.puts Rails.configuration.dns_server_reload_command
         end
-      rescue => e
+      rescue IOError, SystemCallError => e
         logger.error "Unable to write #{restartfile}: #{e.message}"
         ok = false
       end
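Switching from the predictable `hostfile+'.tmp'` name to Tempfile gives a unique temporary file in the destination directory, so the rename stays atomic on one filesystem and the `ensure` block can clean up after a failed write. The same write-then-rename pattern in Python, as a generic sketch:

    import os
    import tempfile

    def write_atomically(path, data):
        # Create the temp file in the destination directory so the final
        # os.rename stays on one filesystem and is atomic.
        fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or '.')
        try:
            with os.fdopen(fd, 'w') as f:
                f.write(data)
            os.rename(tmp, path)
        except (IOError, OSError):
            # Don't leave the temporary file behind on failure.
            if os.path.exists(tmp):
                os.unlink(tmp)
            raise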
diff --git a/services/api/db/migrate/20170330012505_add_output_ttl_to_container_requests.rb b/services/api/db/migrate/20170330012505_add_output_ttl_to_container_requests.rb
new file mode 100644 (file)
index 0000000..ee6fa37
--- /dev/null
@@ -0,0 +1,5 @@
+class AddOutputTtlToContainerRequests < ActiveRecord::Migration
+  def change
+    add_column :container_requests, :output_ttl, :integer, default: 0, null: false
+  end
+end
index d877452f200178c4df67610ce57a7cfc9dd0f236..e25a2a960571f21b823f14d4d45e619fa2c0ae9a 100644 (file)
@@ -297,7 +297,8 @@ CREATE TABLE container_requests (
     scheduling_parameters text,
     output_uuid character varying(255),
     log_uuid character varying(255),
-    output_name character varying(255) DEFAULT NULL::character varying
+    output_name character varying(255) DEFAULT NULL::character varying,
+    output_ttl integer DEFAULT 0 NOT NULL
 );
 
 
@@ -2753,4 +2754,6 @@ INSERT INTO schema_migrations (version) VALUES ('20170216170823');
 
 INSERT INTO schema_migrations (version) VALUES ('20170301225558');
 
-INSERT INTO schema_migrations (version) VALUES ('20170328215436');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20170328215436');
+
+INSERT INTO schema_migrations (version) VALUES ('20170330012505');
\ No newline at end of file
index 87bec21520a05b27c76e807555ba84b45739500d..882e26059ced5a30df507143e0216e6695bc1e08 100644 (file)
@@ -370,21 +370,26 @@ class CollectionTest < ActiveSupport::TestCase
     end
   end
 
+  now = Time.now
   [['trash-to-delete interval negative',
     :collection_owned_by_active,
-    {trash_at: Time.now+2.weeks, delete_at: Time.now},
+    {trash_at: now+2.weeks, delete_at: now},
     {state: :invalid}],
-   ['trash-to-delete interval too short',
+   ['now-to-delete interval short',
     :collection_owned_by_active,
-    {trash_at: Time.now+3.days, delete_at: Time.now+7.days},
-    {state: :invalid}],
+    {trash_at: now+3.days, delete_at: now+7.days},
+    {state: :trash_future}],
+   ['now-to-delete interval short, trash=delete',
+    :collection_owned_by_active,
+    {trash_at: now+3.days, delete_at: now+3.days},
+    {state: :trash_future}],
    ['trash-to-delete interval ok',
     :collection_owned_by_active,
-    {trash_at: Time.now, delete_at: Time.now+15.days},
+    {trash_at: now, delete_at: now+15.days},
     {state: :trash_now}],
    ['trash-to-delete interval short, but far enough in future',
     :collection_owned_by_active,
-    {trash_at: Time.now+13.days, delete_at: Time.now+15.days},
+    {trash_at: now+13.days, delete_at: now+15.days},
     {state: :trash_future}],
    ['trash by setting is_trashed bool',
     :collection_owned_by_active,
@@ -392,11 +397,11 @@ class CollectionTest < ActiveSupport::TestCase
     {state: :trash_now}],
    ['trash in future by setting just trash_at',
     :collection_owned_by_active,
-    {trash_at: Time.now+1.week},
+    {trash_at: now+1.week},
     {state: :trash_future}],
    ['trash in future by setting trash_at and delete_at',
     :collection_owned_by_active,
-    {trash_at: Time.now+1.week, delete_at: Time.now+4.weeks},
+    {trash_at: now+1.week, delete_at: now+4.weeks},
     {state: :trash_future}],
    ['untrash by clearing is_trashed bool',
     :expired_collection,
@@ -416,7 +421,7 @@ class CollectionTest < ActiveSupport::TestCase
         end
         updates_ok = c.update_attributes(updates)
         expect_valid = expect[:state] != :invalid
-        assert_equal updates_ok, expect_valid, c.errors.full_messages.to_s
+        assert_equal expect_valid, updates_ok, c.errors.full_messages.to_s
         case expect[:state]
         when :invalid
           refute c.valid?
index b268ce4220a3be562ea8c909099646359d2e1f52..a3dd1f9835e14ee40c475b80d41d0ec0521378aa 100644 (file)
@@ -3,6 +3,7 @@ require 'helpers/docker_migration_helper'
 
 class ContainerRequestTest < ActiveSupport::TestCase
   include DockerMigrationHelper
+  include DbCurrentTime
 
   def create_minimal_req! attrs={}
     defaults = {
@@ -579,38 +580,65 @@ class ContainerRequestTest < ActiveSupport::TestCase
 
   test "Output collection name setting using output_name with name collision resolution" do
     set_user_from_auth :active
-    output_name = collections(:foo_file).name
+    output_name = 'unimaginative name'
+    Collection.create!(name: output_name)
 
     cr = create_minimal_req!(priority: 1,
                              state: ContainerRequest::Committed,
                              output_name: output_name)
-    act_as_system_user do
-      c = Container.find_by_uuid(cr.container_uuid)
-      c.update_attributes!(state: Container::Locked)
-      c.update_attributes!(state: Container::Running)
-      c.update_attributes!(state: Container::Complete,
-                           exit_code: 0,
-                           output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45',
-                           log: 'fa7aeb5140e2848d39b416daeef4ffc5+45')
-    end
-    cr.save
+    run_container(cr)
+    cr.reload
     assert_equal ContainerRequest::Final, cr.state
     output_coll = Collection.find_by_uuid(cr.output_uuid)
     # Make sure the resulting output collection name include the original name
     # plus the date
     assert_not_equal output_name, output_coll.name,
-                     "It shouldn't exist more than one collection with the same owner and name '${output_name}'"
+                     "more than one collection with the same owner and name"
     assert output_coll.name.include?(output_name),
            "New name should include original name"
     assert_match /\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z/, output_coll.name,
                  "New name should include ISO8601 date"
   end
 
-  test "Finalize committed request when reusing a finished container" do
-    set_user_from_auth :active
-    cr = create_minimal_req!(priority: 1, state: ContainerRequest::Committed)
-    cr.reload
-    assert_equal ContainerRequest::Committed, cr.state
+  [[0, :check_output_ttl_0],
+   [1, :check_output_ttl_1s],
+   [365*86400, :check_output_ttl_1y],
+  ].each do |ttl, checker|
+    test "output_ttl=#{ttl}" do
+      act_as_user users(:active) do
+        cr = create_minimal_req!(priority: 1,
+                                 state: ContainerRequest::Committed,
+                                 output_name: 'foo',
+                                 output_ttl: ttl)
+        run_container(cr)
+        cr.reload
+        output = Collection.find_by_uuid(cr.output_uuid)
+        send(checker, db_current_time, output.trash_at, output.delete_at)
+      end
+    end
+  end
+
+  def check_output_ttl_0(now, trash, delete)
+    assert_nil(trash)
+    assert_nil(delete)
+  end
+
+  def check_output_ttl_1s(now, trash, delete)
+    assert_not_nil(trash)
+    assert_not_nil(delete)
+    assert_in_delta(trash, now + 1.second, 10)
+    assert_in_delta(delete, now + Rails.configuration.blob_signature_ttl.second, 10)
+  end
+
+  def check_output_ttl_1y(now, trash, delete)
+    year = (86400*365).second
+    assert_not_nil(trash)
+    assert_not_nil(delete)
+    assert_in_delta(trash, now + year, 10)
+    assert_in_delta(delete, now + year, 10)
+  end
+
+  def run_container(cr)
     act_as_system_user do
       c = Container.find_by_uuid(cr.container_uuid)
       c.update_attributes!(state: Container::Locked)
@@ -619,7 +647,16 @@ class ContainerRequestTest < ActiveSupport::TestCase
                            exit_code: 0,
                            output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45',
                            log: 'fa7aeb5140e2848d39b416daeef4ffc5+45')
+      c
     end
+  end
+
+  test "Finalize committed request when reusing a finished container" do
+    set_user_from_auth :active
+    cr = create_minimal_req!(priority: 1, state: ContainerRequest::Committed)
+    cr.reload
+    assert_equal ContainerRequest::Committed, cr.state
+    run_container(cr)
     cr.reload
     assert_equal ContainerRequest::Final, cr.state
 
@@ -665,4 +702,48 @@ class ContainerRequestTest < ActiveSupport::TestCase
       end
     end
   end
+
+  [['Committed', true, {name: "foobar", priority: 123}],
+   ['Committed', false, {container_count: 2}],
+   ['Committed', false, {container_count: 0}],
+   ['Committed', false, {container_count: nil}],
+   ['Final', false, {state: ContainerRequest::Committed, name: "foobar"}],
+   ['Final', false, {name: "foobar", priority: 123}],
+   ['Final', false, {name: "foobar", output_uuid: "zzzzz-4zz18-znfnqtbbv4spc3w"}],
+   ['Final', false, {name: "foobar", log_uuid: "zzzzz-4zz18-znfnqtbbv4spc3w"}],
+   ['Final', false, {log_uuid: "zzzzz-4zz18-znfnqtbbv4spc3w"}],
+   ['Final', false, {priority: 123}],
+   ['Final', false, {mounts: {}}],
+   ['Final', false, {container_count: 2}],
+   ['Final', true, {name: "foobar"}],
+   ['Final', true, {name: "foobar", description: "baz"}],
+  ].each do |state, permitted, updates|
+    test "state=#{state} can#{'not' if !permitted} update #{updates.inspect}" do
+      act_as_user users(:active) do
+        cr = create_minimal_req!(priority: 1,
+                                 state: "Committed",
+                                 container_count_max: 1)
+        case state
+        when 'Committed'
+          # already done
+        when 'Final'
+          act_as_system_user do
+            Container.find_by_uuid(cr.container_uuid).
+              update_attributes!(state: Container::Cancelled)
+          end
+          cr.reload
+        else
+          raise 'broken test case'
+        end
+        assert_equal state, cr.state
+        if permitted
+          assert cr.update_attributes!(updates)
+        else
+          assert_raises(ActiveRecord::RecordInvalid) do
+            cr.update_attributes!(updates)
+          end
+        end
+      end
+    end
+  end
 end
index c1e77f6a4d4cf78e6e3ac7ce77b4ffe5a2fd4c9e..2330e7c528f6a304b05cf318fcc2482e91e62b8d 100644 (file)
@@ -1,4 +1,6 @@
 require 'test_helper'
+require 'tmpdir'
+require 'tempfile'
 
 class NodeTest < ActiveSupport::TestCase
   def ping_node(node_name, ping_data)
@@ -76,6 +78,16 @@ class NodeTest < ActiveSupport::TestCase
     assert Node.dns_server_update 'compute65535', '127.0.0.127'
   end
 
+  test "don't leave temp files behind if there's an error writing them" do
+    Rails.configuration.dns_server_conf_template = Rails.root.join 'config', 'unbound.template'
+    Tempfile.any_instance.stubs(:puts).raises(IOError)
+    Dir.mktmpdir do |tmpdir|
+      Rails.configuration.dns_server_conf_dir = tmpdir
+      refute Node.dns_server_update 'compute65535', '127.0.0.127'
+      assert_empty Dir.entries(tmpdir).select{|f| File.file?(File.join(tmpdir, f))}
+    end
+  end
+
   test "ping new node with no hostname and default config" do
     node = ping_node(:new_with_no_hostname, {})
     slot_number = node.slot_number
index a05f61a858c04321f41ff7c8a3faf97a87618431..fd2ce3f659f188dc71a6b8fa72c22fc149faa499 100644 (file)
@@ -1,6 +1,7 @@
 package main
 
 import (
+       "bytes"
        "context"
        "encoding/json"
        "errors"
@@ -134,7 +135,7 @@ type ContainerRunner struct {
        loggingDone   chan bool
        CrunchLog     *ThrottledLogger
        Stdout        io.WriteCloser
-       Stderr        *ThrottledLogger
+       Stderr        io.WriteCloser
        LogCollection *CollectionWriter
        LogsPDH       *string
        RunArvMount
@@ -345,10 +346,10 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 
        for _, bind := range binds {
                mnt := runner.Container.Mounts[bind]
-               if bind == "stdout" {
+               if bind == "stdout" || bind == "stderr" {
                        // Is it a "file" mount kind?
                        if mnt.Kind != "file" {
-                               return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
+                               return fmt.Errorf("Unsupported mount kind '%s' for %s. Only 'file' is supported.", mnt.Kind, bind)
                        }
 
                        // Does path start with OutputPath?
@@ -357,7 +358,14 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                prefix += "/"
                        }
                        if !strings.HasPrefix(mnt.Path, prefix) {
-                               return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
+                               return fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix)
+                       }
+               }
+
+               if bind == "stdin" {
+                       // Is it a "collection" mount kind?
+                       if mnt.Kind != "collection" && mnt.Kind != "json" {
+                               return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' or 'json' are supported.", mnt.Kind)
                        }
                }
 
@@ -372,7 +380,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                }
 
                switch {
-               case mnt.Kind == "collection":
+               case mnt.Kind == "collection" && bind != "stdin":
                        var src string
                        if mnt.UUID != "" && mnt.PortableDataHash != "" {
                                return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
@@ -657,14 +665,44 @@ func (runner *ContainerRunner) LogContainerRecord() (err error) {
        return nil
 }
 
-// AttachLogs connects the docker container stdout and stderr logs to the
-// Arvados logger which logs to Keep and the API server logs table.
+// AttachStreams connects the docker container stdin stream and the stdout and
+// stderr logs to the Arvados logger which logs to Keep and the API server logs table.
 func (runner *ContainerRunner) AttachStreams() (err error) {
 
        runner.CrunchLog.Print("Attaching container streams")
 
+       // If stdin mount is provided, attach it to the docker container
+       var stdinRdr keepclient.Reader
+       var stdinJson []byte
+       if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
+               if stdinMnt.Kind == "collection" {
+                       var stdinColl arvados.Collection
+                       collId := stdinMnt.UUID
+                       if collId == "" {
+                               collId = stdinMnt.PortableDataHash
+                       }
+                       err = runner.ArvClient.Get("collections", collId, nil, &stdinColl)
+                       if err != nil {
+                               return fmt.Errorf("While getting stding collection: %v", err)
+                       }
+
+                       stdinRdr, err = runner.Kc.ManifestFileReader(manifest.Manifest{Text: stdinColl.ManifestText}, stdinMnt.Path)
+                       if os.IsNotExist(err) {
+                               return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
+                       } else if err != nil {
+                               return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
+                       }
+               } else if stdinMnt.Kind == "json" {
+                       stdinJson, err = json.Marshal(stdinMnt.Content)
+                       if err != nil {
+                               return fmt.Errorf("While encoding stdin json data: %v", err)
+                       }
+               }
+       }
+
+       stdinUsed := stdinRdr != nil || len(stdinJson) != 0
        response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
-               dockertypes.ContainerAttachOptions{Stream: true, Stdout: true, Stderr: true})
+               dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
        if err != nil {
                return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
        }
@@ -672,37 +710,76 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
        runner.loggingDone = make(chan bool)
 
        if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
-               stdoutPath := stdoutMnt.Path[len(runner.Container.OutputPath):]
-               index := strings.LastIndex(stdoutPath, "/")
-               if index > 0 {
-                       subdirs := stdoutPath[:index]
-                       if subdirs != "" {
-                               st, err := os.Stat(runner.HostOutputDir)
-                               if err != nil {
-                                       return fmt.Errorf("While Stat on temp dir: %v", err)
-                               }
-                               stdoutPath := path.Join(runner.HostOutputDir, subdirs)
-                               err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
-                               if err != nil {
-                                       return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
-                               }
-                       }
-               }
-               stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+               stdoutFile, err := runner.getStdoutFile(stdoutMnt.Path)
                if err != nil {
-                       return fmt.Errorf("While creating stdout file: %v", err)
+                       return err
                }
                runner.Stdout = stdoutFile
        } else {
                runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
        }
-       runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+
+       if stderrMnt, ok := runner.Container.Mounts["stderr"]; ok {
+               stderrFile, err := runner.getStdoutFile(stderrMnt.Path)
+               if err != nil {
+                       return err
+               }
+               runner.Stderr = stderrFile
+       } else {
+               runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+       }
+
+       if stdinRdr != nil {
+               go func() {
+                       _, err := io.Copy(response.Conn, stdinRdr)
+                       if err != nil {
+                               runner.CrunchLog.Print("While writing stdin collection to docker container %q", err)
+                               runner.stop()
+                       }
+                       stdinRdr.Close()
+                       response.CloseWrite()
+               }()
+       } else if len(stdinJson) != 0 {
+               go func() {
+                       _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
+                       if err != nil {
+                               runner.CrunchLog.Print("While writing stdin json to docker container %q", err)
+                               runner.stop()
+                       }
+                       response.CloseWrite()
+               }()
+       }
 
        go runner.ProcessDockerAttach(response.Reader)
 
        return nil
 }
 
+func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
+       stdoutPath := mntPath[len(runner.Container.OutputPath):]
+       index := strings.LastIndex(stdoutPath, "/")
+       if index > 0 {
+               subdirs := stdoutPath[:index]
+               if subdirs != "" {
+                       st, err := os.Stat(runner.HostOutputDir)
+                       if err != nil {
+                               return nil, fmt.Errorf("While Stat on temp dir: %v", err)
+                       }
+                       stdoutPath := path.Join(runner.HostOutputDir, subdirs)
+                       err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
+                       if err != nil {
+                               return nil, fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
+                       }
+               }
+       }
+       stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+       if err != nil {
+               return nil, fmt.Errorf("While creating file %q: %v", stdoutPath, err)
+       }
+
+       return stdoutFile, nil
+}
+
 // CreateContainer creates the docker container.
 func (runner *ContainerRunner) CreateContainer() error {
        runner.CrunchLog.Print("Creating Docker container")
@@ -743,6 +820,13 @@ func (runner *ContainerRunner) CreateContainer() error {
                }
        }
 
+       _, stdinUsed := runner.Container.Mounts["stdin"]
+       runner.ContainerConfig.OpenStdin = stdinUsed
+       runner.ContainerConfig.StdinOnce = stdinUsed
+       runner.ContainerConfig.AttachStdin = stdinUsed
+       runner.ContainerConfig.AttachStdout = true
+       runner.ContainerConfig.AttachStderr = true
+
        createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
        if err != nil {
                return fmt.Errorf("While creating container: %v", err)
index 98462f8fdcfda5a84fb93b5f56a22d536665a0f4..43c55b67c1c08c07f69fe9913e3681e3835026a6 100644 (file)
@@ -10,6 +10,7 @@ import (
        "fmt"
        "io"
        "io/ioutil"
+       "net"
        "os"
        "os/exec"
        "path/filepath"
@@ -94,8 +95,21 @@ func NewTestDockerClient(exitCode int) *TestDockerClient {
        return t
 }
 
+type MockConn struct {
+       net.Conn
+}
+
+func (m *MockConn) Write(b []byte) (int, error) {
+       return len(b), nil
+}
+
+func NewMockConn() *MockConn {
+       c := &MockConn{}
+       return c
+}
+
 func (t *TestDockerClient) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
-       return dockertypes.HijackedResponse{Reader: bufio.NewReader(t.logReader)}, nil
+       return dockertypes.HijackedResponse{Conn: NewMockConn(), Reader: bufio.NewReader(t.logReader)}, nil
 }
 
 func (t *TestDockerClient) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig, networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
@@ -286,6 +300,10 @@ func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename s
                rdr := ioutil.NopCloser(&bytes.Buffer{})
                client.Called = true
                return FileWrapper{rdr, 1321984}, nil
+       } else if filename == "/file1_in_main.txt" {
+               rdr := ioutil.NopCloser(strings.NewReader("foo"))
+               client.Called = true
+               return FileWrapper{rdr, 3}, nil
        }
        return nil, nil
 }
@@ -1113,6 +1131,22 @@ func (s *TestSuite) TestSetupMounts(c *C) {
                cr.CleanupDirs()
                checkEmpty()
        }
+
+       // Only mount kinds 'collection' and 'json' are allowed for stdin
+       {
+               i = 0
+               cr.ArvMountPoint = ""
+               cr.Container.Mounts = make(map[string]arvados.Mount)
+               cr.Container.Mounts = map[string]arvados.Mount{
+                       "stdin": {Kind: "tmp"},
+               }
+
+               err := cr.SetupMounts()
+               c.Check(err, NotNil)
+               c.Check(err, ErrorMatches, `Unsupported mount kind 'tmp' for stdin.*`)
+               cr.CleanupDirs()
+               checkEmpty()
+       }
 }
 
 func (s *TestSuite) TestStdout(c *C) {
@@ -1359,3 +1393,103 @@ func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(
                }
        }
 }
+
+func (s *TestSuite) TestStdinCollectionMountPoint(c *C) {
+       helperRecord := `{
+               "command": ["/bin/sh", "-c", "echo $FROBIZ"],
+               "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+               "cwd": "/bin",
+               "environment": {"FROBIZ": "bilbo"},
+               "mounts": {
+        "/tmp": {"kind": "tmp"},
+        "stdin": {"kind": "collection", "portable_data_hash": "b0def87f80dd594d4675809e83bd4f15+367", "path": "/file1_in_main.txt"},
+        "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
+    },
+               "output_path": "/tmp",
+               "priority": 1,
+               "runtime_constraints": {}
+       }`
+
+       extraMounts := []string{
+               "b0def87f80dd594d4675809e83bd4f15+367/file1_in_main.txt",
+       }
+
+       api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+               t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
+               t.logWriter.Close()
+       })
+
+       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+       for _, v := range api.Content {
+               if v["collection"] != nil {
+                       collection := v["collection"].(arvadosclient.Dict)
+                       if strings.HasPrefix(collection["name"].(string), "output") {
+                               manifest := collection["manifest_text"].(string)
+                               c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
+`)
+                       }
+               }
+       }
+}
+
+func (s *TestSuite) TestStdinJsonMountPoint(c *C) {
+       helperRecord := `{
+               "command": ["/bin/sh", "-c", "echo $FROBIZ"],
+               "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+               "cwd": "/bin",
+               "environment": {"FROBIZ": "bilbo"},
+               "mounts": {
+        "/tmp": {"kind": "tmp"},
+        "stdin": {"kind": "json", "content": "foo"},
+        "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
+    },
+               "output_path": "/tmp",
+               "priority": 1,
+               "runtime_constraints": {}
+       }`
+
+       api, _, _ := FullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
+               t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
+               t.logWriter.Close()
+       })
+
+       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+       for _, v := range api.Content {
+               if v["collection"] != nil {
+                       collection := v["collection"].(arvadosclient.Dict)
+                       if strings.HasPrefix(collection["name"].(string), "output") {
+                               manifest := collection["manifest_text"].(string)
+                               c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
+`)
+                       }
+               }
+       }
+}
+
+func (s *TestSuite) TestStderrMount(c *C) {
+       api, _, _ := FullRunHelper(c, `{
+    "command": ["/bin/sh", "-c", "echo hello;exit 1"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": ".",
+    "environment": {},
+    "mounts": {"/tmp": {"kind": "tmp"},
+               "stdout": {"kind": "file", "path": "/tmp/a/out.txt"},
+               "stderr": {"kind": "file", "path": "/tmp/b/err.txt"}},
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {}
+}`, nil, 1, func(t *TestDockerClient) {
+               t.logWriter.Write(dockerLog(1, "hello\n"))
+               t.logWriter.Write(dockerLog(2, "oops\n"))
+               t.logWriter.Close()
+       })
+
+       final := api.CalledWith("container.state", "Complete")
+       c.Assert(final, NotNil)
+       c.Check(final["container"].(arvadosclient.Dict)["exit_code"], Equals, 1)
+       c.Check(final["container"].(arvadosclient.Dict)["log"], NotNil)
+
+       c.Check(api.CalledWith("collection.manifest_text", "./a b1946ac92492d2347c6235b4d2611184+6 0:6:out.txt\n./b 38af5c54926b620264ab1501150cf189+5 0:5:err.txt\n"), NotNil)
+}
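Taken together, crunch-run now recognizes three special mount names: stdin (kind 'collection' or 'json'), plus stdout and stderr (kind 'file', with paths under output_path). A container-record mounts fragment combining them, reusing values from the fixtures above:

    mounts = {
        '/tmp': {'kind': 'tmp'},
        # Feed a file from a collection to the container's stdin.
        'stdin': {'kind': 'collection',
                  'portable_data_hash': 'b0def87f80dd594d4675809e83bd4f15+367',
                  'path': '/file1_in_main.txt'},
        # Capture stdout and stderr as files under output_path.
        'stdout': {'kind': 'file', 'path': '/tmp/a/out.txt'},
        'stderr': {'kind': 'file', 'path': '/tmp/b/err.txt'},
    }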
index fea6048798317f904393e5e9b0511895b68338ee..ef3e78e8709c57c03b2fc63cd05259cdc3dc91a7 100644 (file)
@@ -93,9 +93,9 @@ class ArgumentParser(argparse.ArgumentParser):
 
         unmount = self.add_mutually_exclusive_group()
         unmount.add_argument('--unmount', action='store_true', default=False,
-                             help="Forcefully unmount the specified mountpoint (if it's a fuse mount) and exit.")
+                             help="Forcefully unmount the specified mountpoint (if it's a fuse mount) and exit. If --subtype is given, unmount only if the mount has the specified subtype. WARNING: This command can affect any kind of fuse mount, not just arv-mount.")
         unmount.add_argument('--unmount-all', action='store_true', default=False,
-                             help="Forcefully unmount every fuse mount at or below the specified mountpoint and exit.")
+                             help="Forcefully unmount every fuse mount at or below the specified path and exit. If --subtype is given, unmount only mounts that have the specified subtype. Exit non-zero if any other types of mounts are found at or below the given path. WARNING: This command can affect any kind of fuse mount, not just arv-mount.")
         unmount.add_argument('--replace', action='store_true', default=False,
                              help="If a fuse mount is already present at mountpoint, forcefully unmount it before mounting")
         self.add_argument('--unmount-timeout',
@@ -159,6 +159,7 @@ class Mount(object):
     def run(self):
         if self.args.unmount or self.args.unmount_all:
             unmount(path=self.args.mountpoint,
+                    subtype=self.args.subtype,
                     timeout=self.args.unmount_timeout,
                     recursive=self.args.unmount_all)
         elif self.args.exec_args:
index db78ddc738311b40914087c7b07fd0e27d6700f1..e213c733d1768c6c7cb37346d3840b0d73bdcb54 100644 (file)
@@ -26,7 +26,7 @@ def mountinfo():
     return mi
 
 
-def unmount(path, timeout=10, recursive=False):
+def unmount(path, subtype=None, timeout=10, recursive=False):
     """Unmount the fuse mount at path.
 
     Unmounting is done by writing 1 to the "abort" control file in
@@ -43,15 +43,23 @@ def unmount(path, timeout=10, recursive=False):
 
     path = os.path.realpath(path)
 
+    if subtype is None:
+        mnttype = None
+    elif subtype == '':
+        mnttype = 'fuse'
+    else:
+        mnttype = 'fuse.' + subtype
+
     if recursive:
         paths = []
         for m in mountinfo():
             if m.path == path or m.path.startswith(path+"/"):
                 paths.append(m.path)
-                if not m.is_fuse:
+                if not (m.is_fuse and (mnttype is None or
+                                       mnttype == m.mnttype)):
                     raise Exception(
-                        "cannot unmount {}: non-fuse mountpoint {}".format(
-                            path, m))
+                        "cannot unmount {}: mount type is {}".format(
+                            path, m.mnttype))
         for path in sorted(paths, key=len, reverse=True):
             unmount(path, timeout=timeout, recursive=False)
         return len(paths) > 0
@@ -66,7 +74,7 @@ def unmount(path, timeout=10, recursive=False):
     while True:
         mounted = False
         for m in mountinfo():
-            if m.is_fuse:
+            if m.is_fuse and (mnttype is None or mnttype == m.mnttype):
                 try:
                     if os.path.realpath(m.path) == path:
                         was_mounted = True
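The subtype-to-mnttype mapping mirrors what mountinfo reports: None matches any fuse mount, '' matches plain 'fuse', and 'foo' matches 'fuse.foo'. A usage sketch of the updated function (the 'arv' subtype and paths are illustrative):

    from arvados_fuse.unmount import unmount

    # Unmount the fuse mount at this path only if its reported type is
    # 'fuse.arv'; mounts of other types there are left untouched.
    unmount('/mnt/keep', subtype='arv', timeout=10)

    # Recursively unmount every fuse mount at or below a path, whatever
    # its subtype; any other mount type in the tree raises an exception.
    unmount('/mnt', recursive=True)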
index 972edaadc8baaf2bdd7cc5c2312473bc6adf81fd..716a0e00d704d16b5edcb8436b3ca68de157024b 100644 (file)
@@ -32,10 +32,41 @@ class UnmountTest(IntegrationTest):
             self.assertNotIn(' '+self.mnt+' ', m)
 
     def _mounted(self, mounts):
-        all_mounts = subprocess.check_output(['mount', '-t', 'fuse.test'])
+        all_mounts = subprocess.check_output(['mount'])
         return [m for m in mounts
                 if ' '+m+' ' in all_mounts]
 
+    def _wait_for_mounts(self, mounts):
+        deadline = time.time() + 10
+        while self._mounted(mounts) != mounts:
+            time.sleep(0.1)
+            self.assertLess(time.time(), deadline)
+
+    def test_unmount_subtype(self):
+        mounts = []
+        for d in ['foo', 'bar']:
+            mnt = self.tmp+'/'+d
+            os.mkdir(mnt)
+            self.to_delete.insert(0, mnt)
+            mounts.append(mnt)
+            subprocess.check_call(
+                ['./bin/arv-mount', '--subtype', d, mnt])
+
+        self._wait_for_mounts(mounts)
+        self.assertEqual(mounts, self._mounted(mounts))
+        subprocess.call(['./bin/arv-mount', '--subtype', 'baz', '--unmount-all', self.tmp])
+        self.assertEqual(mounts, self._mounted(mounts))
+        subprocess.call(['./bin/arv-mount', '--subtype', 'bar', '--unmount', mounts[0]])
+        self.assertEqual(mounts, self._mounted(mounts))
+        subprocess.call(['./bin/arv-mount', '--subtype', '', '--unmount', self.tmp])
+        self.assertEqual(mounts, self._mounted(mounts))
+        subprocess.check_call(['./bin/arv-mount', '--subtype', 'foo', '--unmount', mounts[0]])
+        self.assertEqual(mounts[1:], self._mounted(mounts))
+        subprocess.check_call(['./bin/arv-mount', '--subtype', '', '--unmount-all', mounts[0]])
+        self.assertEqual(mounts[1:], self._mounted(mounts))
+        subprocess.check_call(['./bin/arv-mount', '--subtype', 'bar', '--unmount-all', self.tmp])
+        self.assertEqual([], self._mounted(mounts))
+
     def test_unmount_children(self):
         for d in ['foo', 'foo/bar', 'bar']:
             mnt = self.tmp+'/'+d
@@ -48,12 +79,7 @@ class UnmountTest(IntegrationTest):
             subprocess.check_call(
                 ['./bin/arv-mount', '--subtype', 'test', mnt])
 
-        # Wait for mounts to attach
-        deadline = time.time() + 10
-        while self._mounted(mounts) != mounts:
-            time.sleep(0.1)
-            self.assertLess(time.time(), deadline)
-
+        self._wait_for_mounts(mounts)
         self.assertEqual(mounts, self._mounted(mounts))
         subprocess.check_call(['./bin/arv-mount', '--unmount', self.tmp])
         self.assertEqual(mounts, self._mounted(mounts))
index 71f9083c01a3e99b35020c69cf2dee301f7e848e..9ee26e336d2849b163d02ca77fdac39012afd7c1 100644 (file)
@@ -121,8 +121,14 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
                           self.cloud_size.name)
         self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                   self.arvados_node)
-        if not self.cloud_node.size:
-             self.cloud_node.size = self.cloud_size
+
+        # The information included in the node size object we get from libcloud
+        # is inconsistent between cloud providers.  Replace libcloud NodeSize
+        # object with compatible CloudSizeWrapper object which merges the size
+        # info reported from the cloud with size information from the
+        # configuration file.
+        self.cloud_node.size = self.cloud_size
+
         self._logger.info("Cloud node %s created.", self.cloud_node.id)
         self._later.update_arvados_node_properties()
 
index c78f1c6b8d63160c40e7e57d9b29f57d07e59dcf..29b04845b653190f88c306094486ac030add3af7 100644 (file)
@@ -219,6 +219,23 @@ class BaseComputeNodeDriver(RetryMixin):
         return (isinstance(exception, cls.CLOUD_ERRORS) or
                 type(exception) is Exception)
 
+    def destroy_node(self, cloud_node):
+        try:
+            return self.real.destroy_node(cloud_node)
+        except self.CLOUD_ERRORS as destroy_error:
+            # Sometimes the destroy node request succeeds but times out and
+            # raises an exception instead of returning success.  If this
+            # happens, we get a noisy stack trace.  Check if the node is still
+            # on the node list.  If it is gone, we can declare victory.
+            try:
+                self.search_for_now(cloud_node.id, 'list_nodes')
+            except ValueError:
+                # If we catch ValueError, that means search_for_now didn't find
+                # it, which means destroy_node actually succeeded.
+                return True
+            # The node is still on the list.  Re-raise.
+            raise
+
     # Now that we've defined all our own methods, delegate generic, public
     # attributes of libcloud drivers that we haven't defined ourselves.
     def _delegate_to_real(attr_name):
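The override turns a destroy-request timeout into success when the node is in fact gone. The verify-after-error pattern in isolation, as a sketch with stand-in callables:

    def destroy_with_verify(node_id, destroy, list_node_ids, cloud_errors):
        # cloud_errors: tuple of exception classes the driver may raise.
        try:
            return destroy(node_id)
        except cloud_errors:
            if node_id not in list_node_ids():
                # The node is gone, so the destroy succeeded even though
                # the request itself timed out.
                return True
            raise  # still listed: the failure was real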
index 1c6d214fe8818e9dd49e94b413daa6609096a4c8..79e43cb52a881f37f0e91819d25977828e057941 100644 (file)
@@ -136,6 +136,10 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
             raise
 
     def sync_node(self, cloud_node, arvados_node):
+        # Update the cloud node record to ensure we have the correct metadata
+        # fingerprint.
+        cloud_node = self.real.ex_get_node(cloud_node.name, cloud_node.extra['zone'])
+
         # We can't store the FQDN on the name attribute or anything like it,
         # because (a) names are static throughout the node's life (so FQDN
         # isn't available because we don't know it at node creation time) and
@@ -147,12 +151,8 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
             self._find_metadata(metadata_items, 'hostname')['value'] = hostname
         except KeyError:
             metadata_items.append({'key': 'hostname', 'value': hostname})
-        response = self.real.connection.async_request(
-            '/zones/{}/instances/{}/setMetadata'.format(
-                cloud_node.extra['zone'].name, cloud_node.name),
-            method='POST', data=metadata_req)
-        if not response.success():
-            raise Exception("setMetadata error: {}".format(response.error))
+
+        self.real.ex_set_node_metadata(cloud_node, metadata_items)
 
     @classmethod
     def node_fqdn(cls, node):
index aa9b3e290fe42123ca19209de45781a58f2eda94..30e8995baa26bdcc22154570dfa099aa5658d341 100644 (file)
@@ -47,6 +47,8 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
                        'node_stale_after': str(60 * 60 * 2),
                        'watchdog': '600',
                        'node_mem_scaling': '0.95'},
+            'Manage': {'address': '127.0.0.1',
+                       'port': '-1'},
             'Logging': {'file': '/dev/stderr',
                         'level': 'WARNING'},
         }.iteritems():
index f23b2615e29876aeb5689c79959fe605d09d7e11..9bfee79b59bae21968064b995e5cd87df7d7c7b9 100644 (file)
@@ -9,6 +9,7 @@ import time
 import pykka
 
 from . import computenode as cnode
+from . import status
 from .computenode import dispatch
 from .config import actor_class
 
@@ -253,6 +254,18 @@ class NodeManagerDaemonActor(actor_class):
                     states.append("shutdown")
         return states + pykka.get_all(proxy_states)
 
+    def _update_tracker(self):
+        updates = {
+            k: 0
+            for k in status.tracker.keys()
+            if k.startswith('nodes_')
+        }
+        for s in self._node_states(size=None):
+            updates.setdefault('nodes_'+s, 0)
+            updates['nodes_'+s] += 1
+        updates['nodes_wish'] = len(self.last_wishlist)
+        status.tracker.update(updates)
+
     def _state_counts(self, size):
         states = self._node_states(size)
         counts = {
@@ -336,7 +349,11 @@ class NodeManagerDaemonActor(actor_class):
                 elif (nodes_wanted < 0) and self.booting:
                     self._later.stop_booting_node(size)
             except Exception as e:
-                self._logger.exception("while calculating nodes wanted for size %s", size)
+                self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
+        try:
+            self._update_tracker()
+        except Exception:
+            self._logger.exception("while updating tracker")
 
     def _check_poll_freshness(orig_func):
         """Decorator to inhibit a method when poll information is stale.
index 93f6cbdbe3c60c4736614594fdd2e77666276274..11d38ecb76d22105b289d17cec5081aaa5bf3952 100644 (file)
@@ -13,6 +13,7 @@ import pykka
 import libcloud
 
 from . import config as nmconfig
+from . import status
 from .baseactor import WatchdogActor
 from .daemon import NodeManagerDaemonActor
 from .jobqueue import JobQueueMonitorActor, ServerCalculator
@@ -112,6 +113,8 @@ def main(args=None):
     for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
         signal.signal(sigcode, shutdown_signal)
 
+    status.Server(config).start()
+
     try:
         root_logger = setup_logging(config.get('Logging', 'file'), **config.log_levels())
         root_logger.info("%s %s, libcloud %s", sys.argv[0], __version__, libcloud.__version__)
diff --git a/services/nodemanager/arvnodeman/status.py b/services/nodemanager/arvnodeman/status.py
new file mode 100644 (file)
index 0000000..d21899c
--- /dev/null
@@ -0,0 +1,65 @@
+from __future__ import absolute_import, print_function
+from future import standard_library
+
+import http.server
+import json
+import logging
+import socketserver
+import threading
+
+_logger = logging.getLogger('status.Handler')
+
+
+class Server(socketserver.ThreadingMixIn, http.server.HTTPServer, object):
+    def __init__(self, config):
+        port = config.getint('Manage', 'port')
+        self.enabled = port >= 0
+        if not self.enabled:
+            _logger.warning("Management server disabled. "+
+                            "Use [Manage] config section to enable.")
+            return
+        self._config = config
+        self._tracker = tracker
+        super(Server, self).__init__(
+            (config.get('Manage', 'address'), port), Handler)
+        self._thread = threading.Thread(target=self.serve_forever)
+        self._thread.daemon = True
+
+    def start(self):
+        if self.enabled:
+            self._thread.start()
+
+
+class Handler(http.server.BaseHTTPRequestHandler, object):
+    def do_GET(self):
+        if self.path == '/status.json':
+            self.send_response(200)
+            self.send_header('Content-type', 'application/json')
+            self.end_headers()
+            self.wfile.write(tracker.get_json())
+        else:
+            self.send_response(404)
+
+    def log_message(self, fmt, *args, **kwargs):
+        _logger.info(fmt, *args, **kwargs)
+
+
+class Tracker(object):
+    def __init__(self):
+        self._mtx = threading.Lock()
+        self._latest = {}
+
+    def get_json(self):
+        with self._mtx:
+            return json.dumps(self._latest)
+
+    def keys(self):
+        with self._mtx:
+            return self._latest.keys()
+
+    def update(self, updates):
+        with self._mtx:
+            self._latest.update(updates)
+
+
+tracker = Tracker()
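A hedged sketch of polling the new management endpoint with the `requests` package that setup.py now pulls in for tests; the counter names follow what the daemon's `_update_tracker` publishes (nodes_&lt;state&gt; plus nodes_wish), and the address and port assume the [Manage] settings from the example configs:

    import requests

    r = requests.get('http://127.0.0.1:8989/status.json')
    counters = r.json()
    # e.g. {"nodes_booting": 1, "nodes_idle": 3, "nodes_busy": 2, "nodes_wish": 6}
    print(counters.get('nodes_wish'))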
index f253621ced4ecccd578eb0f810d6c8bbcc2d7dee..8d5b855cdb4370927991e0aadc4f077f920d9eaa 100644 (file)
@@ -1,6 +1,16 @@
 # Azure configuration for Arvados Node Manager.
 # All times are in seconds unless specified otherwise.
 
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
 [Daemon]
 # The dispatcher can customize the start and stop procedure for
 # cloud nodes.  For example, the SLURM dispatcher drains nodes
index b25bf940cf4564a9da43b254c08812b74bf8f528..d5bed57b95811795ee83529935edf897f2339b18 100644 (file)
@@ -1,6 +1,16 @@
 # EC2 configuration for Arvados Node Manager.
 # All times are in seconds unless specified otherwise.
 
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
 [Daemon]
 # The dispatcher can customize the start and stop procedure for
 # cloud nodes.  For example, the SLURM dispatcher drains nodes
index ed7bdc3b3d7d9b423b217c3057f83b0ef86823bc..043bb9567d04909ec848f027365e464a859b2a6b 100644 (file)
@@ -1,6 +1,16 @@
 # Google Compute Engine configuration for Arvados Node Manager.
 # All times are in seconds unless specified otherwise.
 
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
 [Daemon]
 # Node Manager will ensure that there are at least this many nodes running at
 # all times.  If node manager needs to start new idle nodes for the purpose of
index 314750e6cd3ade7c6bf6abdd25e4803b6a7acb2c..85b4c7d4a7c3f545e3cdff1d23e8cdef9aaf5d48 100644 (file)
@@ -5,6 +5,10 @@
 # is through the API server Rails console: load the Node object, set its
 # IP address to 10.10.0.N (where N is the cloud node's ID), and save.
 
+[Manage]
+address = 0.0.0.0
+port = 8989
+
 [Daemon]
 min_nodes = 0
 max_nodes = 8
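With the [Manage] section enabled as in this local example, the daemon's internal state can be polled over plain HTTP while it runs. A quick check, assuming Node Manager is up locally with the address and port above:

    import requests

    # /status.json is the only endpoint; any other path returns 404.
    resp = requests.get('http://127.0.0.1:8989/status.json')
    resp.raise_for_status()
    print(resp.json())  # keys such as nodes_* depend on runtime state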
index c30108f44bb65a487945e665e7f2afae91528c00..5eb923eb93079cc28bc0d1836c9f4b6dea6635a2 100644 (file)
@@ -29,17 +29,23 @@ setup(name='arvados-node-manager',
           ('share/doc/arvados-node-manager', ['agpl-3.0.txt', 'README.rst']),
       ],
       install_requires=[
-        'apache-libcloud>=0.16',
-        'arvados-python-client>=0.1.20150206225333',
-        'pykka',
-        'python-daemon',
-        'setuptools'
-        ],
-      dependency_links = [
+          'apache-libcloud>=0.16',
+          'arvados-python-client>=0.1.20150206225333',
+          'future',
+          'pykka',
+          'python-daemon',
+          'setuptools'
+      ],
+      dependency_links=[
           "https://github.com/curoverse/libcloud/archive/apache-libcloud-0.18.1.dev4.zip"
       ],
       test_suite='tests',
-      tests_require=['pbr<1.7.0', 'mock>=1.0', "apache-libcloud==0.18.1.dev4"],
+      tests_require=[
+          'requests',
+          'pbr<1.7.0',
+          'mock>=1.0',
+          'apache-libcloud==0.18.1.dev4',
+      ],
       zip_safe=False,
       cmdclass={'egg_info': tagger},
       )
index 84e061d867ff42033fd526e92440695702a3dd8c..d47dbdfa0306d82a27232f1099f8902085513b75 100644 (file)
@@ -123,16 +123,15 @@ class GCEComputeNodeDriverTestCase(testutil.DriverTestMixin, unittest.TestCase):
         cloud_node = testutil.cloud_node_mock(
             2, metadata=start_metadata.copy(),
             zone=testutil.cloud_object_mock('testzone'))
+        self.driver_mock().ex_get_node.return_value = cloud_node
         driver = self.new_driver()
         driver.sync_node(cloud_node, arv_node)
-        args, kwargs = self.driver_mock().connection.async_request.call_args
-        self.assertEqual('/zones/testzone/instances/2/setMetadata', args[0])
-        for key in ['kind', 'fingerprint']:
-            self.assertEqual(start_metadata[key], kwargs['data'][key])
+        args, kwargs = self.driver_mock().ex_set_node_metadata.call_args
+        self.assertEqual(cloud_node, args[0])
         plain_metadata['hostname'] = 'compute1.zzzzz.arvadosapi.com'
         self.assertEqual(
             plain_metadata,
-            {item['key']: item['value'] for item in kwargs['data']['items']})
+            {item['key']: item['value'] for item in args[1]})
 
     def test_sync_node_updates_hostname_tag(self):
         self.check_sync_node_updates_hostname_tag(
@@ -145,9 +144,7 @@ class GCEComputeNodeDriverTestCase(testutil.DriverTestMixin, unittest.TestCase):
         arv_node = testutil.arvados_node_mock(8)
         cloud_node = testutil.cloud_node_mock(
             9, metadata={}, zone=testutil.cloud_object_mock('failzone'))
-        mock_response = self.driver_mock().connection.async_request()
-        mock_response.success.return_value = False
-        mock_response.error = 'sync error test'
+        self.driver_mock().ex_set_node_metadata.side_effect = Exception('sync error test')
         driver = self.new_driver()
         with self.assertRaises(Exception) as err_check:
             driver.sync_node(cloud_node, arv_node)
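The reworked failure test leans on mock's side_effect: assigning an exception instance to a mocked method makes every call raise it, which replaces the old dance of faking a failed async_request response. A self-contained sketch of the mechanism (the method name is just mirrored from the test above):

    import mock
    import unittest

    class SideEffectDemo(unittest.TestCase):
        def test_side_effect_raises(self):
            driver = mock.MagicMock()
            # An exception assigned as side_effect is raised on call.
            driver.ex_set_node_metadata.side_effect = Exception('sync error test')
            with self.assertRaises(Exception) as err_check:
                driver.ex_set_node_metadata('node', [])
            self.assertEqual('sync error test', str(err_check.exception))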
index e49fc39eed3dad01be48004047341ec650a81a5e..04ff9b6d79962922ea8a3327edc726db528b524e 100644 (file)
@@ -9,9 +9,11 @@ import mock
 import pykka
 
 import arvnodeman.daemon as nmdaemon
+import arvnodeman.status as status
 from arvnodeman.jobqueue import ServerCalculator
 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
 from . import testutil
+from . import test_status
 import logging
 
 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
@@ -355,10 +357,16 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         monitor = self.monitor_list()[0].proxy()
         self.daemon.update_server_wishlist([])
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+        self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
         self.assertTrue(self.node_shutdown.start.called,
                         "daemon did not shut down booted node on offer")
 
+        with test_status.TestServer() as srv:
+            self.assertEqual(0, srv.get_status().get('nodes_unpaired'))
+            self.assertEqual(1, srv.get_status().get('nodes_shutdown'))
+            self.assertEqual(0, srv.get_status().get('nodes_wish'))
+
     def test_booted_node_lifecycle(self):
         cloud_node = testutil.cloud_node_mock(6)
         setup = self.start_node_boot(cloud_node, id_num=6)
index 285aa03c7deaa84a07ab6407151f77f5f2b08a31..b947c955723ab53dcfa36e9694c16c4973560b2b 100644 (file)
@@ -26,7 +26,7 @@ class BogusActor(arvnodeman.baseactor.BaseNodeManagerActor):
     def ping(self):
         # Called by WatchdogActorTest, this delay is longer than the test timeout
         # of 1 second, which should cause the watchdog ping to fail.
-        time.sleep(2)
+        time.sleep(4)
         return True
 
 class ActorUnhandledExceptionTest(testutil.ActorTestMixin, unittest.TestCase):
diff --git a/services/nodemanager/tests/test_status.py b/services/nodemanager/tests/test_status.py
new file mode 100644 (file)
index 0000000..c11fe40
--- /dev/null
@@ -0,0 +1,56 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+from future import standard_library
+standard_library.install_aliases()
+
+import requests
+import unittest
+
+import arvnodeman.status as status
+import arvnodeman.config as config
+
+
+class TestServer(object):
+    def __enter__(self):
+        cfg = config.NodeManagerConfig()
+        cfg.set('Manage', 'port', '0')
+        cfg.set('Manage', 'address', '127.0.0.1')
+        self.srv = status.Server(cfg)
+        self.srv.start()
+        addr, port = self.srv.server_address
+        self.srv_base = 'http://%s:%d' % (addr, port)
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.srv.shutdown()
+
+    def get_status_response(self):
+        return requests.get(self.srv_base+'/status.json')
+
+    def get_status(self):
+        return self.get_status_response().json()
+
+
+class StatusServerUpdates(unittest.TestCase):
+    def test_updates(self):
+        with TestServer() as srv:
+            for n in [1, 2, 3]:
+                status.tracker.update({'nodes_'+str(n): n})
+                r = srv.get_status_response()
+                self.assertEqual(200, r.status_code)
+                self.assertEqual('application/json', r.headers['content-type'])
+                resp = r.json()
+                self.assertEqual(n, resp['nodes_'+str(n)])
+            self.assertEqual(1, resp['nodes_1'])
+
+
+class StatusServerDisabled(unittest.TestCase):
+    def test_config_disabled(self):
+        cfg = config.NodeManagerConfig()
+        cfg.set('Manage', 'port', '-1')
+        cfg.set('Manage', 'address', '127.0.0.1')
+        self.srv = status.Server(cfg)
+        self.srv.start()
+        self.assertFalse(self.srv.enabled)
+        self.assertFalse(getattr(self.srv, '_thread', False))
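TestServer binds port 0 so the OS assigns a free ephemeral port, then reads the real port back from server_address; that keeps parallel test runs from fighting over a fixed port. The same trick in isolation, under the same python-future conventions as the code above:

    from __future__ import absolute_import, print_function
    from future import standard_library
    standard_library.install_aliases()
    import http.server

    # Port 0 asks the OS for any free port; server_address reports the result.
    srv = http.server.HTTPServer(
        ('127.0.0.1', 0), http.server.BaseHTTPRequestHandler)
    addr, port = srv.server_address
    print('listening on http://%s:%d' % (addr, port))
    srv.server_close()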