Merge branch '8284-fix-slurm-queue-timestamp-check' closes #8284
author Tom Clegg <tom@curoverse.com>
Wed, 10 Feb 2016 15:51:33 +0000 (10:51 -0500)
committer Tom Clegg <tom@curoverse.com>
Wed, 10 Feb 2016 15:51:33 +0000 (10:51 -0500)
83 files changed:
apps/workbench/Gemfile.lock
apps/workbench/app/models/arvados_api_client.rb
backports/python-llfuse/fpm-info.sh
crunch_scripts/crunchutil/subst.py
crunch_scripts/run-command
doc/_includes/_install_git.liquid
doc/_includes/_install_redhat_postgres_auth.liquid [new file with mode: 0644]
doc/_includes/_install_ruby_and_bundler.liquid
doc/_includes/_install_runit.liquid [new file with mode: 0644]
doc/install/install-api-server.html.textile.liquid
doc/install/install-arv-git-httpd.html.textile.liquid
doc/install/install-compute-node.html.textile.liquid
doc/install/install-crunch-dispatch.html.textile.liquid
doc/install/install-keep-web.html.textile.liquid
doc/install/install-keepproxy.html.textile.liquid
doc/install/install-keepstore.html.textile.liquid
doc/install/install-shell-server.html.textile.liquid
doc/install/install-sso.html.textile.liquid
doc/install/install-workbench-app.html.textile.liquid
sdk/cli/bin/crunch-job
sdk/python/arvados/events.py
services/api/Gemfile
services/api/Gemfile.lock
services/api/app/controllers/arvados/v1/nodes_controller.rb
services/api/app/models/arvados_model.rb
services/api/fpm-info.sh
services/api/lib/crunch_dispatch.rb
services/api/lib/load_param.rb
services/api/test/fixtures/nodes.yml
services/api/test/functional/arvados/v1/nodes_controller_test.rb
services/api/test/functional/arvados/v1/query_test.rb [new file with mode: 0644]
services/crunchstat/crunchstat.go
services/fuse/arvados_fuse/command.py
services/fuse/arvados_fuse/fusedir.py
services/fuse/fpm-info.sh [new file with mode: 0644]
services/fuse/setup.py
services/fuse/tests/integration_test.py
services/fuse/tests/mount_test_base.py
services/fuse/tests/test_command_args.py
services/fuse/tests/test_exec.py [new file with mode: 0644]
services/keepstore/azure_blob_volume.go
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/keepstore.go
services/keepstore/s3_volume.go
services/keepstore/trash_worker.go
services/keepstore/volume.go
services/keepstore/volume_generic_test.go
services/keepstore/volume_test.go
services/keepstore/volume_unix.go
services/keepstore/volume_unix_test.go
services/login-sync/bin/arvados-login-sync
services/nodemanager/arvnodeman/clientactor.py
services/nodemanager/arvnodeman/computenode/__init__.py
services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
services/nodemanager/arvnodeman/computenode/driver/__init__.py
services/nodemanager/arvnodeman/computenode/driver/ec2.py
services/nodemanager/arvnodeman/computenode/driver/gce.py
services/nodemanager/arvnodeman/daemon.py
services/nodemanager/arvnodeman/jobqueue.py
services/nodemanager/arvnodeman/launcher.py
services/nodemanager/arvnodeman/nodelist.py
services/nodemanager/tests/test_clientactor.py
services/nodemanager/tests/test_computenode_dispatch.py
services/nodemanager/tests/test_computenode_driver_azure.py
services/nodemanager/tests/test_computenode_driver_gce.py
services/nodemanager/tests/test_daemon.py
services/nodemanager/tests/testutil.py
tools/crunchstat-summary/MANIFEST.in
tools/crunchstat-summary/bin/crunchstat-summary
tools/crunchstat-summary/crunchstat_summary/__init__.py
tools/crunchstat-summary/crunchstat_summary/chartjs.js [new file with mode: 0644]
tools/crunchstat-summary/crunchstat_summary/chartjs.py [new file with mode: 0644]
tools/crunchstat-summary/crunchstat_summary/command.py
tools/crunchstat-summary/crunchstat_summary/reader.py [new file with mode: 0644]
tools/crunchstat-summary/crunchstat_summary/summarizer.py
tools/crunchstat-summary/setup.py
tools/crunchstat-summary/tests/crunchstat_error_messages.txt [new file with mode: 0644]
tools/crunchstat-summary/tests/logfile_20151204190335.txt.gz.report
tools/crunchstat-summary/tests/logfile_20151210063411.txt.gz.report
tools/crunchstat-summary/tests/logfile_20151210063439.txt.gz.report
tools/crunchstat-summary/tests/test_examples.py

index 303d5830de18774db271edb7891397000e1fddc8..b4e2400beda11a9186dd5f4c04468638f237a517 100644 (file)
@@ -294,6 +294,3 @@ DEPENDENCIES
   therubyracer
   uglifier (>= 1.0.3)
   wiselinks
-
-BUNDLED WITH
-   1.10.6
index 4d549d194728eb00a9f3a2a01fd097d84955a16e..13d4a24c69cc5f7e687c47c0e95ed715ab9f5fa2 100644 (file)
@@ -89,7 +89,10 @@ class ArvadosApiClient
           @api_client.ssl_config.verify_mode = OpenSSL::SSL::VERIFY_NONE
         else
           # Use system CA certificates
-          @api_client.ssl_config.add_trust_ca('/etc/ssl/certs')
+          ["/etc/ssl/certs/ca-certificates.crt",
+           "/etc/pki/tls/certs/ca-bundle.crt"]
+            .select { |ca_path| File.readable?(ca_path) }
+            .each { |ca_path| @api_client.ssl_config.add_trust_ca(ca_path) }
         end
         if Rails.configuration.api_response_compression
           @api_client.transparent_gzip_decompression = true
index a7d9398701b7bc4c405027c26f5133fe7b23d383..327bc5e50f5793c6cb8a81a2f73117fac424e3be 100644 (file)
@@ -11,3 +11,6 @@ esac
 
 # FIXME: Remove this line after #6885 is done.
 fpm_args+=(--iteration 2)
+
+# FIXME: Remove once support for llfuse 0.42+ is in place
+fpm_args+=(-v 0.41.1)
index fad9b060ee65ea574d6c8a3a1f04e528559a6277..bd99d3c71cafc76392be14ec0b0b38973a7c11fc 100644 (file)
@@ -63,7 +63,7 @@ def sub_basename(v):
 def sub_glob(v):
     l = glob.glob(v)
     if len(l) == 0:
-        raise SubstitutionError("$(glob {}) no match fonud".format(v))
+        raise SubstitutionError("$(glob {}) no match found".format(v))
     else:
         return l[0]
 
index a6c5ef981ce35fb5e52f8bab9c6628c803e45872..74793d4fce40ae84a820a72db74b69a6c92f3ec4 100755 (executable)
@@ -397,12 +397,19 @@ try:
     active = 1
     pids = set([s.pid for s in subprocesses])
     while len(pids) > 0:
-        (pid, status) = os.wait()
-        pids.discard(pid)
-        if not taskp.get("task.ignore_rcode"):
-            rcode[pid] = (status >> 8)
+        try:
+            (pid, status) = os.wait()
+        except OSError as e:
+            if e.errno == errno.EINTR:
+                pass
+            else:
+                raise
         else:
-            rcode[pid] = 0
+            pids.discard(pid)
+            if not taskp.get("task.ignore_rcode"):
+                rcode[pid] = (status >> 8)
+            else:
+                rcode[pid] = 0
 
     if sig.sig is not None:
         logger.critical("terminating on signal %s" % sig.sig)
index 2ca1ce494434687a876befc4d73c75fc847a202e..60092c1ee8e4a1a66c0ab09552936479078c5b2b 100644 (file)
@@ -1,3 +1,9 @@
 {% include 'notebox_begin' %}
-Arvados requires git version 1.7.10 or later. If you are using an earlier version of git, please update your git version.
+The Arvados API and Git servers require Git 1.7.10 or later.  You can get this version on CentOS 6 from RepoForge.  "Install the repository":http://repoforge.org/use/, then run:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo yum install --enablerepo=rpmforge-extras git</span>
+</code></pre>
+</notextile>
+
 {% include 'notebox_end' %}
diff --git a/doc/_includes/_install_redhat_postgres_auth.liquid b/doc/_includes/_install_redhat_postgres_auth.liquid
new file mode 100644 (file)
index 0000000..35f8b79
--- /dev/null
@@ -0,0 +1,11 @@
+{% include 'notebox_begin' %}
+
+If you are installing on CentOS 6, you will need to modify PostgreSQL's configuration to allow password authentication for local users. The default configuration allows 'ident' authentication only. The following commands will make the configuration change, and restart PostgreSQL for it to take effect.
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo sed -ri -e 's/^(host +all +all +(127\.0\.0\.1\/32|::1\/128) +)ident$/\1md5/' {{pg_hba_path}}</span>
+~$ <span class="userinput">sudo service {{pg_service}} restart</span>
+</code></pre>
+</notextile>
+
+{% include 'notebox_end' %}
index e79cffb7f86dd9adf5a622b231ceda25e057f4e4..cd1aeaf989caa1b67b775d75b1985cc6d2c1fcdf 100644 (file)
@@ -36,7 +36,7 @@ Install prerequisites for CentOS 6:
 <pre><code><span class="userinput">sudo yum install \
     libyaml-devel glibc-headers autoconf gcc-c++ glibc-devel \
     patch readline-devel zlib-devel libffi-devel openssl-devel \
-    automake libtool bison sqlite-devel
+    automake libtool bison sqlite-devel tar
 </span></code></pre></notextile>
 
 Install prerequisites for Ubuntu 12.04 or 14.04:
diff --git a/doc/_includes/_install_runit.liquid b/doc/_includes/_install_runit.liquid
new file mode 100644 (file)
index 0000000..8a1581c
--- /dev/null
@@ -0,0 +1,13 @@
+On Debian-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo apt-get install runit</span>
+</code></pre>
+</notextile>
+
+On Red Hat-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo yum install runit</span>
+</code></pre>
+</notextile>
index fae06decb8ae14403e3afe92aa7e64057e0c2744..91e2c69892820fd2d81d7789bb85c205db4c4cb5 100644 (file)
@@ -52,11 +52,9 @@ Enter password for new role: <span class="userinput">paste-password-you-generate
 Enter it again: <span class="userinput">paste-password-again</span>
 </code></pre></notextile>
 
-{% include 'notebox_begin' %}
-
-This user setup assumes that your PostgreSQL is configured to accept password authentication.  Red Hat systems use ident-based authentication by default.  You may need to either adapt the user creation, or reconfigure PostgreSQL (in @pg_hba.conf@) to accept password authentication.
-
-{% include 'notebox_end' %}
+{% assign pg_hba_path = "/opt/rh/postgresql92/root/var/lib/pgsql/data/pg_hba.conf" %}
+{% assign pg_service = "postgresql92-postgresql" %}
+{% include 'install_redhat_postgres_auth' %}
 
 Create the database:
 
@@ -65,26 +63,13 @@ Create the database:
 </code></pre>
 </notextile>
 
-h2. Set up configuration files
-
-The API server package uses configuration files that you write to @/etc/arvados/api@ and ensures they're consistently deployed.  Create this directory and copy the example configuration files to it:
-
-<notextile>
-<pre><code>~$ <span class="userinput">sudo mkdir -p /etc/arvados/api</span>
-~$ <span class="userinput">sudo chmod 700 /etc/arvados/api</span>
-~$ <span class="userinput">cd /var/www/arvados-api/current</span>
-/var/www/arvados-api/current$ <span class="userinput">sudo cp config/database.yml.example /etc/arvados/api/database.yml</span>
-/var/www/arvados-api/current$ <span class="userinput">sudo cp config/application.yml.example /etc/arvados/api/application.yml</span>
-</code></pre>
-</notextile>
-
 h2. Configure the database connection
 
 Edit @/etc/arvados/api/database.yml@ and replace the @xxxxxxxx@ database password placeholders with the PostgreSQL password you generated above.
 
 h2(#configure_application). Configure the API server
 
-Edit @/etc/arvados/api/application.yml@ to configure the settings described in the following sections.  The deployment script will consistently deploy this to the API server's configuration directory.  The API server reads both @application.yml@ and its own @config/application.default.yml@ file.  The settings in @application.yml@ take precedence over the defaults that are defined in @config/application.default.yml@.  The @config/application.yml.example@ file is not read by the API server and is provided as a starting template only.
+Edit @/etc/arvados/api/application.yml@ to configure the settings described in the following sections.  The API server reads both @application.yml@ and its own @config/application.default.yml@ file.  The settings in @application.yml@ take precedence over the defaults that are defined in @config/application.default.yml@.  The @config/application.yml.example@ file is not read by the API server and is provided as a starting template only.
 
 @config/application.default.yml@ documents additional configuration settings not listed here.  You can "view the current source version":https://dev.arvados.org/projects/arvados/repository/revisions/master/entry/services/api/config/application.default.yml for reference.
 
@@ -205,7 +190,9 @@ For best performance, we recommend you use Nginx as your Web server front-end, w
 <ol>
 <li><a href="https://www.phusionpassenger.com/library/walkthroughs/deploy/ruby/ownserver/nginx/oss/install_passenger_main.html">Install Nginx and Phusion Passenger</a>.</li>
 
-<li><p>Puma is already included with the API server's gems.  We recommend you run it as a service under <a href="http://smarden.org/runit/">runit</a> or a similar tool.  Here's a sample runit script for that:</p>
+<li><p>Install runit to supervise the Puma daemon.  {% include 'install_runit' %}<notextile></p></li>
+
+<li><p>Install the script below as the run script for the Puma service, modifying it as directed by the comments.</p>
 
 <pre><code>#!/bin/bash
 
index 146dbe170b917ff45849ebc018324949dc789cb2..5e373c38b855bb3b2f27410d94b0a6835da07c18 100644 (file)
@@ -89,14 +89,14 @@ git@gitserver:~$ <span class="userinput">rm .ssh/authorized_keys</span>
 
 h2. Install gitolite
 
-Check "https://github.com/sitaramc/gitolite/tags":https://github.com/sitaramc/gitolite/tags for the latest stable version. This guide was tested with @v3.6.3@. _Versions below 3.0 are missing some features needed by Arvados, and should not be used._
+Check "https://github.com/sitaramc/gitolite/tags":https://github.com/sitaramc/gitolite/tags for the latest stable version. This guide was tested with @v3.6.4@. _Versions below 3.0 are missing some features needed by Arvados, and should not be used._
 
 Download and install the version you selected.
 
 <notextile>
 <pre><code>git@gitserver:~$ <span class="userinput">echo 'PATH=$HOME/bin:$PATH' &gt;.profile</span>
 git@gitserver:~$ <span class="userinput">source .profile</span>
-git@gitserver:~$ <span class="userinput">git clone --branch <b>v3.6.3</b> git://github.com/sitaramc/gitolite</span>
+git@gitserver:~$ <span class="userinput">git clone --branch <b>v3.6.4</b> https://github.com/sitaramc/gitolite</span>
 ...
 Note: checking out '5d24ae666bfd2fa9093d67c840eb8d686992083f'.
 ...
@@ -255,19 +255,13 @@ fatal: No REQUEST_METHOD from server
 
 h3. Enable arvados-git-httpd
 
-On Debian-based systems, install runit:
-
-<notextile>
-<pre><code>~$ <span class="userinput">sudo apt-get install runit</span>
-</code></pre>
-</notextile>
-
-On Red Hat-based systems, "install runit from source":http://smarden.org/runit/install.html or use an alternative daemon supervisor.
+Install runit to supervise the arvados-git-httpd daemon.  {% include 'install_runit' %}
 
 Configure runit to run arvados-git-httpd, making sure to update the API host to match your site:
 
 <notextile>
-<pre><code>~$ <span class="userinput">cd /etc/sv</span>
+<pre><code>~$ <span class="userinput">sudo mkdir -p /etc/sv</span>
+~$ <span class="userinput">cd /etc/sv</span>
 /etc/sv$ <span class="userinput">sudo mkdir arvados-git-httpd; cd arvados-git-httpd</span>
 /etc/sv/arvados-git-httpd$ <span class="userinput">sudo mkdir log</span>
 /etc/sv/arvados-git-httpd$ <span class="userinput">sudo sh -c 'cat &gt;log/run' &lt;&lt;'EOF'
@@ -285,6 +279,7 @@ export PATH="$PATH:/var/lib/arvados/git/bin"
 exec chpst -u git:git arvados-git-httpd -address=:9001 -git-command=/var/lib/arvados/git/gitolite/src/gitolite-shell -repo-root=<b>/var/lib/arvados/git</b>/repositories 2&gt;&1
 EOF</span>
 /etc/sv/arvados-git-httpd$ <span class="userinput">sudo chmod +x run log/run</span>
+/etc/sv/arvados-git-httpd$ <span class="userinput">sudo ln -s "$(pwd)" /etc/service/</span>
 </code></pre>
 </notextile>
 
index 7d13f773a628c34db47fe5d9ba7f8b54e47eabfd..9a64ac76d79532d643a895a7df5d2f9971dfd2fc 100644 (file)
@@ -54,7 +54,7 @@ Install SLURM following "the same process you used to install the Crunch dispatc
 
 h2. Copy configuration files from the dispatcher (API server)
 
-The @/etc/slurm-llnl/slurm.conf@ and @/etc/munge/munge.key@ files need to be identicaly across the dispatcher and all compute nodes. Copy the files you created in the "Install the Crunch dispatcher":install-crunch-dispatch.html step to this compute node.
+The @slurm.conf@ and @/etc/munge/munge.key@ files need to be identical across the dispatcher and all compute nodes. Copy the files you created in the "Install the Crunch dispatcher":install-crunch-dispatch.html step to this compute node.
 
 h2. Configure FUSE
 
@@ -79,22 +79,16 @@ h2. Configure the Docker cleaner
 The arvados-docker-cleaner program removes least recently used docker images as needed to keep disk usage below a configured limit.
 
 {% include 'notebox_begin' %}
-This also removes all containers as soon as they exit, as if they were run with `docker run --rm`. If you need to debug or inspect containers after they stop, temporarily stop arvados-docker-cleaner or run it with `--remove-stopped-containers never`.
+This also removes all containers as soon as they exit, as if they were run with @docker run --rm@. If you need to debug or inspect containers after they stop, temporarily stop arvados-docker-cleaner or run it with @--remove-stopped-containers never@.
 {% include 'notebox_end' %}
 
-On Debian-based systems, install runit:
-
-<notextile>
-<pre><code>~$ <span class="userinput">sudo apt-get install runit</span>
-</code></pre>
-</notextile>
-
-On Red Hat-based systems, "install runit from source":http://smarden.org/runit/install.html or use an alternative daemon supervisor.
+Install runit to supervise the Docker cleaner daemon.  {% include 'install_runit' %}
 
 Configure runit to run the image cleaner using a suitable quota for your compute nodes and workload:
 
 <notextile>
-<pre><code>~$ <span class="userinput">cd /etc/sv</span>
+<pre><code>~$ <span class="userinput">sudo mkdir -p /etc/sv</span>
+~$ <span class="userinput">cd /etc/sv</span>
 /etc/sv$ <span class="userinput">sudo mkdir arvados-docker-cleaner; cd arvados-docker-cleaner</span>
 /etc/sv/arvados-docker-cleaner$ <span class="userinput">sudo mkdir log log/main</span>
 /etc/sv/arvados-docker-cleaner$ <span class="userinput">sudo sh -c 'cat &gt;log/run' &lt;&lt;'EOF'
@@ -106,6 +100,7 @@ EOF</span>
 exec python3 -m arvados_docker.cleaner --quota <b>50G</b>
 EOF</span>
 /etc/sv/arvados-docker-cleaner$ <span class="userinput">sudo chmod +x run log/run</span>
+/etc/sv/arvados-docker-cleaner$ <span class="userinput">sudo ln -s "$(pwd)" /etc/service/</span>
 </code></pre>
 </notextile>
 
@@ -152,8 +147,7 @@ if ! test -f /root/node.json ; then
 import arvados, json, socket
 fqdn = socket.getfqdn()
 hostname, _, domain = fqdn.partition('.')
-ip_address = socket.gethostbyname(fqdn)
-node = arvados.api('v1').nodes().create(body={'hostname': hostname, 'domain': domain, 'ip_address': ip_address}).execute()
+node = arvados.api('v1').nodes().create(body={'hostname': hostname, 'domain': domain}).execute()
 with open('/root/node.json', 'w') as node_file:
     json.dump(node, node_file, indent=2)
 EOF
@@ -183,4 +177,3 @@ And remove your token from the environment:
 </code>
 </pre>
 </notextile>
-
index 907f0fdf92b246bc7fdc691938f725376dd9db56..0e5be9411a28f435a738e694dd6c398b91ae2b1e 100644 (file)
@@ -58,9 +58,14 @@ On Debian-based systems:
 </code></pre>
 </notextile>
 
-On Red Hat-based systems, "install SLURM and munge from source following their installation guide":https://computing.llnl.gov/linux/slurm/quickstart_admin.html.
+On Red Hat-based systems:
 
-Now we need to give SLURM a configuration file in @/etc/slurm-llnl/slurm.conf@. Here's an example:
+<notextile>
+<pre><code>~$ <span class="userinput">sudo yum install slurm munge slurm-munge</span>
+</code></pre>
+</notextile>
+
+Now we need to give SLURM a configuration file.  On Debian-based systems, this is installed at @/etc/slurm-llnl/slurm.conf@.  On Red Hat-based systems, this is installed at @/etc/slurm/slurm.conf@.  Here's an example @slurm.conf@:
 
 <notextile>
 <pre>
@@ -90,9 +95,8 @@ Waittime=0
 # SCHEDULING
 SchedulerType=sched/backfill
 SchedulerPort=7321
-SelectType=select/cons_res
-SelectTypeParameters=CR_CPU_Memory
-FastSchedule=1
+SelectType=select/linear
+FastSchedule=0
 #
 # LOGGING
 SlurmctldDebug=3
@@ -125,8 +129,8 @@ Whenever you change this file, you will need to update the copy _on every comput
 Each hostname in @slurm.conf@ must also resolve correctly on all SLURM worker nodes as well as the controller itself. Furthermore, the hostnames used in the configuration file must match the hostnames reported by @hostname@ or @hostname -s@ on the nodes themselves. This applies to the ControlMachine as well as the worker nodes.
 
 For example:
-* In @/etc/slurm-llnl/slurm.conf@ on control and worker nodes: @ControlMachine=uuid_prefix.your.domain@
-* In @/etc/slurm-llnl/slurm.conf@ on control and worker nodes: @NodeName=compute[0-255]@
+* In @slurm.conf@ on control and worker nodes: @ControlMachine=uuid_prefix.your.domain@
+* In @slurm.conf@ on control and worker nodes: @NodeName=compute[0-255]@
 * In @/etc/resolv.conf@ on control and worker nodes: @search uuid_prefix.your.domain@
 * On the control node: @hostname@ reports @uuid_prefix.your.domain@
 * On worker node 123: @hostname@ reports @compute123.uuid_prefix.your.domain@
@@ -163,7 +167,9 @@ To dispatch Arvados jobs:
 * @crunch-job@ needs the installation path of the Perl SDK in its @PERLLIB@.
 * @crunch-job@ needs the @ARVADOS_API_HOST@ (and, if necessary, @ARVADOS_API_HOST_INSECURE@) environment variable set.
 
-We recommend you run @crunch-dispatch.rb@ under "runit":http://smarden.org/runit/ or a similar supervisor.  Here's an example runit service script:
+Install runit to monitor the Crunch dispatch daemon.  {% include 'install_runit' %}
+
+Install the script below as the run script for the Crunch dispatch service, modifying it as directed by the comments.
 
 <notextile>
 <pre><code>#!/bin/sh
index c7a7b2007657d82b120bd191a5904bdd703d3936..16d23e6df56d2fb58cf38974a1e31dbda82d8915 100644 (file)
@@ -55,7 +55,9 @@ Usage of keep-web:
 {% assign railsout = "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz" %}
 If you intend to use Keep-web to serve public data to anonymous clients, configure it with an anonymous token. You can use the same one you used when you set up your Keepproxy server, or use the following command on the <strong>API server</strong> to create another. {% include 'install_rails_command' %}
 
-We recommend running Keep-web under "runit":https://packages.debian.org/search?keywords=runit or a similar supervisor. The basic command to start Keep-web is:
+Install runit to supervise the Keep-web daemon.  {% include 'install_runit' %}
+
+The basic command to start Keep-web in the service run script is:
 
 <notextile>
 <pre><code>export ARVADOS_API_HOST=<span class="userinput">uuid_prefix</span>.your.domain
index 14e5ed5741067e0550a450b9d3bf9f24cb264a83..a6bb5d4bd9aeb3a6d2b276d20e1fc1e0505bf2c9 100644 (file)
@@ -57,7 +57,9 @@ The Keepproxy server needs a token to talk to the API server.  On the <strong>AP
 
 h3. Set up the Keepproxy service
 
-We recommend you run Keepproxy under "runit":http://smarden.org/runit/ or a similar supervisor.  Make sure the launcher sets the envirnoment variables @ARVADOS_API_TOKEN@ (with the token you just generated), @ARVADOS_API_HOST@, and, if needed, @ARVADOS_API_HOST_INSECURE@.  The core keepproxy command to run is:
+Install runit to supervise the keepproxy daemon.  {% include 'install_runit' %}
+
+The run script for the keepproxy service should set the environment variables @ARVADOS_API_TOKEN@ (with the token you just generated), @ARVADOS_API_HOST@, and, if needed, @ARVADOS_API_HOST_INSECURE@.  The core keepproxy command to run is:
 
 <notextile>
 <pre><code>ARVADOS_API_TOKEN=<span class="userinput">{{railsout}}</span> ARVADOS_API_HOST=<span class="userinput">uuid_prefix.your.domain</span> exec keepproxy
@@ -77,7 +79,7 @@ upstream keepproxy {
 
 server {
   listen                <span class="userinput">[your public IP address]</span>:443 ssl;
-  server_name           keep.<span class="userinput">uuid_prefix</span>.your.domain
+  server_name           keep.<span class="userinput">uuid_prefix</span>.your.domain;
 
   proxy_connect_timeout 90s;
   proxy_read_timeout    300s;
index 2e7382bf1ebc642c34619a66ad6472fda18cc1fe..13dfaf6725d40e089759ec62058fcd63ff83c33b 100644 (file)
@@ -94,7 +94,9 @@ Equivalently:
 
 h3. Run keepstore as a supervised service
 
-We recommend running Keepstore under "runit":http://smarden.org/runit/ or something similar, using a run script like the following:
+Install runit to supervise the keepstore daemon.  {% include 'install_runit' %}
+
+Install this script as the run script for the keepstore service, modifying it as directed below.
 
 <notextile>
 <pre><code>#!/bin/sh
index ec4c9d46cc8ee7c98a097baa3ddad49f55bd760c..dd5995ffdde442c85f665cb5feae14fd9b0fe879 100644 (file)
@@ -12,7 +12,21 @@ Please follow the "API token guide":../user/reference/api-tokens.html to get API
 
 h2. Install the Ruby SDK and utilities
 
-If you're using RVM:
+First, install the curl development libraries necessary to build the Arvados Ruby SDK.  On Debian-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo apt-get install libcurl4-openssl-dev</span>
+</code></pre>
+</notextile>
+
+On Red Hat-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo yum install libcurl-devel</span>
+</code></pre>
+</notextile>
+
+Next, install the arvados-cli Ruby gem.  If you're using RVM:
 
 <notextile>
 <pre><code>~$ <span class="userinput">sudo /usr/local/rvm/bin/rvm-exec default gem install arvados-cli</span>
index a3064038e85134f77697c8fb1c0cd43a7f370f64..aaa6211b461ca3296a6cab157c9ee49bf4488c65 100644 (file)
@@ -94,18 +94,9 @@ On a Red Hat-based system, we also need to initialize the database system:
 </code></pre>
 </notextile>
 
-{% include 'notebox_begin' %}
-
-If you are installing on CentOS6, you will need to modify PostgreSQL's configuration to allow password authentication for local users. The default configuration allows 'ident' only. The following commands will make the configuration change, and restart PostgreSQL for it to take effect.
-<br/>
-<notextile>
-<pre><code>~$ <span class="userinput">sudo sed -i -e "s/127.0.0.1\/32          ident/127.0.0.1\/32          md5/" /var/lib/pgsql/data/pg_hba.conf</span>
-~$ <span class="userinput">sudo sed -i -e "s/::1\/128               ident/::1\/128               md5/" /var/lib/pgsql/data/pg_hba.conf</span>
-~$ <span class="userinput">sudo service postgresql restart</span>
-</code></pre>
-</notextile>
-{% include 'notebox_end' %}
-
+{% assign pg_service = "postgresql" %}
+{% assign pg_hba_path = "/var/lib/pgsql/data/pg_hba.conf" %}
+{% include 'install_redhat_postgres_auth' %}
 
 Next, generate a new database password. Nobody ever needs to memorize it or type it, so make a strong one:
 
index 1fd525d404c348fda648d57898c8c4877fd0bbb1..5a60ca5484e49fe502617accf847988d9d815d81 100644 (file)
@@ -32,20 +32,9 @@ On a Red Hat-based system, install the following packages:
 </code></pre>
 </notextile>
 
-h2. Set up configuration files
-
-The Workbench server package uses configuration files that you write to @/etc/arvados/workbench@ and ensures they're consistently deployed.  Create this directory and copy the example configuration files to it:
-
-<notextile>
-<pre><code>~$ <span class="userinput">sudo mkdir -p /etc/arvados/workbench</span>
-~$ <span class="userinput">sudo chmod 700 /etc/arvados/workbench</span>
-~$ <span class="userinput">sudo cp /var/www/arvados-workbench/current/config/application.yml.example /etc/arvados/workbench/application.yml</span>
-</code></pre>
-</notextile>
-
 h2(#configure). Configure Workbench
 
-Edit @/etc/arvados/workbench/application.yml@ following the instructions below.  The deployment script will consistently deploy this to Workbench's configuration directory.  Workbench reads both @application.yml@ and its own @config/application.defaults.yml@ file.  Values in @application.yml@ take precedence over the defaults that are defined in @config/application.defaults.yml@.  The @config/application.yml.example@ file is not read by Workbench and is provided for installation convenience only.
+Edit @/etc/arvados/workbench/application.yml@ following the instructions below.  Workbench reads both @application.yml@ and its own @config/application.defaults.yml@ file.  Values in @application.yml@ take precedence over the defaults that are defined in @config/application.defaults.yml@.  The @config/application.yml.example@ file is not read by Workbench and is provided for installation convenience only.
 
 Consult @config/application.default.yml@ for a full list of configuration options.  Always put your local configuration in @/etc/arvados/workbench/application.yml@&mdash;never edit @config/application.default.yml@.
 
@@ -98,7 +87,7 @@ For best performance, we recommend you use Nginx as your Web server front-end, w
 <li>If you're deploying on an older Red Hat-based distribution and installed Pythyon 2.7 from Software Collections, configure Nginx to use it:
 
 <pre><code>~$ <span class="userinput">sudo usermod --shell /bin/bash nginx</span>
-~$ <span class="userinput">sudo -u nginx sh -c 'echo "[[ -z \$PS1 && -e /opt/rh/python27/enable ]] && source /opt/rh/python27/enable" >>~/.bash_profile'</span>
+~$ <span class="userinput">sudo -u nginx sh -c 'echo "[[ -z \$PS1 ]] && source scl_source enable python27" >>~/.bash_profile'</span>
 </code></pre>
 
 </li>
index 7c50c282dace9859c855c438f56b8e981b0feb43..4e5b0826b6bceabf326b07b65660a5ec4c93f946 100755 (executable)
@@ -415,11 +415,13 @@ if (!defined $no_clear_tmp) {
 # If this job requires a Docker image, install that.
 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
 if ($docker_locator = $Job->{docker_image_locator}) {
+  Log (undef, "Install docker image $docker_locator");
   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
   if (!$docker_hash)
   {
     croak("No Docker image hash found from locator $docker_locator");
   }
+  Log (undef, "docker image hash is $docker_hash");
   $docker_stream =~ s/^\.//;
   my $docker_install_script = qq{
 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
@@ -1057,12 +1059,14 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       check_refresh_wanted();
       check_squeue();
       update_progress_stats();
-      select (undef, undef, undef, 0.1);
     }
     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
     {
       update_progress_stats();
     }
+    if (!$gotsome) {
+      select (undef, undef, undef, 0.1);
+    }
     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
                                         $_->{node}->{hold_count} < 4 } @slot);
     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
@@ -1434,15 +1438,21 @@ sub readfrompipes
   foreach my $job (keys %reader)
   {
     my $buf;
-    while (0 < sysread ($reader{$job}, $buf, 8192))
+    if (0 < sysread ($reader{$job}, $buf, 65536))
     {
       print STDERR $buf if $ENV{CRUNCH_DEBUG};
       $jobstep[$job]->{stderr_at} = time;
       $jobstep[$job]->{stderr} .= $buf;
+
+      # Consume everything up to the last \n
       preprocess_stderr ($job);
+
       if (length ($jobstep[$job]->{stderr}) > 16384)
       {
-       substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
+        # If we get a lot of stderr without a newline, chop off the
+        # front to avoid letting our buffer grow indefinitely.
+        substr ($jobstep[$job]->{stderr},
+                0, length($jobstep[$job]->{stderr}) - 8192) = "";
       }
       $gotsome = 1;
     }
@@ -1463,7 +1473,7 @@ sub preprocess_stderr
       # whoa.
       $main::please_freeze = 1;
     }
-    elsif ($line =~ /srun: error: Node failure on/) {
+    elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
       my $job_slot_index = $jobstep[$job]->{slotindex};
       $slot[$job_slot_index]->{node}->{fail_count}++;
       $jobstep[$job]->{tempfail} = 1;
index 3132da3a0a2e271350c1a5418efa577703555231..94b8a9d06cfaec73b718b8514adcd3ba08ab2991 100644 (file)
@@ -13,6 +13,7 @@ from ws4py.client.threadedclient import WebSocketClient
 
 _logger = logging.getLogger('arvados.events')
 
+
 class EventClient(WebSocketClient):
     def __init__(self, url, filters, on_event, last_log_id):
         ssl_options = {'ca_certs': arvados.util.ca_certs_path()}
@@ -29,23 +30,33 @@ class EventClient(WebSocketClient):
         self.filters = filters
         self.on_event = on_event
         self.last_log_id = last_log_id
-        self._closed_lock = threading.RLock()
-        self._closed = False
+        self._closing_lock = threading.RLock()
+        self._closing = False
+        self._closed = threading.Event()
 
     def opened(self):
         self.subscribe(self.filters, self.last_log_id)
 
+    def closed(self, code, reason=None):
+        self._closed.set()
+
     def received_message(self, m):
-        with self._closed_lock:
-            if not self._closed:
+        with self._closing_lock:
+            if not self._closing:
                 self.on_event(json.loads(str(m)))
 
-    def close(self, code=1000, reason=''):
-        """Close event client and wait for it to finish."""
+    def close(self, code=1000, reason='', timeout=0):
+        """Close event client and optionally wait for it to finish.
+
+        :timeout: is the number of seconds to wait for ws4py to
+        indicate that the connection has closed.
+        """
         super(EventClient, self).close(code, reason)
-        with self._closed_lock:
+        with self._closing_lock:
             # make sure we don't process any more messages.
-            self._closed = True
+            self._closing = True
+        # wait for ws4py to tell us the connection is closed.
+        self._closed.wait(timeout=timeout)
 
     def subscribe(self, filters, last_log_id=None):
         m = {"method": "subscribe", "filters": filters}
@@ -56,6 +67,7 @@ class EventClient(WebSocketClient):
     def unsubscribe(self, filters):
         self.send(json.dumps({"method": "unsubscribe", "filters": filters}))
 
+
 class PollClient(threading.Thread):
     def __init__(self, api, filters, on_event, poll_time, last_log_id):
         super(PollClient, self).__init__()
@@ -67,8 +79,9 @@ class PollClient(threading.Thread):
         self.on_event = on_event
         self.poll_time = poll_time
         self.daemon = True
-        self.stop = threading.Event()
         self.last_log_id = last_log_id
+        self._closing = threading.Event()
+        self._closing_lock = threading.RLock()
 
     def run(self):
         self.id = 0
@@ -83,7 +96,7 @@ class PollClient(threading.Thread):
 
         self.on_event({'status': 200})
 
-        while not self.stop.isSet():
+        while not self._closing.is_set():
             max_id = self.id
             moreitems = False
             for f in self.filters:
@@ -91,24 +104,38 @@ class PollClient(threading.Thread):
                 for i in items["items"]:
                     if i['id'] > max_id:
                         max_id = i['id']
-                    self.on_event(i)
+                    with self._closing_lock:
+                        if self._closing.is_set():
+                            return
+                        self.on_event(i)
                 if items["items_available"] > len(items["items"]):
                     moreitems = True
             self.id = max_id
             if not moreitems:
-                self.stop.wait(self.poll_time)
+                self._closing.wait(self.poll_time)
 
     def run_forever(self):
         # Have to poll here, otherwise KeyboardInterrupt will never get processed.
-        while not self.stop.is_set():
-            self.stop.wait(1)
+        while not self._closing.is_set():
+            self._closing.wait(1)
+
+    def close(self, code=None, reason=None, timeout=0):
+        """Close poll client and optionally wait for it to finish.
+
+        If an :on_event: handler is running in a different thread,
+        first wait (indefinitely) for it to return.
+
+        After closing, wait up to :timeout: seconds for the thread to
+        finish the poll request in progress (if any).
 
-    def close(self):
-        """Close poll client and wait for it to finish."""
+        :code: and :reason: are ignored. They are present for
+        interface compatibility with EventClient.
+        """
 
-        self.stop.set()
+        with self._closing_lock:
+            self._closing.set()
         try:
-            self.join()
+            self.join(timeout=timeout)
         except RuntimeError:
             # "join() raises a RuntimeError if an attempt is made to join the
             # current thread as that would cause a deadlock. It is also an
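A minimal usage sketch of the close(timeout=...) behavior added above, using the polling client; the empty filter list and 15-second poll interval are illustrative, and EventClient accepts the same close() arguments:

    import arvados
    from arvados.events import PollClient

    def on_event(ev):
        print(ev)

    api = arvados.api('v1')
    client = PollClient(api, [[]], on_event, 15, None)  # filters, poll_time, last_log_id
    client.start()                # PollClient is a threading.Thread subclass
    try:
        client.run_forever()      # keep the main thread alive so Ctrl-C is delivered
    finally:
        # close() sets the internal closing flag, then waits up to `timeout`
        # seconds for the poll request in progress instead of joining forever.
        client.close(timeout=5)
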
index 3b4330935c568a035178b594857f1e4c8916b37c..48998aad36d10f7630f24e574a4d278968dc7032 100644 (file)
@@ -74,7 +74,7 @@ gem 'faye-websocket'
 gem 'themes_for_rails'
 
 gem 'arvados', '>= 0.1.20150615153458'
-gem 'arvados-cli', '>= 0.1.20151023185755'
+gem 'arvados-cli', '>= 0.1.20151207150126'
 
 # pg_power lets us use partial indexes in schema.rb in Rails 3
 gem 'pg_power'
index b505b194b2ac35b220c970c9f9e6c45ebeedd9f4..ac6be5a522303fd7418ba69974165e3b2821e17f 100644 (file)
@@ -41,7 +41,7 @@ GEM
       google-api-client (~> 0.6.3, >= 0.6.3)
       json (~> 1.7, >= 1.7.7)
       jwt (>= 0.1.5, < 1.0.0)
-    arvados-cli (0.1.20151023190001)
+    arvados-cli (0.1.20151207150126)
       activesupport (~> 3.2, >= 3.2.13)
       andand (~> 1.3, >= 1.3.3)
       arvados (~> 0.1, >= 0.1.20150128223554)
@@ -228,7 +228,7 @@ DEPENDENCIES
   acts_as_api
   andand
   arvados (>= 0.1.20150615153458)
-  arvados-cli (>= 0.1.20151023185755)
+  arvados-cli (>= 0.1.20151207150126)
   coffee-rails (~> 3.2.0)
   database_cleaner
   factory_girl_rails
index 4ab5695a2f1c607f22bec9bd1948d575c73fd29e..5e2404e62c4db63360ddf91f5a6f1c801c763ce3 100644 (file)
@@ -23,7 +23,7 @@ class Arvados::V1::NodesController < ApplicationController
         return render_not_found
       end
       ping_data = {
-        ip: params[:local_ipv4] || request.env['REMOTE_ADDR'],
+        ip: params[:local_ipv4] || request.remote_ip,
         ec2_instance_id: params[:instance_id]
       }
       [:ping_secret, :total_cpu_cores, :total_ram_mb, :total_scratch_mb]
index 35dd1a94c9d983b343fc6394370f03ca795ca896..6cd40a44585c6805278dd9d421c8495d5d66c1c7 100644 (file)
@@ -115,6 +115,10 @@ class ArvadosModel < ActiveRecord::Base
     ["#{table_name}.modified_at desc", "#{table_name}.uuid"]
   end
 
+  def self.unique_columns
+    ["id", "uuid"]
+  end
+
   # If current user can manage the object, return an array of uuids of
   # users and groups that have permission to write the object. The
   # first two elements are always [self.owner_uuid, current user's
index aa3d819b56d53bb1e50ff9dfa13bb6bb9bff2fb3..38a9cde32765d7fa24b7d03bd29d579a377ded93 100644 (file)
@@ -5,6 +5,6 @@ case "$TARGET" in
         fpm_depends+=(libcurl-devel postgresql-devel)
         ;;
     debian* | ubuntu*)
-        fpm_depends+=(libcurl4-openssl-dev libpq-dev)
+        fpm_depends+=(libcurl-ssl-dev libpq-dev)
         ;;
 esac
index 05f85c7bb67f8c7863ed46fd793ad6a96fa2cf77..b59279e554643c5015fb371ec3774e34bc28f99f 100644 (file)
@@ -637,9 +637,8 @@ class CrunchDispatch
 
     jobrecord = Job.find_by_uuid(job_done.uuid)
 
-    if exit_status == EXIT_RETRY_UNLOCKED
-      # The job failed because all of the nodes allocated to it
-      # failed.  Only this crunch-dispatch process can retry the job:
+    if exit_status == EXIT_RETRY_UNLOCKED or (exit_tempfail and @job_retry_counts.include? jobrecord.uuid)
+      # Only this crunch-dispatch process can retry the job:
       # it's already locked, and there's no way to put it back in the
       # Queued state.  Put it in our internal todo list unless the job
       # has failed this way excessively.
index 718aaeaf690be5bc5e61592849ad4009e3b69f52..d7b9bb7513899d477906738d09a5a23bc8e6095f 100644 (file)
@@ -111,7 +111,31 @@ module LoadParam
     # (e.g., [] or ['owner_uuid desc']), fall back on the default
     # orders to ensure repeating the same request (possibly with
     # different limit/offset) will return records in the same order.
-    @orders += model_class.default_orders
+    #
+    # Clean up the resulting list of orders such that no column
+    # uselessly appears twice (Postgres might not optimize this out
+    # for us) and no columns uselessly appear after a unique column
+    # (Postgres does not optimize this out for us; as of 9.2, "order
+    # by id, modified_at desc, uuid" is slow but "order by id" is
+    # fast).
+    orders_given_and_default = @orders + model_class.default_orders
+    order_cols_used = {}
+    @orders = []
+    orders_given_and_default.each do |order|
+      otablecol = order.split(' ')[0]
+
+      next if order_cols_used[otablecol]
+      order_cols_used[otablecol] = true
+
+      @orders << order
+
+      otable, ocol = otablecol.split('.')
+      if otable == table_name and model_class.unique_columns.include?(ocol)
+        # we already have a full ordering; subsequent entries would be
+        # superfluous
+        break
+      end
+    end
 
     case params[:select]
     when Array
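A standalone Python sketch of the order-cleaning logic above (the real implementation is the Ruby in load_param.rb); the table name, default orders, and unique columns are stand-ins matching the logs example used in the tests below:

    def clean_orders(requested, table_name='logs',
                     default_orders=('logs.modified_at desc', 'logs.uuid'),
                     unique_columns=('id', 'uuid')):
        seen = set()
        cleaned = []
        for order in list(requested) + list(default_orders):
            tablecol = order.split(' ')[0]
            if tablecol in seen:
                continue              # this column already appears earlier in the ordering
            seen.add(tablecol)
            cleaned.append(order)
            table, _, col = tablecol.partition('.')
            if table == table_name and col in unique_columns:
                break                 # a unique column fully determines the order; stop here
        return cleaned

    # clean_orders(['logs.event_type asc', 'logs.id asc', 'logs.uuid asc', 'logs.modified_at desc'])
    # => ['logs.event_type asc', 'logs.id asc']
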
index 23d2e1955eeee141e263a68f7aa101549e3037b1..489bb1d6605f86d622c824260d96ef89f63bd026 100644 (file)
@@ -80,3 +80,14 @@ new_with_custom_hostname:
   job_uuid: ~
   info:
     ping_secret: "abcdyi0x4lb5q4gzqqtrnq30oyj08r8dtdimmanbqw49z1anz2"
+
+node_with_no_ip_address_yet:
+  uuid: zzzzz-7ekkf-nodenoipaddryet
+  owner_uuid: zzzzz-tpzed-000000000000000
+  hostname: noipaddr
+  slot_number: ~
+  last_ping_at: ~
+  first_ping_at: ~
+  job_uuid: ~
+  info:
+    ping_secret: "abcdyefg4lb5q4gzqqtrnq30oyj08r8dtdimmanbqw49z1anz2"
index d2f56699ed0c0a858b3d296bd1a799dd605fa4a0..428c663a77b92ab2fc679ef20bbcf6b8fcc0fd69 100644 (file)
@@ -182,4 +182,42 @@ class Arvados::V1::NodesControllerTest < ActionController::TestCase
     }
     assert_response 422
   end
+
+  test "first ping should set ip addr using local_ipv4 when provided" do
+    post :ping, {
+      id: 'zzzzz-7ekkf-nodenoipaddryet',
+      instance_id: 'i-0000000',
+      local_ipv4: '172.17.2.172',
+      ping_secret: 'abcdyefg4lb5q4gzqqtrnq30oyj08r8dtdimmanbqw49z1anz2'
+    }
+    assert_response :success
+    response = JSON.parse(@response.body)
+    assert_equal 'zzzzz-7ekkf-nodenoipaddryet', response['uuid']
+    assert_equal '172.17.2.172', response['ip_address']
+  end
+
+  test "first ping should set ip addr using remote_ip when local_ipv4 is not provided" do
+    post :ping, {
+      id: 'zzzzz-7ekkf-nodenoipaddryet',
+      instance_id: 'i-0000000',
+      ping_secret: 'abcdyefg4lb5q4gzqqtrnq30oyj08r8dtdimmanbqw49z1anz2'
+    }
+    assert_response :success
+    response = JSON.parse(@response.body)
+    assert_equal 'zzzzz-7ekkf-nodenoipaddryet', response['uuid']
+    assert_equal request.remote_ip, response['ip_address']
+  end
+
+  test "future pings should not change previous ip address" do
+    post :ping, {
+      id: 'zzzzz-7ekkf-2z3mc76g2q73aio',
+      instance_id: 'i-0000000',
+      local_ipv4: '172.17.2.175',
+      ping_secret: '69udawxvn3zzj45hs8bumvndricrha4lcpi23pd69e44soanc0'
+    }
+    assert_response :success
+    response = JSON.parse(@response.body)
+    assert_equal 'zzzzz-7ekkf-2z3mc76g2q73aio', response['uuid']
+    assert_equal '172.17.2.174', response['ip_address']   # original ip address is not overwritten
+  end
 end
diff --git a/services/api/test/functional/arvados/v1/query_test.rb b/services/api/test/functional/arvados/v1/query_test.rb
new file mode 100644 (file)
index 0000000..91fe077
--- /dev/null
@@ -0,0 +1,68 @@
+require 'test_helper'
+
+class Arvados::V1::QueryTest < ActionController::TestCase
+  test 'no fallback orders when order is unambiguous' do
+    @controller = Arvados::V1::LogsController.new
+    authorize_with :active
+    get :index, {
+      order: ['id asc'],
+      controller: 'logs',
+    }
+    assert_response :success
+    assert_equal ['logs.id asc'], assigns(:objects).order_values
+  end
+
+  test 'fallback orders when order is ambiguous' do
+    @controller = Arvados::V1::LogsController.new
+    authorize_with :active
+    get :index, {
+      order: ['event_type asc'],
+      controller: 'logs',
+    }
+    assert_response :success
+    assert_equal('logs.event_type asc, logs.modified_at desc, logs.uuid',
+                 assigns(:objects).order_values.join(', '))
+  end
+
+  test 'skip fallback orders already given by client' do
+    @controller = Arvados::V1::LogsController.new
+    authorize_with :active
+    get :index, {
+      order: ['modified_at asc'],
+      controller: 'logs',
+    }
+    assert_response :success
+    assert_equal('logs.modified_at asc, logs.uuid',
+                 assigns(:objects).order_values.join(', '))
+  end
+
+  test 'eliminate superfluous orders' do
+    @controller = Arvados::V1::LogsController.new
+    authorize_with :active
+    get :index, {
+      order: ['logs.modified_at asc',
+              'modified_at desc',
+              'event_type desc',
+              'logs.event_type asc'],
+      controller: 'logs',
+    }
+    assert_response :success
+    assert_equal('logs.modified_at asc, logs.event_type desc, logs.uuid',
+                 assigns(:objects).order_values.join(', '))
+  end
+
+  test 'eliminate orders after the first unique column' do
+    @controller = Arvados::V1::LogsController.new
+    authorize_with :active
+    get :index, {
+      order: ['event_type asc',
+              'id asc',
+              'uuid asc',
+              'modified_at desc'],
+      controller: 'logs',
+    }
+    assert_response :success
+    assert_equal('logs.event_type asc, logs.id asc',
+                 assigns(:objects).order_values.join(', '))
+  end
+end
index e14912423db73483ef2623149e23d3ca63b3dabb..6bce3258d9857808f24e677ce5374f0f2de61a23 100644 (file)
@@ -142,7 +142,7 @@ func GetContainerNetStats(cgroup Cgroup) (io.Reader, error) {
                statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
                stats, err := ioutil.ReadFile(statsFilename)
                if err != nil {
-                       statLog.Printf("read %s: %s\n", statsFilename, err)
+                       statLog.Printf("error reading %s: %s\n", statsFilename, err)
                        continue
                }
                return strings.NewReader(string(stats)), nil
@@ -409,7 +409,7 @@ func run(logger *log.Logger) error {
                        if cmd.Process != nil {
                                cmd.Process.Signal(catch)
                        }
-                       statLog.Println("caught signal:", catch)
+                       statLog.Println("notice: caught signal:", catch)
                }(sigChan)
                signal.Notify(sigChan, syscall.SIGTERM)
                signal.Notify(sigChan, syscall.SIGINT)
index 71623a5f3d07364bb4de94c42023af3b5658fc55..c4b0df3a4e51e5ba9236b1a04957d019cce5d88c 100644 (file)
@@ -82,6 +82,10 @@ class ArgumentParser(argparse.ArgumentParser):
 
         self.add_argument('--crunchstat-interval', type=float, help="Write stats to stderr every N seconds (default disabled)", default=0)
 
+        self.add_argument('--unmount-timeout',
+                          type=float, default=2.0,
+                          help="Time to wait for graceful shutdown after --exec program exits and filesystem is unmounted")
+
         self.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
                             dest="exec_args", metavar=('command', 'args', '...', '--'),
                             help="""Mount, run a command, then unmount and exit""")
@@ -91,6 +95,7 @@ class Mount(object):
     def __init__(self, args, logger=logging.getLogger('arvados.arv-mount')):
         self.logger = logger
         self.args = args
+        self.listen_for_events = False
 
         self.args.mountpoint = os.path.realpath(self.args.mountpoint)
         if self.args.logfile:
@@ -106,15 +111,21 @@ class Mount(object):
 
     def __enter__(self):
         llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
-        if self.args.mode != 'by_pdh':
+        if self.listen_for_events:
             self.operations.listen_for_events()
-        t = threading.Thread(None, lambda: llfuse.main())
-        t.start()
+        self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
+        self.llfuse_thread.daemon = True
+        self.llfuse_thread.start()
         self.operations.initlock.wait()
 
     def __exit__(self, exc_type, exc_value, traceback):
         subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
-        self.operations.destroy()
+        self.llfuse_thread.join(timeout=self.args.unmount_timeout)
+        if self.llfuse_thread.is_alive():
+            self.logger.warning("Mount.__exit__:"
+                                " llfuse thread still alive %fs after umount"
+                                " -- abandoning and exiting anyway",
+                                self.args.unmount_timeout)
 
     def run(self):
         if self.args.exec_args:
@@ -230,7 +241,9 @@ class Mount(object):
             mount_readme = True
 
         if dir_class is not None:
-            self.operations.inodes.add_entry(dir_class(*dir_args))
+            ent = dir_class(*dir_args)
+            self.operations.inodes.add_entry(ent)
+            self.listen_for_events = ent.want_event_subscribe()
             return
 
         e = self.operations.inodes.add_entry(Directory(
@@ -260,6 +273,7 @@ class Mount(object):
         if name in ['', '.', '..'] or '/' in name:
             sys.exit("Mount point '{}' is not supported.".format(name))
         tld._entries[name] = self.operations.inodes.add_entry(ent)
+        self.listen_for_events = (self.listen_for_events or ent.want_event_subscribe())
 
     def _readme_text(self, api_host, user_email):
         return '''
@@ -277,63 +291,57 @@ From here, the following directories are available:
 '''.format(api_host, user_email)
 
     def _run_exec(self):
-        # Initialize the fuse connection
-        llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
-
-        # Subscribe to change events from API server
-        if self.args.mode != 'by_pdh':
-            self.operations.listen_for_events()
-
-        t = threading.Thread(None, lambda: llfuse.main())
-        t.start()
-
-        # wait until the driver is finished initializing
-        self.operations.initlock.wait()
-
         rc = 255
-        try:
-            sp = subprocess.Popen(self.args.exec_args, shell=False)
-
-            # forward signals to the process.
-            signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
-            signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
-            signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
-
-            # wait for process to complete.
-            rc = sp.wait()
-
-            # restore default signal handlers.
-            signal.signal(signal.SIGINT, signal.SIG_DFL)
-            signal.signal(signal.SIGTERM, signal.SIG_DFL)
-            signal.signal(signal.SIGQUIT, signal.SIG_DFL)
-        except Exception as e:
-            self.logger.exception(
-                'arv-mount: exception during exec %s', self.args.exec_args)
+        with self:
             try:
-                rc = e.errno
-            except AttributeError:
-                pass
-        finally:
-            subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
-            self.operations.destroy()
+                sp = subprocess.Popen(self.args.exec_args, shell=False)
+
+                # forward signals to the process.
+                signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
+                signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
+                signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
+
+                # wait for process to complete.
+                rc = sp.wait()
+
+                # restore default signal handlers.
+                signal.signal(signal.SIGINT, signal.SIG_DFL)
+                signal.signal(signal.SIGTERM, signal.SIG_DFL)
+                signal.signal(signal.SIGQUIT, signal.SIG_DFL)
+            except Exception as e:
+                self.logger.exception(
+                    'arv-mount: exception during exec %s', self.args.exec_args)
+                try:
+                    rc = e.errno
+                except AttributeError:
+                    pass
         exit(rc)
 
     def _run_standalone(self):
         try:
             llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
 
-            if not (self.args.exec_args or self.args.foreground):
-                self.daemon_ctx = daemon.DaemonContext(working_directory=os.path.dirname(self.args.mountpoint),
-                                                       files_preserve=range(3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
+            if not self.args.foreground:
+                self.daemon_ctx = daemon.DaemonContext(
+                    working_directory=os.path.dirname(self.args.mountpoint),
+                    files_preserve=range(
+                        3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
                 self.daemon_ctx.open()
 
             # Subscribe to change events from API server
-            self.operations.listen_for_events()
+            if self.listen_for_events:
+                self.operations.listen_for_events()
 
-            llfuse.main()
+            self._llfuse_main()
         except Exception as e:
             self.logger.exception('arv-mount: exception during mount: %s', e)
             exit(getattr(e, 'errno', 1))
-        finally:
-            self.operations.destroy()
         exit(0)
+
+    def _llfuse_main(self):
+        try:
+            llfuse.main()
+        except:
+            llfuse.close(unmount=False)
+            raise
+        llfuse.close()
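A minimal sketch of driving the refactored Mount as a context manager (the same pattern the integration tests below use); the mount point path is illustrative:

    from arvados_fuse.command import ArgumentParser, Mount

    args = ArgumentParser().parse_args(
        ['--foreground', '--unmount-timeout=2', '/tmp/keep'])
    with Mount(args):
        # The filesystem is mounted here; on exit, __exit__ lazily unmounts
        # and waits up to --unmount-timeout seconds for the llfuse thread.
        pass
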
index 4c4dbc8ea1c7ec1db585673015d236586868154b..196bb221e901e132d10db4f2bdbd7ed060f794e3 100644 (file)
@@ -184,6 +184,9 @@ class Directory(FreshBase):
     def flush(self):
         pass
 
+    def want_event_subscribe(self):
+        raise NotImplementedError()
+
     def create(self, name):
         raise NotImplementedError()
 
@@ -351,6 +354,9 @@ class CollectionDirectory(CollectionDirectoryBase):
     def writable(self):
         return self.collection.writable() if self.collection is not None else self._writable
 
+    def want_event_subscribe(self):
+        return (uuid_pattern.match(self.collection_locator) is not None)
+
     # Used by arv-web.py to switch the contents of the CollectionDirectory
     def change_collection(self, new_locator):
         """Switch the contents of the CollectionDirectory.
@@ -544,6 +550,9 @@ class TmpCollectionDirectory(CollectionDirectoryBase):
     def writable(self):
         return True
 
+    def want_event_subscribe(self):
+        return False
+
     def finalize(self):
         self.collection.stop_threads()
 
@@ -629,6 +638,9 @@ will appear if it exists.
     def clear(self, force=False):
         pass
 
+    def want_event_subscribe(self):
+        return not self.pdh_only
+
 
 class RecursiveInvalidateDirectory(Directory):
     def invalidate(self):
@@ -650,6 +662,9 @@ class TagsDirectory(RecursiveInvalidateDirectory):
         self._poll = True
         self._poll_time = poll_time
 
+    def want_event_subscribe(self):
+        return True
+
     @use_counter
     def update(self):
         with llfuse.lock_released:
@@ -678,6 +693,9 @@ class TagDirectory(Directory):
         self._poll = poll
         self._poll_time = poll_time
 
+    def want_event_subscribe(self):
+        return True
+
     @use_counter
     def update(self):
         with llfuse.lock_released:
@@ -709,6 +727,9 @@ class ProjectDirectory(Directory):
         self._updating_lock = threading.Lock()
         self._current_user = None
 
+    def want_event_subscribe(self):
+        return True
+
     def createDirectory(self, i):
         if collection_uuid_pattern.match(i['uuid']):
             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
@@ -929,3 +950,6 @@ class SharedDirectory(Directory):
                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
         except Exception:
             _logger.exception()
+
+    def want_event_subscribe(self):
+        return True
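
Each Directory subclass above now answers want_event_subscribe(): directories backed by a mutable uuid return True, while content-addressed views (a collection mounted by portable data hash, tmp collections, a pdh-only magic directory) return False, since their contents can never change behind the mount. A minimal sketch of how the mount side presumably folds these answers into the single listen_for_events flag exercised in the command tests below; the helper name is illustrative, not from this commit:

    def needs_event_subscription(directories):
        # Subscribe to API-server events only if at least one mounted
        # directory could actually act on them (i.e. it is backed by a
        # mutable uuid rather than a content-addressed hash).
        return any(d.want_event_subscribe() for d in directories)
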
diff --git a/services/fuse/fpm-info.sh b/services/fuse/fpm-info.sh
new file mode 100644 (file)
index 0000000..57671cd
--- /dev/null
@@ -0,0 +1 @@
+fpm_depends+=(fuse)
index 1cedd6665001b86a5baf4e1d5cba2c2a32e9df6f..fca1edf6bc25603beac62e6b88041cf481da43ee 100644 (file)
@@ -33,7 +33,7 @@ setup(name='arvados_fuse',
       ],
       install_requires=[
         'arvados-python-client >= 0.1.20151118035730',
-        'llfuse>=0.40',
+        'llfuse==0.41.1',
         'python-daemon',
         'ciso8601'
         ],
index faa4a55065945d907109b8994470ccccb2c5904d..5a45bfc103f34df2ae928ed5bcd305d100c408fa 100644 (file)
@@ -62,7 +62,9 @@ class IntegrationTest(unittest.TestCase):
             def wrapper(self, *args, **kwargs):
                 with arvados_fuse.command.Mount(
                         arvados_fuse.command.ArgumentParser().parse_args(
-                            argv + ['--foreground', self.mnt])):
+                            argv + ['--foreground',
+                                    '--unmount-timeout=0.1',
+                                    self.mnt])):
                     return func(self, *args, **kwargs)
             return wrapper
         return decorator
index 44ec1996d2121698b478e2ba25a6ae95d0bef4c7..c79daf80f54156b6e304839c01a66221217ae3c9 100644 (file)
@@ -37,6 +37,16 @@ class MountTestBase(unittest.TestCase):
         run_test_server.authorize_with("admin")
         self.api = api if api else arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
 
+    # This is a copy of Mount's method.  TODO: Refactor MountTestBase
+    # to use a Mount instead of copying its code.
+    def _llfuse_main(self):
+        try:
+            llfuse.main()
+        except:
+            llfuse.close(unmount=False)
+            raise
+        llfuse.close()
+
     def make_mount(self, root_class, **root_kwargs):
         self.operations = fuse.Operations(
             os.getuid(), os.getgid(),
@@ -45,7 +55,9 @@ class MountTestBase(unittest.TestCase):
         self.operations.inodes.add_entry(root_class(
             llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, **root_kwargs))
         llfuse.init(self.operations, self.mounttmp, [])
-        threading.Thread(None, llfuse.main).start()
+        self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
+        self.llfuse_thread.daemon = True
+        self.llfuse_thread.start()
         # wait until the driver is finished initializing
         self.operations.initlock.wait()
         return self.operations.inodes[llfuse.ROOT_INODE]
@@ -55,17 +67,12 @@ class MountTestBase(unittest.TestCase):
         self.pool.join()
         del self.pool
 
-        # llfuse.close is buggy, so use fusermount instead.
-        #llfuse.close(unmount=True)
-
-        count = 0
-        success = 1
-        while (count < 9 and success != 0):
-          success = subprocess.call(["fusermount", "-u", self.mounttmp])
-          time.sleep(0.1)
-          count += 1
-
-        self.operations.destroy()
+        subprocess.call(["fusermount", "-u", "-z", self.mounttmp])
+        self.llfuse_thread.join(timeout=1)
+        if self.llfuse_thread.is_alive():
+            logger.warning("MountTestBase.tearDown():"
+                           " llfuse thread still alive 1s after umount"
+                           " -- abandoning and exiting anyway")
 
         os.rmdir(self.mounttmp)
         if self.keeptmp:
index bfefc674d7991dd80963a5d102a1bc757bc7bfb3..bb80d0a2fc94dc4c77c0f46f59414a8d00627235 100644 (file)
@@ -6,6 +6,7 @@ import functools
 import json
 import llfuse
 import logging
+import mock
 import os
 import run_test_server
 import sys
@@ -82,6 +83,7 @@ class MountArgsTest(unittest.TestCase):
         self.mnt = arvados_fuse.command.Mount(args)
         e = self.check_ent_type(arvados_fuse.MagicDirectory)
         self.assertEqual(e.pdh_only, False)
+        self.assertEqual(True, self.mnt.listen_for_events)
 
     @noexit
     def test_by_pdh(self):
@@ -92,6 +94,7 @@ class MountArgsTest(unittest.TestCase):
         self.mnt = arvados_fuse.command.Mount(args)
         e = self.check_ent_type(arvados_fuse.MagicDirectory)
         self.assertEqual(e.pdh_only, True)
+        self.assertEqual(False, self.mnt.listen_for_events)
 
     @noexit
     def test_by_tag(self):
@@ -101,6 +104,7 @@ class MountArgsTest(unittest.TestCase):
         self.assertEqual(args.mode, 'by_tag')
         self.mnt = arvados_fuse.command.Mount(args)
         e = self.check_ent_type(arvados_fuse.TagsDirectory)
+        self.assertEqual(True, self.mnt.listen_for_events)
 
     @noexit
     def test_collection(self, id_type='uuid'):
@@ -112,6 +116,7 @@ class MountArgsTest(unittest.TestCase):
         self.mnt = arvados_fuse.command.Mount(args)
         e = self.check_ent_type(arvados_fuse.CollectionDirectory)
         self.assertEqual(e.collection_locator, cid)
+        self.assertEqual(id_type == 'uuid', self.mnt.listen_for_events)
 
     def test_collection_pdh(self):
         self.test_collection('portable_data_hash')
@@ -126,6 +131,7 @@ class MountArgsTest(unittest.TestCase):
         e = self.check_ent_type(arvados_fuse.ProjectDirectory)
         self.assertEqual(e.project_object['uuid'],
                          run_test_server.fixture('users')['active']['uuid'])
+        self.assertEqual(True, self.mnt.listen_for_events)
 
     def test_mutually_exclusive_args(self):
         cid = run_test_server.fixture('collections')['public_text_file']['uuid']
@@ -162,9 +168,11 @@ class MountArgsTest(unittest.TestCase):
         e = self.check_ent_type(arvados_fuse.SharedDirectory)
         self.assertEqual(e.current_user['uuid'],
                          run_test_server.fixture('users')['active']['uuid'])
+        self.assertEqual(True, self.mnt.listen_for_events)
 
     @noexit
-    def test_custom(self):
+    @mock.patch('arvados.events.subscribe')
+    def test_custom(self, mock_subscribe):
         args = arvados_fuse.command.ArgumentParser().parse_args([
             '--mount-tmp', 'foo',
             '--mount-tmp', 'bar',
@@ -178,6 +186,24 @@ class MountArgsTest(unittest.TestCase):
         e = self.check_ent_type(arvados_fuse.ProjectDirectory, 'my_home')
         self.assertEqual(e.project_object['uuid'],
                          run_test_server.fixture('users')['active']['uuid'])
+        self.assertEqual(True, self.mnt.listen_for_events)
+        with self.mnt:
+            pass
+        self.assertEqual(1, mock_subscribe.call_count)
+
+    @noexit
+    @mock.patch('arvados.events.subscribe')
+    def test_custom_no_listen(self, mock_subscribe):
+        args = arvados_fuse.command.ArgumentParser().parse_args([
+            '--mount-by-pdh', 'pdh',
+            '--mount-tmp', 'foo',
+            '--mount-tmp', 'bar',
+            '--foreground', self.mntdir])
+        self.mnt = arvados_fuse.command.Mount(args)
+        self.assertEqual(False, self.mnt.listen_for_events)
+        with self.mnt:
+            pass
+        self.assertEqual(0, mock_subscribe.call_count)
 
     def test_custom_unsupported_layouts(self):
         for name in ['.', '..', '', 'foo/bar', '/foo']:
diff --git a/services/fuse/tests/test_exec.py b/services/fuse/tests/test_exec.py
new file mode 100644 (file)
index 0000000..66013a4
--- /dev/null
@@ -0,0 +1,60 @@
+import arvados_fuse.command
+import json
+import multiprocessing
+import os
+import run_test_server
+import tempfile
+import unittest
+
+try:
+    from shlex import quote
+except ImportError:
+    from pipes import quote
+
+def try_exec(mnt, cmd):
+    try:
+        arvados_fuse.command.Mount(
+            arvados_fuse.command.ArgumentParser().parse_args([
+                '--read-write',
+                '--mount-tmp=zzz',
+                '--unmount-timeout=0.1',
+                mnt,
+                '--exec'] + cmd)).run()
+    except SystemExit:
+        pass
+    else:
+        raise AssertionError('should have exited')
+
+
+class ExecMode(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        run_test_server.run()
+        run_test_server.run_keep(enforce_permissions=True, num_servers=2)
+        run_test_server.authorize_with('active')
+
+    @classmethod
+    def tearDownClass(cls):
+        run_test_server.stop_keep(num_servers=2)
+
+    def setUp(self):
+        self.mnt = tempfile.mkdtemp()
+        _, self.okfile = tempfile.mkstemp()
+        self.pool = multiprocessing.Pool(1)
+
+    def tearDown(self):
+        self.pool.terminate()
+        self.pool.join()
+        os.rmdir(self.mnt)
+        os.unlink(self.okfile)
+
+    def test_exec(self):
+        self.pool.apply(try_exec, (self.mnt, [
+            'sh', '-c',
+            'echo -n foo >{}; cp {} {}'.format(
+                quote(os.path.join(self.mnt, 'zzz', 'foo.txt')),
+                quote(os.path.join(self.mnt, 'zzz', '.arvados#collection')),
+                quote(os.path.join(self.okfile)))]))
+        self.assertRegexpMatches(
+            json.load(open(self.okfile))['manifest_text'],
+            r' 0:3:foo.txt\n')
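
The new test drives arv-mount's --exec mode end to end: the child command writes into the writable tmp mount and then copies the special ".arvados#collection" file, whose contents are the collection record as JSON. A hedged sketch of the same check done by hand; the mount point path is a placeholder:

    import json
    import os

    MNT = '/path/to/mountpoint'  # hypothetical arv-mount point started with --mount-tmp=zzz
    with open(os.path.join(MNT, 'zzz', '.arvados#collection')) as f:
        record = json.load(f)
    print(record['manifest_text'])  # the manifest for whatever was written into zzz/
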
index c0033d9186c3a6306ff4251aa3e84b37af6e5109..7dfb84d109957af05f101f01c4dbc94e074457d3 100644 (file)
@@ -43,6 +43,10 @@ type azureVolumeAdder struct {
 }
 
 func (s *azureVolumeAdder) Set(containerName string) error {
+       if trashLifetime != 0 {
+               return ErrNotImplemented
+       }
+
        if containerName == "" {
                return errors.New("no container name given")
        }
@@ -311,11 +315,16 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
        }
 }
 
-// Delete a Keep block.
-func (v *AzureBlobVolume) Delete(loc string) error {
+// Trash a Keep block.
+func (v *AzureBlobVolume) Trash(loc string) error {
        if v.readonly {
                return MethodDisabledError
        }
+
+       if trashLifetime != 0 {
+               return ErrNotImplemented
+       }
+
        // Ideally we would use If-Unmodified-Since, but that
        // particular condition seems to be ignored by Azure. Instead,
        // we get the Etag before checking Mtime, and use If-Match to
@@ -335,6 +344,12 @@ func (v *AzureBlobVolume) Delete(loc string) error {
        })
 }
 
+// Untrash a Keep block.
+// TBD
+func (v *AzureBlobVolume) Untrash(loc string) error {
+       return ErrNotImplemented
+}
+
 // Status returns a VolumeStatus struct with placeholder data.
 func (v *AzureBlobVolume) Status() *VolumeStatus {
        return &VolumeStatus{
index 3817ea19002d1c18f14c2479a383fb2d1601d763..a7675fb1dcfbea5782a40ef6fc5b0d0c8bd93a8f 100644 (file)
@@ -970,3 +970,106 @@ func TestPutReplicationHeader(t *testing.T) {
                t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
        }
 }
+
+func TestUntrashHandler(t *testing.T) {
+       defer teardown()
+
+       // Set up Keep volumes
+       KeepVM = MakeTestVolumeManager(2)
+       defer KeepVM.Close()
+       vols := KeepVM.AllWritable()
+       vols[0].Put(TestHash, TestBlock)
+
+       dataManagerToken = "DATA MANAGER TOKEN"
+
+       // unauthenticatedReq => UnauthorizedError
+       unauthenticatedReq := &RequestTester{
+               method: "PUT",
+               uri:    "/untrash/" + TestHash,
+       }
+       response := IssueRequest(unauthenticatedReq)
+       ExpectStatusCode(t,
+               "Unauthenticated request",
+               UnauthorizedError.HTTPCode,
+               response)
+
+       // notDataManagerReq => UnauthorizedError
+       notDataManagerReq := &RequestTester{
+               method:   "PUT",
+               uri:      "/untrash/" + TestHash,
+               apiToken: knownToken,
+       }
+
+       response = IssueRequest(notDataManagerReq)
+       ExpectStatusCode(t,
+               "Non-datamanager token",
+               UnauthorizedError.HTTPCode,
+               response)
+
+       // datamanagerWithBadHashReq => StatusBadRequest
+       datamanagerWithBadHashReq := &RequestTester{
+               method:   "PUT",
+               uri:      "/untrash/thisisnotalocator",
+               apiToken: dataManagerToken,
+       }
+       response = IssueRequest(datamanagerWithBadHashReq)
+       ExpectStatusCode(t,
+               "Bad locator in untrash request",
+               http.StatusBadRequest,
+               response)
+
+       // datamanagerWrongMethodReq => StatusBadRequest
+       datamanagerWrongMethodReq := &RequestTester{
+               method:   "GET",
+               uri:      "/untrash/" + TestHash,
+               apiToken: dataManagerToken,
+       }
+       response = IssueRequest(datamanagerWrongMethodReq)
+       ExpectStatusCode(t,
+               "Only PUT method is supported for untrash",
+               http.StatusBadRequest,
+               response)
+
+       // datamanagerReq => StatusOK
+       datamanagerReq := &RequestTester{
+               method:   "PUT",
+               uri:      "/untrash/" + TestHash,
+               apiToken: dataManagerToken,
+       }
+       response = IssueRequest(datamanagerReq)
+       ExpectStatusCode(t,
+               "",
+               http.StatusOK,
+               response)
+       expected := "Successfully untrashed on: [MockVolume],[MockVolume]"
+       if response.Body.String() != expected {
+               t.Errorf(
+                       "Untrash response mismatched: expected %s, got:\n%s",
+                       expected, response.Body.String())
+       }
+}
+
+func TestUntrashHandlerWithNoWritableVolumes(t *testing.T) {
+       defer teardown()
+
+       // Set up readonly Keep volumes
+       vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
+       vols[0].Readonly = true
+       vols[1].Readonly = true
+       KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
+       defer KeepVM.Close()
+
+       dataManagerToken = "DATA MANAGER TOKEN"
+
+       // datamanagerReq => StatusOK
+       datamanagerReq := &RequestTester{
+               method:   "PUT",
+               uri:      "/untrash/" + TestHash,
+               apiToken: dataManagerToken,
+       }
+       response := IssueRequest(datamanagerReq)
+       ExpectStatusCode(t,
+               "No writable volumes",
+               http.StatusNotFound,
+               response)
+}
index 95af1b48707c6b189982dc18762cb517769bd117..043ab69b17c255aa463fe8259a777cec682453f5 100644 (file)
@@ -20,6 +20,7 @@ import (
        "regexp"
        "runtime"
        "strconv"
+       "strings"
        "sync"
        "time"
 )
@@ -53,6 +54,9 @@ func MakeRESTRouter() *mux.Router {
        // Replace the current trash queue.
        rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
 
+       // Untrash moves blocks from the trash back into the store.
+       rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
+
        // Any request which does not match any of these routes gets
        // 400 Bad Request.
        rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
@@ -295,7 +299,7 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
                Failed  int `json:"copies_failed"`
        }
        for _, vol := range KeepVM.AllWritable() {
-               if err := vol.Delete(hash); err == nil {
+               if err := vol.Trash(hash); err == nil {
                        result.Deleted++
                } else if os.IsNotExist(err) {
                        continue
@@ -430,6 +434,53 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
        trashq.ReplaceQueue(tlist)
 }
 
+// UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
+func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
+       // Reject unauthorized requests.
+       if !IsDataManagerToken(GetApiToken(req)) {
+               http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+               return
+       }
+
+       hash := mux.Vars(req)["hash"]
+
+       if len(KeepVM.AllWritable()) == 0 {
+               http.Error(resp, "No writable volumes", http.StatusNotFound)
+               return
+       }
+
+       var untrashedOn, failedOn []string
+       var numNotFound int
+       for _, vol := range KeepVM.AllWritable() {
+               err := vol.Untrash(hash)
+
+               if os.IsNotExist(err) {
+                       numNotFound++
+               } else if err != nil {
+                       log.Printf("Error untrashing %v on volume %v: %v", hash, vol.String(), err)
+                       failedOn = append(failedOn, vol.String())
+               } else {
+                       log.Printf("Untrashed %v on volume %v", hash, vol.String())
+                       untrashedOn = append(untrashedOn, vol.String())
+               }
+       }
+
+       if numNotFound == len(KeepVM.AllWritable()) {
+               http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
+               return
+       }
+
+       if len(failedOn) == len(KeepVM.AllWritable()) {
+               http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
+       } else {
+               respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
+               if len(failedOn) > 0 {
+                       respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
+               }
+               resp.Write([]byte(respBody))
+       }
+}
+
 // ==============================
 // GetBlock and PutBlock implement lower-level code for handling
 // blocks by rooting through volumes connected to the local machine.
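
The /untrash endpoint accepts only PUT and only the data manager token. A hedged client-side sketch (host, port, token and locator are placeholders, and it assumes keepstore's usual "OAuth2 <token>" Authorization header):

    import requests  # third-party HTTP client, assumed available

    locator = '37b51d194a7513e45b56f6524f2d51f2'  # 32-hex block hash
    resp = requests.put(
        'http://keep0.example:25107/untrash/' + locator,
        headers={'Authorization': 'OAuth2 DATA-MANAGER-TOKEN'})
    print(resp.status_code, resp.text)
    # 200 with "Successfully untrashed on: ..." when at least one writable
    # volume restored the block; 404 when no writable volume had it.
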
index 96a887fecb20b278a2c9a763ebfc094b71bf31ac..3850e993fc511216d4967b1c757109ced844a591 100644 (file)
@@ -55,6 +55,10 @@ var dataManagerToken string
 // actually deleting anything.
 var neverDelete = true
 
+// trashLifetime is the time duration after a block is trashed
+// during which it can be recovered using an /untrash request
+var trashLifetime time.Duration
+
 var maxBuffers = 128
 var bufs *bufferPool
 
@@ -79,6 +83,7 @@ var (
        SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
        TooLongError        = &KeepError{413, "Block is too large"}
        MethodDisabledError = &KeepError{405, "Method disabled"}
+       ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
 )
 
 func (e *KeepError) Error() string {
@@ -200,6 +205,11 @@ func main() {
                "max-buffers",
                maxBuffers,
                fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
+       flag.DurationVar(
+               &trashLifetime,
+               "trash-lifetime",
+               0*time.Second,
+               "Interval after a block is trashed during which it can be recovered using an /untrash request")
 
        flag.Parse()
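
Note that -trash-lifetime defaults to 0 and, as the volume Set and Trash methods in this change show, any non-zero value currently makes every volume type return ErrNotImplemented ("Unsupported configuration"). The flag and the /untrash route are being wired up ahead of per-volume trash support.
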
 
index 572ee46e71419693b103801c7e01a8a139a0c69e..7d9ba8ab9ef33bf46888566c6d0c6ae333dba9ae 100644 (file)
@@ -39,6 +39,9 @@ type s3VolumeAdder struct {
 }
 
 func (s *s3VolumeAdder) Set(bucketName string) error {
+       if trashLifetime != 0 {
+               return ErrNotImplemented
+       }
        if bucketName == "" {
                return fmt.Errorf("no container name given")
        }
@@ -257,10 +260,13 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
        return nil
 }
 
-func (v *S3Volume) Delete(loc string) error {
+func (v *S3Volume) Trash(loc string) error {
        if v.readonly {
                return MethodDisabledError
        }
+       if trashLifetime != 0 {
+               return ErrNotImplemented
+       }
        if t, err := v.Mtime(loc); err != nil {
                return err
        } else if time.Since(t) < blobSignatureTTL {
@@ -272,6 +278,11 @@ func (v *S3Volume) Delete(loc string) error {
        return v.Bucket.Del(loc)
 }
 
+// TBD
+func (v *S3Volume) Untrash(loc string) error {
+       return ErrNotImplemented
+}
+
 func (v *S3Volume) Status() *VolumeStatus {
        return &VolumeStatus{
                DeviceNum: 1,
index 65e3fbd2849593e44be94921cb7073a5aba3adaa..62f63d57c8edb655b5078ebf637ce6d0ed0475bb 100644 (file)
@@ -47,7 +47,7 @@ func TrashItem(trashRequest TrashRequest) {
                if neverDelete {
                        err = errors.New("did not delete block because neverDelete is true")
                } else {
-                       err = volume.Delete(trashRequest.Locator)
+                       err = volume.Trash(trashRequest.Locator)
                }
 
                if err != nil {
index 7966c41b92bd89958308ec77765f0b7a5a1f0fd9..58710c04b269a57af236fbb36f5a6aaa61d9b256 100644 (file)
@@ -144,20 +144,21 @@ type Volume interface {
        // particular order.
        IndexTo(prefix string, writer io.Writer) error
 
-       // Delete deletes the block data from the underlying storage
-       // device.
+       // Trash moves the block data from the underlying storage
+       // device to the trash area. The block then stays in the trash
+       // for the -trash-lifetime interval before it is actually deleted.
        //
        // loc is as described in Get.
        //
        // If the timestamp for the given locator is newer than
-       // blobSignatureTTL, Delete must not delete the data.
+       // blobSignatureTTL, Trash must not trash the data.
        //
-       // If a Delete operation overlaps with any Touch or Put
+       // If a Trash operation overlaps with any Touch or Put
        // operations on the same locator, the implementation must
        // ensure one of the following outcomes:
        //
        //   - Touch and Put return a non-nil error, or
-       //   - Delete does not delete the block, or
+       //   - Trash does not trash the block, or
        //   - Both of the above.
        //
        // If it is possible for the storage device to be accessed by
@@ -171,9 +172,12 @@ type Volume interface {
        // reliably or fail outright.
        //
        // Corollary: A successful Touch or Put guarantees a block
-       // will not be deleted for at least blobSignatureTTL
+       // will not be trashed for at least blobSignatureTTL
        // seconds.
-       Delete(loc string) error
+       Trash(loc string) error
+
+       // Untrash moves a block from the trash back into the store.
+       Untrash(loc string) error
 
        // Status returns a *VolumeStatus representing the current
        // in-use and available storage capacity and an
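
The contract above (a block newer than blobSignatureTTL must never be trashed, and a trashed block stays recoverable for the -trash-lifetime interval) is easy to model outside Go. A purely illustrative in-memory sketch, not the keepstore implementation:

    import time

    class ToyVolume(object):
        """Toy model of the Trash/Untrash contract described above."""
        def __init__(self, trash_lifetime, blob_signature_ttl):
            self.trash_lifetime = trash_lifetime          # seconds
            self.blob_signature_ttl = blob_signature_ttl  # seconds
            self.store = {}  # locator -> (data, mtime)
            self.trash = {}  # locator -> (data, expiry)

        def trash_block(self, loc):
            data, mtime = self.store[loc]
            if time.time() - mtime < self.blob_signature_ttl:
                raise RuntimeError("block too new to trash")
            self.trash[loc] = (data, time.time() + self.trash_lifetime)
            del self.store[loc]

        def untrash_block(self, loc):
            data, expiry = self.trash.pop(loc)
            if time.time() > expiry:
                raise KeyError(loc)  # past trash_lifetime: gone for good
            self.store[loc] = (data, time.time())
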
index 7580a202594426173ca14d54d9fda8c523171e30..e168940fdd660f9c3487fea26e1decdec8ba9098 100644 (file)
@@ -420,7 +420,7 @@ func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
 
        v.Put(TestHash, TestBlock)
 
-       if err := v.Delete(TestHash); err != nil {
+       if err := v.Trash(TestHash); err != nil {
                t.Error(err)
        }
        data, err := v.Get(TestHash)
@@ -449,7 +449,7 @@ func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
        v.Put(TestHash, TestBlock)
        v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
 
-       if err := v.Delete(TestHash); err != nil {
+       if err := v.Trash(TestHash); err != nil {
                t.Error(err)
        }
        if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) {
@@ -463,7 +463,7 @@ func testDeleteNoSuchBlock(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
 
-       if err := v.Delete(TestHash2); err == nil {
+       if err := v.Trash(TestHash2); err == nil {
                t.Errorf("Expected error when attempting to delete a non-existing block")
        }
 }
@@ -535,7 +535,7 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
        }
 
        // Delete a block from a read-only volume should result in error
-       err = v.Delete(TestHash)
+       err = v.Trash(TestHash)
        if err == nil {
                t.Errorf("Expected error when deleting block from a read-only volume")
        }
index d6714365de5bef98ad082b93f595231993bafa48..53ffeef0bba186d7f995e6e6afb00feb194c5e7f 100644 (file)
@@ -183,7 +183,7 @@ func (v *MockVolume) IndexTo(prefix string, w io.Writer) error {
        return nil
 }
 
-func (v *MockVolume) Delete(loc string) error {
+func (v *MockVolume) Trash(loc string) error {
        v.gotCall("Delete")
        <-v.Gate
        if v.Readonly {
@@ -199,6 +199,11 @@ func (v *MockVolume) Delete(loc string) error {
        return os.ErrNotExist
 }
 
+// TBD
+func (v *MockVolume) Untrash(loc string) error {
+       return nil
+}
+
 func (v *MockVolume) Status() *VolumeStatus {
        var used uint64
        for _, block := range v.Store {
index 910cc25d613cb7690f944b418aebf5c205c7aced..0dd1d82a98ca4b9f14c79d8b96e90f10faf4311f 100644 (file)
@@ -23,6 +23,9 @@ type unixVolumeAdder struct {
 }
 
 func (vs *unixVolumeAdder) Set(value string) error {
+       if trashLifetime != 0 {
+               return ErrNotImplemented
+       }
        if dirs := strings.Split(value, ","); len(dirs) > 1 {
                log.Print("DEPRECATED: using comma-separated volume list.")
                for _, dir := range dirs {
@@ -363,7 +366,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
 }
 
 // Delete deletes the block data from the unix storage
-func (v *UnixVolume) Delete(loc string) error {
+func (v *UnixVolume) Trash(loc string) error {
        // Touch() must be called before calling Write() on a block.  Touch()
        // also uses lockfile().  This avoids a race condition between Write()
        // and Delete() because either (a) the file will be deleted and Touch()
@@ -375,6 +378,9 @@ func (v *UnixVolume) Delete(loc string) error {
        if v.readonly {
                return MethodDisabledError
        }
+       if trashLifetime != 0 {
+               return ErrNotImplemented
+       }
        if v.locker != nil {
                v.locker.Lock()
                defer v.locker.Unlock()
@@ -405,6 +411,12 @@ func (v *UnixVolume) Delete(loc string) error {
        return os.Remove(p)
 }
 
+// Untrash moves a block from the trash back into the store.
+// TBD
+func (v *UnixVolume) Untrash(loc string) error {
+       return ErrNotImplemented
+}
+
 // blockDir returns the fully qualified directory name for the directory
 // where loc is (or would be) stored on this volume.
 func (v *UnixVolume) blockDir(loc string) string {
index b216810f8cb0fc1008e4bb7bc99b3c306728d70a..0775e89ed275d14f7e2be510084a52e39af84472 100644 (file)
@@ -166,7 +166,7 @@ func TestUnixVolumeReadonly(t *testing.T) {
                t.Errorf("got err %v, expected MethodDisabledError", err)
        }
 
-       err = v.Delete(TestHash)
+       err = v.Trash(TestHash)
        if err != MethodDisabledError {
                t.Errorf("got err %v, expected MethodDisabledError", err)
        }
index c92fc9b00dd0cc723e4fd64bd2b001daa8bce60c..e1b8c484f0413cb8ff6bfe3ac0be77aebbcd2aa7 100755 (executable)
@@ -68,7 +68,6 @@ begin
   logins.each do |l|
     next if seen[l[:username]]
     seen[l[:username]] = true if not seen.has_key?(l[:username])
-    @homedir = "/home/#{l[:username]}"
 
     unless uids[l[:username]]
       STDERR.puts "Creating account #{l[:username]}"
@@ -85,6 +84,7 @@ begin
                          out: devnull)
     end
     # Create .ssh directory if necessary
+    @homedir = Etc.getpwnam(l[:username]).dir
     userdotssh = File.join(@homedir, ".ssh")
     Dir.mkdir(userdotssh) if !File.exists?(userdotssh)
     @key = "#######################################################################################
@@ -109,4 +109,3 @@ rescue Exception => bang
   puts bang.backtrace.join("\n")
   exit 1
 end
-
index 6319f4bbfc5cece52832842cfda05d9ae9253005..9a9ce588d382f54b9e399b5b280b8ed979557927 100644 (file)
@@ -38,10 +38,8 @@ class RemotePollLoopActor(actor_class):
         super(RemotePollLoopActor, self).__init__()
         self._client = client
         self._timer = timer_actor
-        self._logger = logging.getLogger(self.LOGGER_NAME)
         self._later = self.actor_ref.proxy()
         self._polling_started = False
-        self.log_prefix = "{} (at {})".format(self.__class__.__name__, id(self))
         self.min_poll_wait = poll_wait
         self.max_poll_wait = max_poll_wait
         self.poll_wait = self.min_poll_wait
@@ -50,6 +48,9 @@ class RemotePollLoopActor(actor_class):
         if hasattr(self, '_item_key'):
             self.subscribe_to = self._subscribe_to
 
+    def on_start(self):
+        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[9:]))
+
     def _start_polling(self):
         if not self._polling_started:
             self._polling_started = True
@@ -57,22 +58,20 @@ class RemotePollLoopActor(actor_class):
 
     def subscribe(self, subscriber):
         self.all_subscribers.add(subscriber)
-        self._logger.debug("%r subscribed to all events", subscriber)
+        self._logger.debug("%s subscribed to all events", subscriber.actor_ref.actor_urn)
         self._start_polling()
 
     # __init__ exposes this method to the proxy if the subclass defines
     # _item_key.
     def _subscribe_to(self, key, subscriber):
         self.key_subscribers.setdefault(key, set()).add(subscriber)
-        self._logger.debug("%r subscribed to events for '%s'", subscriber, key)
+        self._logger.debug("%s subscribed to events for '%s'", subscriber.actor_ref.actor_urn, key)
         self._start_polling()
 
     def _send_request(self):
         raise NotImplementedError("subclasses must implement request method")
 
     def _got_response(self, response):
-        self._logger.debug("%s got response with %d items",
-                           self.log_prefix, len(response))
         self.poll_wait = self.min_poll_wait
         _notify_subscribers(response, self.all_subscribers)
         if hasattr(self, '_item_key'):
@@ -82,14 +81,14 @@ class RemotePollLoopActor(actor_class):
 
     def _got_error(self, error):
         self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait)
-        return "{} got error: {} - waiting {} seconds".format(
-            self.log_prefix, error, self.poll_wait)
+        return "got error: {} - will try again in {} seconds".format(
+            error, self.poll_wait)
 
     def is_common_error(self, exception):
         return False
 
     def poll(self, scheduled_start=None):
-        self._logger.debug("%s sending poll", self.log_prefix)
+        self._logger.debug("sending request")
         start_time = time.time()
         if scheduled_start is None:
             scheduled_start = start_time
@@ -105,6 +104,9 @@ class RemotePollLoopActor(actor_class):
         else:
             self._got_response(response)
             next_poll = scheduled_start + self.poll_wait
+            self._logger.info("got response with %d items in %s seconds, next poll at %s",
+                              len(response), (time.time() - scheduled_start),
+                              time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_poll)))
         end_time = time.time()
         if next_poll < end_time:  # We've drifted too much; start fresh.
             next_poll = end_time + self.poll_wait
index 6e46bc0f4c6283ab0d73da212529299bdec4ba10..54d6a82bcefa1cf38ad06f58cbbf89fafe55ecd1 100644 (file)
@@ -3,6 +3,7 @@
 from __future__ import absolute_import, print_function
 
 import calendar
+import functools
 import itertools
 import re
 import time
@@ -43,6 +44,69 @@ def arvados_node_missing(arvados_node, fresh_time):
     else:
         return not timestamp_fresh(arvados_timestamp(arvados_node["last_ping_at"]), fresh_time)
 
+class RetryMixin(object):
+    """Retry decorator for an method that makes remote requests.
+
+    Use this function to decorate method, and pass in a tuple of exceptions to
+    catch.  If the original method raises a known cloud driver error, or any of
+    the given exception types, this decorator will either go into a
+    sleep-and-retry loop with exponential backoff either by sleeping (if
+    self._timer is None) or by scheduling retries of the method (if self._timer
+    is a timer actor.)
+
+    """
+    def __init__(self, retry_wait, max_retry_wait,
+                 logger, cloud, timer=None):
+        self.min_retry_wait = retry_wait
+        self.max_retry_wait = max_retry_wait
+        self.retry_wait = retry_wait
+        self._logger = logger
+        self._cloud = cloud
+        self._timer = timer
+
+    @staticmethod
+    def _retry(errors=()):
+        def decorator(orig_func):
+            @functools.wraps(orig_func)
+            def retry_wrapper(self, *args, **kwargs):
+                while True:
+                    try:
+                        ret = orig_func(self, *args, **kwargs)
+                    except Exception as error:
+                        if not (isinstance(error, errors) or
+                                self._cloud.is_cloud_exception(error)):
+                            self.retry_wait = self.min_retry_wait
+                            self._logger.warning(
+                                "Re-raising unknown error (no retry): %s",
+                                error, exc_info=error)
+                            raise
+
+                        self._logger.warning(
+                            "Client error: %s - waiting %s seconds",
+                            error, self.retry_wait, exc_info=error)
+
+                        if self._timer:
+                            start_time = time.time()
+                            # reschedule to be called again
+                            self._timer.schedule(start_time + self.retry_wait,
+                                                 getattr(self._later,
+                                                         orig_func.__name__),
+                                                 *args, **kwargs)
+                        else:
+                            # sleep on it.
+                            time.sleep(self.retry_wait)
+
+                        self.retry_wait = min(self.retry_wait * 2,
+                                              self.max_retry_wait)
+                        if self._timer:
+                            # expect to be called again by timer so don't loop
+                            return
+                    else:
+                        self.retry_wait = self.min_retry_wait
+                        return ret
+            return retry_wrapper
+        return decorator
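
A hedged usage sketch for the RetryMixin introduced above; the class and the cloud client object are illustrative, not part of this commit. With timer=None the decorator sleeps and retries inline, doubling the wait up to max_retry_wait; with a timer actor it reschedules the call through self._later instead:

    import logging

    from arvnodeman.computenode import RetryMixin

    class FlakyLister(RetryMixin):
        def __init__(self, cloud_client):
            # cloud_client is assumed to provide is_cloud_exception()
            # and list_nodes(), like the node manager's cloud drivers.
            super(FlakyLister, self).__init__(
                retry_wait=1, max_retry_wait=180,
                logger=logging.getLogger('FlakyLister'),
                cloud=cloud_client, timer=None)

        @RetryMixin._retry((IOError,))
        def list_nodes(self):
            return self._cloud.list_nodes()
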
+
 class ShutdownTimer(object):
     """Keep track of a cloud node's shutdown windows.
 
index b366e79ff834fc825697946cd9f3ae5007ad911f..2ae4fc8923612d474b833fcf9f345b255148ee3d 100644 (file)
@@ -10,66 +10,36 @@ import libcloud.common.types as cloud_types
 import pykka
 
 from .. import \
-    arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, arvados_node_missing
+    arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
+    arvados_node_missing, RetryMixin
 from ...clientactor import _notify_subscribers
 from ... import config
 
-class ComputeNodeStateChangeBase(config.actor_class):
+class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
     """Base class for actors that change a compute node's state.
 
     This base class takes care of retrying changes and notifying
     subscribers when the change is finished.
     """
-    def __init__(self, logger_name, cloud_client, arvados_client, timer_actor,
+    def __init__(self, cloud_client, arvados_client, timer_actor,
                  retry_wait, max_retry_wait):
         super(ComputeNodeStateChangeBase, self).__init__()
+        RetryMixin.__init__(self, retry_wait, max_retry_wait,
+                            None, cloud_client, timer_actor)
         self._later = self.actor_ref.proxy()
-        self._logger = logging.getLogger(logger_name)
-        self._cloud = cloud_client
         self._arvados = arvados_client
-        self._timer = timer_actor
-        self.min_retry_wait = retry_wait
-        self.max_retry_wait = max_retry_wait
-        self.retry_wait = retry_wait
         self.subscribers = set()
 
-    @staticmethod
-    def _retry(errors=()):
-        """Retry decorator for an actor method that makes remote requests.
-
-        Use this function to decorator an actor method, and pass in a
-        tuple of exceptions to catch.  This decorator will schedule
-        retries of that method with exponential backoff if the
-        original method raises a known cloud driver error, or any of the
-        given exception types.
-        """
-        def decorator(orig_func):
-            @functools.wraps(orig_func)
-            def retry_wrapper(self, *args, **kwargs):
-                start_time = time.time()
-                try:
-                    orig_func(self, *args, **kwargs)
-                except Exception as error:
-                    if not (isinstance(error, errors) or
-                            self._cloud.is_cloud_exception(error)):
-                        raise
-                    self._logger.warning(
-                        "Client error: %s - waiting %s seconds",
-                        error, self.retry_wait)
-                    self._timer.schedule(start_time + self.retry_wait,
-                                         getattr(self._later,
-                                                 orig_func.__name__),
-                                         *args, **kwargs)
-                    self.retry_wait = min(self.retry_wait * 2,
-                                          self.max_retry_wait)
-                else:
-                    self.retry_wait = self.min_retry_wait
-            return retry_wrapper
-        return decorator
+    def _set_logger(self):
+        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
+
+    def on_start(self):
+        self._set_logger()
 
     def _finished(self):
         _notify_subscribers(self._later, self.subscribers)
         self.subscribers = None
+        self._logger.info("finished")
 
     def subscribe(self, subscriber):
         if self.subscribers is None:
@@ -93,6 +63,17 @@ class ComputeNodeStateChangeBase(config.actor_class):
                            'last_action': explanation}},
             ).execute()
 
+    @staticmethod
+    def _finish_on_exception(orig_func):
+        @functools.wraps(orig_func)
+        def finish_wrapper(self, *args, **kwargs):
+            try:
+                return orig_func(self, *args, **kwargs)
+            except Exception as error:
+                self._logger.error("Actor error %s", error)
+                self._finished()
+        return finish_wrapper
+
 
 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
     """Actor to create and set up a cloud compute node.
@@ -107,7 +88,7 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
                  cloud_size, arvados_node=None,
                  retry_wait=1, max_retry_wait=180):
         super(ComputeNodeSetupActor, self).__init__(
-            'arvnodeman.nodeup', cloud_client, arvados_client, timer_actor,
+            cloud_client, arvados_client, timer_actor,
             retry_wait, max_retry_wait)
         self.cloud_size = cloud_size
         self.arvados_node = None
@@ -117,20 +98,23 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
         else:
             self._later.prepare_arvados_node(arvados_node)
 
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    @ComputeNodeStateChangeBase._finish_on_exception
+    @RetryMixin._retry(config.ARVADOS_ERRORS)
     def create_arvados_node(self):
         self.arvados_node = self._arvados.nodes().create(body={}).execute()
         self._later.create_cloud_node()
 
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    @ComputeNodeStateChangeBase._finish_on_exception
+    @RetryMixin._retry(config.ARVADOS_ERRORS)
     def prepare_arvados_node(self, node):
         self.arvados_node = self._clean_arvados_node(
             node, "Prepared by Node Manager")
         self._later.create_cloud_node()
 
-    @ComputeNodeStateChangeBase._retry()
+    @ComputeNodeStateChangeBase._finish_on_exception
+    @RetryMixin._retry()
     def create_cloud_node(self):
-        self._logger.info("Creating cloud node with size %s.",
+        self._logger.info("Sending create_node request for node size %s.",
                           self.cloud_size.name)
         self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                   self.arvados_node)
@@ -139,7 +123,8 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
         self._logger.info("Cloud node %s created.", self.cloud_node.id)
         self._later.update_arvados_node_properties()
 
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    @ComputeNodeStateChangeBase._finish_on_exception
+    @RetryMixin._retry(config.ARVADOS_ERRORS)
     def update_arvados_node_properties(self):
         """Tell Arvados some details about the cloud node.
 
@@ -163,7 +148,7 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
         self._logger.info("%s updated properties.", self.arvados_node['uuid'])
         self._later.post_create()
 
-    @ComputeNodeStateChangeBase._retry()
+    @RetryMixin._retry()
     def post_create(self):
         self._cloud.post_create_node(self.cloud_node)
         self._logger.info("%s post-create work done.", self.cloud_node.id)
@@ -193,7 +178,7 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         # eligible.  Normal shutdowns based on job demand should be
         # cancellable; shutdowns based on node misbehavior should not.
         super(ComputeNodeShutdownActor, self).__init__(
-            'arvnodeman.nodedown', cloud_client, arvados_client, timer_actor,
+            cloud_client, arvados_client, timer_actor,
             retry_wait, max_retry_wait)
         self._monitor = node_monitor.proxy()
         self.cloud_node = self._monitor.cloud_node.get()
@@ -201,7 +186,11 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         self.cancel_reason = None
         self.success = None
 
+    def _set_logger(self):
+        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
+
     def on_start(self):
+        super(ComputeNodeShutdownActor, self).on_start()
         self._later.shutdown_node()
 
     def _arvados_node(self):
@@ -214,38 +203,40 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
 
     def cancel_shutdown(self, reason):
         self.cancel_reason = reason
-        self._logger.info("Cloud node %s shutdown cancelled: %s.",
-                          self.cloud_node.id, reason)
+        self._logger.info("Shutdown cancelled: %s.", reason)
         self._finished(success_flag=False)
 
     def _stop_if_window_closed(orig_func):
         @functools.wraps(orig_func)
         def stop_wrapper(self, *args, **kwargs):
             if (self.cancellable and
-                  (not self._monitor.shutdown_eligible().get())):
+                  (self._monitor.shutdown_eligible().get() is not True)):
                 self._later.cancel_shutdown(self.WINDOW_CLOSED)
                 return None
             else:
                 return orig_func(self, *args, **kwargs)
         return stop_wrapper
 
+    @ComputeNodeStateChangeBase._finish_on_exception
     @_stop_if_window_closed
-    @ComputeNodeStateChangeBase._retry()
+    @RetryMixin._retry()
     def shutdown_node(self):
+        self._logger.info("Starting shutdown")
         if not self._cloud.destroy_node(self.cloud_node):
             if self._cloud.broken(self.cloud_node):
                 self._later.cancel_shutdown(self.NODE_BROKEN)
             else:
                 # Force a retry.
                 raise cloud_types.LibcloudError("destroy_node failed")
-        self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+        self._logger.info("Shutdown success")
         arv_node = self._arvados_node()
         if arv_node is None:
             self._finished(success_flag=True)
         else:
             self._later.clean_arvados_node(arv_node)
 
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    @ComputeNodeStateChangeBase._finish_on_exception
+    @RetryMixin._retry(config.ARVADOS_ERRORS)
     def clean_arvados_node(self, arvados_node):
         self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
         self._finished(success_flag=True)
@@ -314,7 +305,6 @@ class ComputeNodeMonitorActor(config.actor_class):
     ):
         super(ComputeNodeMonitorActor, self).__init__()
         self._later = self.actor_ref.proxy()
-        self._logger = logging.getLogger('arvnodeman.computenode')
         self._last_log = None
         self._shutdowns = shutdown_timer
         self._cloud_node_fqdn = cloud_fqdn_func
@@ -332,6 +322,13 @@ class ComputeNodeMonitorActor(config.actor_class):
         self.last_shutdown_opening = None
         self._later.consider_shutdown()
 
+    def _set_logger(self):
+        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
+
+    def on_start(self):
+        self._set_logger()
+        self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)
+
     def subscribe(self, subscriber):
         self.subscribers.add(subscriber)
 
@@ -358,37 +355,49 @@ class ComputeNodeMonitorActor(config.actor_class):
         return result
 
     def shutdown_eligible(self):
+        """Return True if eligible for shutdown, or a string explaining why the node
+        is not eligible for shutdown."""
+
         if not self._shutdowns.window_open():
-            return False
+            return "shutdown window is not open."
         if self.arvados_node is None:
             # Node is unpaired.
             # If it hasn't pinged Arvados after boot_fail seconds, shut it down
-            return not timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after)
+            if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
+                return "node is still booting, will be considered a failed boot at %s" % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.cloud_node_start_time + self.boot_fail_after))
+            else:
+                return True
         missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
         if missing and self._cloud.broken(self.cloud_node):
             # Node is paired, but Arvados says it is missing and the cloud says the node
             # is in an error state, so shut it down.
             return True
         if missing is None and self._cloud.broken(self.cloud_node):
-            self._logger.warning(
-                "cloud reports broken node, but paired node %s never pinged "
-                "(bug?) -- skipped check for node_stale_after",
+            self._logger.info(
+                "Cloud node considered 'broken' but paired node %s last_ping_at is None, " +
+                "cannot check node_stale_after (node may be shut down and we just haven't gotten the message yet).",
                 self.arvados_node['uuid'])
-        return self.in_state('idle')
+        if self.in_state('idle'):
+            return True
+        else:
+            return "node is not idle."
 
     def consider_shutdown(self):
-        next_opening = self._shutdowns.next_opening()
-        if self.shutdown_eligible():
-            self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
-            _notify_subscribers(self._later, self.subscribers)
-        elif self._shutdowns.window_open():
-            self._debug("Node %s shutdown window open but node busy.",
-                        self.cloud_node.id)
-        elif self.last_shutdown_opening != next_opening:
-            self._debug("Node %s shutdown window closed.  Next at %s.",
-                        self.cloud_node.id, time.ctime(next_opening))
-            self._timer.schedule(next_opening, self._later.consider_shutdown)
-            self.last_shutdown_opening = next_opening
+        try:
+            next_opening = self._shutdowns.next_opening()
+            eligible = self.shutdown_eligible()
+            if eligible is True:
+                self._debug("Suggesting shutdown.")
+                _notify_subscribers(self._later, self.subscribers)
+            elif self._shutdowns.window_open():
+                self._debug("Cannot shut down because %s", eligible)
+            elif self.last_shutdown_opening != next_opening:
+                self._debug("Shutdown window closed.  Next at %s.",
+                            time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
+                self._timer.schedule(next_opening, self._later.consider_shutdown)
+                self.last_shutdown_opening = next_opening
+        except Exception:
+            self._logger.exception("Unexpected exception")
 
     def offer_arvados_pair(self, arvados_node):
         first_ping_s = arvados_node.get('first_ping_at')
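
Because shutdown_eligible() now returns either True or a human-readable reason string, and any non-empty string is truthy, callers have to compare against True explicitly, as the updated _stop_if_window_closed decorator above does. A hedged illustration; the helper and its argument are placeholders:

    def should_shut_down(monitor):
        # monitor stands in for a ComputeNodeMonitorActor proxy.
        eligible = monitor.shutdown_eligible().get()
        # A reason string like "node is not idle." is truthy, so a plain
        # "if eligible:" would wrongly trigger a shutdown.
        return eligible is True
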
index 919b57f42c8973bab91de742d1fee48598296f35..4d70436801564e9a35675e95c18f33fddc125806 100644 (file)
@@ -8,6 +8,7 @@ import time
 from . import \
     ComputeNodeSetupActor, ComputeNodeUpdateActor, ComputeNodeMonitorActor
 from . import ComputeNodeShutdownActor as ShutdownActorBase
+from .. import RetryMixin
 
 class ComputeNodeShutdownActor(ShutdownActorBase):
     SLURM_END_STATES = frozenset(['down\n', 'down*\n',
@@ -21,6 +22,7 @@ class ComputeNodeShutdownActor(ShutdownActorBase):
             self._nodename = None
             return super(ComputeNodeShutdownActor, self).on_start()
         else:
+            self._set_logger()
             self._nodename = arv_node['hostname']
             self._logger.info("Draining SLURM node %s", self._nodename)
             self._later.issue_slurm_drain()
@@ -42,7 +44,7 @@ class ComputeNodeShutdownActor(ShutdownActorBase):
     # of the excessive memory usage that result in the "Cannot allocate memory"
     # error are still being investigated.
 
-    @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
+    @RetryMixin._retry((subprocess.CalledProcessError, OSError))
     def cancel_shutdown(self, reason):
         if self._nodename:
             if self._get_slurm_state() in self.SLURM_DRAIN_STATES:
@@ -54,14 +56,14 @@ class ComputeNodeShutdownActor(ShutdownActorBase):
                 pass
         return super(ComputeNodeShutdownActor, self).cancel_shutdown(reason)
 
-    @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
+    @RetryMixin._retry((subprocess.CalledProcessError, OSError))
     @ShutdownActorBase._stop_if_window_closed
     def issue_slurm_drain(self):
         self._set_node_state('DRAIN', 'Reason=Node Manager shutdown')
         self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
         self._later.await_slurm_drain()
 
-    @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
+    @RetryMixin._retry((subprocess.CalledProcessError, OSError))
     @ShutdownActorBase._stop_if_window_closed
     def await_slurm_drain(self):
         output = self._get_slurm_state()
index 66ffb8099cb82d674381ac587d41454d6457e1f4..c98c95af66d89b257be6f7e79d4fd4371138281c 100644 (file)
@@ -2,14 +2,16 @@
 
 from __future__ import absolute_import, print_function
 
+import logging
 from operator import attrgetter
 
 import libcloud.common.types as cloud_types
 from libcloud.compute.base import NodeDriver, NodeAuthSSHKey
 
 from ...config import NETWORK_ERRORS
+from .. import RetryMixin
 
-class BaseComputeNodeDriver(object):
+class BaseComputeNodeDriver(RetryMixin):
     """Abstract base class for compute node drivers.
 
     libcloud drivers abstract away many of the differences between
@@ -24,7 +26,16 @@ class BaseComputeNodeDriver(object):
     """
     CLOUD_ERRORS = NETWORK_ERRORS + (cloud_types.LibcloudError,)
 
-    def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
+    @RetryMixin._retry()
+    def _create_driver(self, driver_class, **auth_kwargs):
+        return driver_class(**auth_kwargs)
+
+    @RetryMixin._retry()
+    def _set_sizes(self):
+        self.sizes = {sz.id: sz for sz in self.real.list_sizes()}
+
+    def __init__(self, auth_kwargs, list_kwargs, create_kwargs,
+                 driver_class, retry_wait=1, max_retry_wait=180):
         """Base initializer for compute node drivers.
 
         Arguments:
@@ -37,7 +48,12 @@ class BaseComputeNodeDriver(object):
           libcloud driver's create_node method to create a new compute node.
         * driver_class: The class of a libcloud driver to use.
         """
-        self.real = driver_class(**auth_kwargs)
+
+        super(BaseComputeNodeDriver, self).__init__(retry_wait, max_retry_wait,
+                                         logging.getLogger(self.__class__.__name__),
+                                         type(self),
+                                         None)
+        self.real = self._create_driver(driver_class, **auth_kwargs)
         self.list_kwargs = list_kwargs
         self.create_kwargs = create_kwargs
         # Transform entries in create_kwargs.  For each key K, if this class
@@ -53,7 +69,7 @@ class BaseComputeNodeDriver(object):
                 if new_pair is not None:
                     self.create_kwargs[new_pair[0]] = new_pair[1]
 
-        self.sizes = {sz.id: sz for sz in self.real.list_sizes()}
+        self._set_sizes()
 
     def _init_ping_host(self, ping_host):
         self.ping_host = ping_host
@@ -115,11 +131,35 @@ class BaseComputeNodeDriver(object):
             self.ping_host, arvados_node['uuid'],
             arvados_node['info']['ping_secret'])
 
+    def find_node(self, name):
+        node = [n for n in self.list_nodes() if n.name == name]
+        if node:
+            return node[0]
+        else:
+            return None
+
     def create_node(self, size, arvados_node):
-        kwargs = self.create_kwargs.copy()
-        kwargs.update(self.arvados_create_kwargs(size, arvados_node))
-        kwargs['size'] = size
-        return self.real.create_node(**kwargs)
+        try:
+            kwargs = self.create_kwargs.copy()
+            kwargs.update(self.arvados_create_kwargs(size, arvados_node))
+            kwargs['size'] = size
+            return self.real.create_node(**kwargs)
+        except self.CLOUD_ERRORS:
+            # Workaround for bug #6702: sometimes the create node request
+            # succeeds but times out and raises an exception instead of
+            # returning a result.  If this happens, we get stuck in a retry
+            # loop forever because subsequent create_node attempts will fail
+            # due to node name collision.  So check if the node we intended to
+            # create shows up in the cloud node list and return it if found.
+            try:
+                node = self.find_node(kwargs['name'])
+                if node:
+                    return node
+            except:
+                # Ignore possible exception from find_node in favor of
+                # re-raising the original create_node exception.
+                pass
+            raise
 
     def post_create_node(self, cloud_node):
         # ComputeNodeSetupActor calls this method after the cloud node is
index 991a2983c7217f1a29368293513587d117d01d59..d89c48e270bcc119638c70fc3d5f2928fbe1f8e3 100644 (file)
@@ -75,6 +75,9 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
         self.real.ex_create_tags(cloud_node,
                                  {'Name': arvados_node_fqdn(arvados_node)})
 
+    def find_node(self, name):
+        raise NotImplementedError("ec2.ComputeNodeDriver.find_node")
+
     def list_nodes(self):
         # Need to populate Node.size
         nodes = super(ComputeNodeDriver, self).list_nodes()
index be3f3f1c133531bf3aececc99b2e998ffca838e1..c5bf0b8cda42d211adcfbb61ffb3d73f460a7830 100644 (file)
@@ -101,6 +101,7 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
                 })
         return result
 
+
     def list_nodes(self):
         # The GCE libcloud driver only supports filtering node lists by zone.
         # Do our own filtering based on tag list.
index 243d3bfaa4cd13fbf9a540affc20ae6e562979a9..0993c479625f23a209c90412fa4426ff2c406d23 100644 (file)
@@ -121,7 +121,6 @@ class NodeManagerDaemonActor(actor_class):
         self._new_arvados = arvados_factory
         self._new_cloud = cloud_factory
         self._cloud_driver = self._new_cloud()
-        self._logger = logging.getLogger('arvnodeman.daemon')
         self._later = self.actor_ref.proxy()
         self.shutdown_windows = shutdown_windows
         self.server_calculator = server_calculator
@@ -143,14 +142,18 @@ class NodeManagerDaemonActor(actor_class):
         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
         self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
         self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
-        self._logger.debug("Daemon initialized")
+        self.sizes_booting_shutdown = {} # Actor IDs or Cloud node IDs to node size
+
+    def on_start(self):
+        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
+        self._logger.debug("Daemon started")
 
     def _update_poll_time(self, poll_key):
         self.last_polls[poll_key] = time.time()
 
     def _pair_nodes(self, node_record, arvados_node):
-        self._logger.info("Cloud node %s has associated with Arvados node %s",
-                          node_record.cloud_node.id, arvados_node['uuid'])
+        self._logger.info("Cloud node %s is now paired with Arvados node %s",
+                          node_record.cloud_node.name, arvados_node['uuid'])
         self._arvados_nodes_actor.subscribe_to(
             arvados_node['uuid'], node_record.actor.update_arvados_node)
         node_record.arvados_node = arvados_node
@@ -198,6 +201,7 @@ class NodeManagerDaemonActor(actor_class):
                 except pykka.ActorDeadError:
                     pass
                 del self.shutdowns[key]
+                del self.sizes_booting_shutdown[key]
             record.actor.stop()
             record.cloud_node = None
 
@@ -214,21 +218,33 @@ class NodeManagerDaemonActor(actor_class):
                     self._pair_nodes(cloud_rec, arv_node)
                     break
 
-    def _nodes_up(self, size):
-        up = 0
-        up += sum(1
-                  for c in self.booting.itervalues()
-                  if size is None or c.cloud_size.get().id == size.id)
-        up += sum(1
-                  for i in (self.booted, self.cloud_nodes.nodes)
-                  for c in i.itervalues()
+    def _nodes_booting(self, size):
+        s = sum(1
+                for c in self.booting.iterkeys()
+                if size is None or self.sizes_booting_shutdown[c].id == size.id)
+        s += sum(1
+                 for c in self.booted.itervalues()
+                 if size is None or c.cloud_node.size.id == size.id)
+        return s
+
+    def _nodes_unpaired(self, size):
+        return sum(1
+                   for c in self.cloud_nodes.unpaired()
+                   if size is None or c.cloud_node.size.id == size.id)
+
+    def _nodes_booted(self, size):
+        return sum(1
+                  for c in self.cloud_nodes.nodes.itervalues()
                   if size is None or c.cloud_node.size.id == size.id)
+
+    def _nodes_up(self, size):
+        up = self._nodes_booting(size) + self._nodes_booted(size)
         return up
 
     def _total_price(self):
         cost = 0
-        cost += sum(self.server_calculator.find_size(c.cloud_size.get().id).price
-                  for c in self.booting.itervalues())
+        cost += sum(self.server_calculator.find_size(self.sizes_booting_shutdown[c].id).price
+                  for c in self.booting.iterkeys())
         cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
                     for i in (self.booted, self.cloud_nodes.nodes)
                     for c in i.itervalues())
@@ -253,9 +269,9 @@ class NodeManagerDaemonActor(actor_class):
 
     def _size_shutdowns(self, size):
         sh = 0
-        for c in self.shutdowns.itervalues():
+        for c in self.shutdowns.iterkeys():
             try:
-                if c.cloud_node.get().size.id == size.id:
+                if self.sizes_booting_shutdown[c].id == size.id:
                     sh += 1
             except pykka.ActorDeadError:
                 pass
@@ -272,11 +288,18 @@ class NodeManagerDaemonActor(actor_class):
         elif under_min > 0 and size.id == self.min_cloud_size.id:
             return under_min
 
-        up_count = self._nodes_up(size) - (self._size_shutdowns(size) +
-                                           self._nodes_busy(size) +
-                                           self._nodes_missing(size))
+        booting_count = self._nodes_booting(size) + self._nodes_unpaired(size)
+        shutdown_count = self._size_shutdowns(size)
+        busy_count = self._nodes_busy(size)
+        up_count = self._nodes_up(size) - (shutdown_count + busy_count + self._nodes_missing(size))
 
-        self._logger.debug("%s: idle nodes %i, wishlist size %i", size.name, up_count, self._size_wishlist(size))
+        self._logger.info("%s: wishlist %i, up %i (booting %i, idle %i, busy %i), shutting down %i", size.name,
+                          self._size_wishlist(size),
+                          up_count + busy_count,
+                          booting_count,
+                          up_count - booting_count,
+                          busy_count,
+                          shutdown_count)
 
         wanted = self._size_wishlist(size) - up_count
         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
@@ -298,11 +321,14 @@ class NodeManagerDaemonActor(actor_class):
         self._update_poll_time('server_wishlist')
         self.last_wishlist = wishlist
         for size in reversed(self.server_calculator.cloud_sizes):
-            nodes_wanted = self._nodes_wanted(size)
-            if nodes_wanted > 0:
-                self._later.start_node(size)
-            elif (nodes_wanted < 0) and self.booting:
-                self._later.stop_booting_node(size)
+            try:
+                nodes_wanted = self._nodes_wanted(size)
+                if nodes_wanted > 0:
+                    self._later.start_node(size)
+                elif (nodes_wanted < 0) and self.booting:
+                    self._later.stop_booting_node(size)
+            except Exception:
+                self._logger.exception("Error calculating nodes wanted for size %s", size)
 
     def _check_poll_freshness(orig_func):
         """Decorator to inhibit a method when poll information is stale.
@@ -327,7 +353,7 @@ class NodeManagerDaemonActor(actor_class):
         if nodes_wanted < 1:
             return None
         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
-        self._logger.info("Want %s more nodes.  Booting a %s node.",
+        self._logger.info("Want %i more %s nodes.  Booting a node.",
                           nodes_wanted, cloud_size.name)
         new_setup = self._node_setup.start(
             timer_actor=self._timer,
@@ -336,6 +362,8 @@ class NodeManagerDaemonActor(actor_class):
             cloud_client=self._new_cloud(),
             cloud_size=cloud_size).proxy()
         self.booting[new_setup.actor_ref.actor_urn] = new_setup
+        self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
+
         if arvados_node is not None:
             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
                 time.time())
@@ -349,13 +377,16 @@ class NodeManagerDaemonActor(actor_class):
     def node_up(self, setup_proxy):
         cloud_node = setup_proxy.cloud_node.get()
         del self.booting[setup_proxy.actor_ref.actor_urn]
+        del self.sizes_booting_shutdown[setup_proxy.actor_ref.actor_urn]
+
         setup_proxy.stop()
-        record = self.cloud_nodes.get(cloud_node.id)
-        if record is None:
-            record = self._new_node(cloud_node)
-            self.booted[cloud_node.id] = record
-        self._timer.schedule(time.time() + self.boot_fail_after,
-                             self._later.shutdown_unpaired_node, cloud_node.id)
+        if cloud_node is not None:
+            record = self.cloud_nodes.get(cloud_node.id)
+            if record is None:
+                record = self._new_node(cloud_node)
+                self.booted[cloud_node.id] = record
+            self._timer.schedule(time.time() + self.boot_fail_after,
+                                 self._later.shutdown_unpaired_node, cloud_node.id)
 
     @_check_poll_freshness
     def stop_booting_node(self, size):
@@ -365,12 +396,15 @@ class NodeManagerDaemonActor(actor_class):
         for key, node in self.booting.iteritems():
             if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
                 del self.booting[key]
+                del self.sizes_booting_shutdown[key]
+
                 if nodes_excess > 1:
                     self._later.stop_booting_node(size)
                 break
 
     def _begin_node_shutdown(self, node_actor, cancellable):
-        cloud_node_id = node_actor.cloud_node.get().id
+        cloud_node_obj = node_actor.cloud_node.get()
+        cloud_node_id = cloud_node_obj.id
         if cloud_node_id in self.shutdowns:
             return None
         shutdown = self._node_shutdown.start(
@@ -378,6 +412,7 @@ class NodeManagerDaemonActor(actor_class):
             arvados_client=self._new_arvados(),
             node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
         self.shutdowns[cloud_node_id] = shutdown
+        self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
         shutdown.subscribe(self._later.node_finished_shutdown)
 
     @_check_poll_freshness
@@ -404,9 +439,11 @@ class NodeManagerDaemonActor(actor_class):
             if cancel_reason == self._node_shutdown.NODE_BROKEN:
                 self.cloud_nodes.blacklist(cloud_node_id)
             del self.shutdowns[cloud_node_id]
+            del self.sizes_booting_shutdown[cloud_node_id]
         elif cloud_node_id in self.booted:
             self.booted.pop(cloud_node_id).actor.stop()
             del self.shutdowns[cloud_node_id]
+            del self.sizes_booting_shutdown[cloud_node_id]
 
     def shutdown(self):
         self._logger.info("Shutting down after signal.")
index e0f0a5b1ec26045745fe479f76b8614eb700875d..87cf738311730feed045d23c63af6481c0731e06 100644 (file)
@@ -102,7 +102,6 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
     """
 
     CLIENT_ERRORS = ARVADOS_ERRORS
-    LOGGER_NAME = 'arvnodeman.jobqueue'
 
     def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
         super(JobQueueMonitorActor, self).__init__(
@@ -114,6 +113,6 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
 
     def _got_response(self, queue):
         server_list = self._calculator.servers_for_queue(queue)
-        self._logger.debug("Sending server wishlist: %s",
+        self._logger.debug("Calculated wishlist: %s",
                            ', '.join(s.name for s in server_list) or "(empty)")
         return super(JobQueueMonitorActor, self)._got_response(server_list)
index e8c2fe661e5203469a1ee87341159aeb6cdc1aec..c8b3d19485b2c9cb8d6ee6e4353ddeb2c0b9c560 100644 (file)
@@ -103,32 +103,36 @@ def main(args=None):
     for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
         signal.signal(sigcode, shutdown_signal)
 
-    setup_logging(config.get('Logging', 'file'), **config.log_levels())
-    node_setup, node_shutdown, node_update, node_monitor = \
-        config.dispatch_classes()
-    server_calculator = build_server_calculator(config)
-    timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
-        launch_pollers(config, server_calculator)
-    cloud_node_updater = node_update.start(config.new_cloud_client).proxy()
-    node_daemon = NodeManagerDaemonActor.start(
-        job_queue_poller, arvados_node_poller, cloud_node_poller,
-        cloud_node_updater, timer,
-        config.new_arvados_client, config.new_cloud_client,
-        config.shutdown_windows(),
-        server_calculator,
-        config.getint('Daemon', 'min_nodes'),
-        config.getint('Daemon', 'max_nodes'),
-        config.getint('Daemon', 'poll_stale_after'),
-        config.getint('Daemon', 'boot_fail_after'),
-        config.getint('Daemon', 'node_stale_after'),
-        node_setup, node_shutdown, node_monitor,
-        max_total_price=config.getfloat('Daemon', 'max_total_price')).proxy()
-
-    signal.pause()
-    daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
-    while not daemon_stopped():
-        time.sleep(1)
-    pykka.ActorRegistry.stop_all()
+    try:
+        setup_logging(config.get('Logging', 'file'), **config.log_levels())
+        node_setup, node_shutdown, node_update, node_monitor = \
+            config.dispatch_classes()
+        server_calculator = build_server_calculator(config)
+        timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
+            launch_pollers(config, server_calculator)
+        cloud_node_updater = node_update.start(config.new_cloud_client).proxy()
+        node_daemon = NodeManagerDaemonActor.start(
+            job_queue_poller, arvados_node_poller, cloud_node_poller,
+            cloud_node_updater, timer,
+            config.new_arvados_client, config.new_cloud_client,
+            config.shutdown_windows(),
+            server_calculator,
+            config.getint('Daemon', 'min_nodes'),
+            config.getint('Daemon', 'max_nodes'),
+            config.getint('Daemon', 'poll_stale_after'),
+            config.getint('Daemon', 'boot_fail_after'),
+            config.getint('Daemon', 'node_stale_after'),
+            node_setup, node_shutdown, node_monitor,
+            max_total_price=config.getfloat('Daemon', 'max_total_price')).proxy()
+
+        signal.pause()
+        daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
+        while not daemon_stopped():
+            time.sleep(1)
+    except Exception:
+        logging.exception("Uncaught exception during setup or main loop")
+    finally:
+        pykka.ActorRegistry.stop_all()
 
 
 if __name__ == '__main__':
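
The restructured main() above guarantees that actor cleanup runs even when setup fails. A minimal sketch (not part of this commit) of the same shape; setup() and daemon_stopped() are hypothetical placeholders for the real config and actor wiring:

    import logging
    import time

    def run_until_stopped(setup, daemon_stopped):
        try:
            setup()
            while not daemon_stopped():
                time.sleep(1)
        except Exception:
            logging.exception("Uncaught exception during setup or main loop")
        finally:
            # In the real launcher: pykka.ActorRegistry.stop_all()
            pass
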
index 83dd93f077bfb504a8f41b6f0addd93c717ac7da..f1a661e559247e263a10acdd3eea1271f282bc58 100644 (file)
@@ -11,7 +11,6 @@ class ArvadosNodeListMonitorActor(clientactor.RemotePollLoopActor):
     This actor regularly polls the list of Arvados node records, and
     sends it to subscribers.
     """
-    LOGGER_NAME = 'arvnodeman.arvados_nodes'
 
     def is_common_error(self, exception):
         return isinstance(exception, config.ARVADOS_ERRORS)
@@ -29,7 +28,6 @@ class CloudNodeListMonitorActor(clientactor.RemotePollLoopActor):
     This actor regularly polls the cloud to get a list of running compute
     nodes, and sends it to subscribers.
     """
-    LOGGER_NAME = 'arvnodeman.cloud_nodes'
 
     def is_common_error(self, exception):
         return self._client.is_cloud_exception(exception)
@@ -38,4 +36,5 @@ class CloudNodeListMonitorActor(clientactor.RemotePollLoopActor):
         return node.id
 
     def _send_request(self):
-        return self._client.list_nodes()
+        n = self._client.list_nodes()
+        return n
index 57a0d32d062b87276af90c5945b0d60ceaba8e88..cee9c85a221ad3d33ac914ed02c1bf33ea00a3f1 100644 (file)
@@ -47,7 +47,8 @@ class RemotePollLoopActorTestCase(testutil.RemotePollLoopActorTestMixin,
 
     def test_late_subscribers_get_responses(self):
         self.build_monitor(['pre_late_test', 'late_test'])
-        self.monitor.subscribe(lambda response: None).get(self.TIMEOUT)
+        mock_subscriber = mock.Mock(name='mock_subscriber')
+        self.monitor.subscribe(mock_subscriber).get(self.TIMEOUT)
         self.monitor.subscribe(self.subscriber)
         self.monitor.poll().get(self.TIMEOUT)
         self.stop_proxy(self.monitor)
@@ -146,4 +147,3 @@ class RemotePollLoopActorWithKeysTestCase(testutil.RemotePollLoopActorTestMixin,
 
 if __name__ == '__main__':
     unittest.main()
-
index ecf83c693a5a9da5121b4c26bbb7318e197298fe..9c8af19ea315df129f0c0365862cd8c5fefcab52 100644 (file)
@@ -9,6 +9,7 @@ import arvados.errors as arverror
 import httplib2
 import mock
 import pykka
+import threading
 
 import arvnodeman.computenode.dispatch as dispatch
 from . import testutil
@@ -44,8 +45,11 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
 
     def test_creation_without_arvados_node(self):
         self.make_actor()
+        finished = threading.Event()
+        self.setup_actor.subscribe(lambda _: finished.set())
         self.assertEqual(self.arvados_effect[-1],
                          self.setup_actor.arvados_node.get(self.TIMEOUT))
+        self.assertTrue(finished.wait(self.TIMEOUT))
         self.assertEqual(1, self.api_client.nodes().create().execute.call_count)
         self.assertEqual(1, self.api_client.nodes().update().execute.call_count)
         self.assert_node_properties_updated()
@@ -55,8 +59,11 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
     def test_creation_with_arvados_node(self):
         self.make_mocks(arvados_effect=[testutil.arvados_node_mock()]*2)
         self.make_actor(testutil.arvados_node_mock())
+        finished = threading.Event()
+        self.setup_actor.subscribe(lambda _: finished.set())
         self.assertEqual(self.arvados_effect[-1],
                          self.setup_actor.arvados_node.get(self.TIMEOUT))
+        self.assertTrue(finished.wait(self.TIMEOUT))
         self.assert_node_properties_updated()
         self.assertEqual(2, self.api_client.nodes().update().execute.call_count)
         self.assertEqual(self.cloud_client.create_node(),
@@ -339,7 +346,7 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
     def test_no_shutdown_booting(self):
         self.make_actor()
         self.shutdowns._set_state(True, 600)
-        self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+        self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is still booting"))
 
     def test_shutdown_without_arvados_node(self):
         self.make_actor(start_time=0)
@@ -352,7 +359,7 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
                                               last_ping_at='1970-01-01T01:02:03.04050607Z')
         self.make_actor(10, arv_node)
         self.shutdowns._set_state(True, 600)
-        self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+        self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
 
     def test_no_shutdown_running_broken(self):
         arv_node = testutil.arvados_node_mock(12, job_uuid=None,
@@ -360,7 +367,7 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
         self.make_actor(12, arv_node)
         self.shutdowns._set_state(True, 600)
         self.cloud_client.broken.return_value = True
-        self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+        self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
 
     def test_shutdown_missing_broken(self):
         arv_node = testutil.arvados_node_mock(11, job_uuid=None,
@@ -373,23 +380,23 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
 
     def test_no_shutdown_when_window_closed(self):
         self.make_actor(3, testutil.arvados_node_mock(3, job_uuid=None))
-        self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+        self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("shutdown window is not open."))
 
     def test_no_shutdown_when_node_running_job(self):
         self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True))
         self.shutdowns._set_state(True, 600)
-        self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+        self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
 
     def test_no_shutdown_when_node_state_unknown(self):
         self.make_actor(5, testutil.arvados_node_mock(
             5, crunch_worker_state=None))
         self.shutdowns._set_state(True, 600)
-        self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+        self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
 
     def test_no_shutdown_when_node_state_stale(self):
         self.make_actor(6, testutil.arvados_node_mock(6, age=90000))
         self.shutdowns._set_state(True, 600)
-        self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+        self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
 
     def test_arvados_node_match(self):
         self.make_actor(2)
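
The updated setup-actor tests wait on a threading.Event before asserting API call counts, so the assertions only run after the actor finishes its asynchronous work. A minimal sketch (not part of this commit) of that synchronization helper; subscribe is a hypothetical callable that registers a one-argument callback:

    import threading

    def wait_for_completion(subscribe, timeout=5):
        finished = threading.Event()
        subscribe(lambda _: finished.set())
        if not finished.wait(timeout):
            raise AssertionError("subscriber was never notified")
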
index 3ef152ef874bb17c7a368c68827e665851c1e2f8..5721abc5f87efeaf029c2eb476bb0fcdf6a14f2a 100644 (file)
@@ -100,3 +100,13 @@ echo compute-000000000000063-zzzzz > /var/tmp/arv-node-data/meta-data/instance-i
 echo z1.test > /var/tmp/arv-node-data/meta-data/instance-type
 """,
                          driver.arvados_create_kwargs(testutil.MockSize(1), arv_node)['ex_customdata'])
+
+    def test_create_raises_but_actually_succeeded(self):
+        arv_node = testutil.arvados_node_mock(1, hostname=None)
+        driver = self.new_driver(create_kwargs={"tag_arvados-class": "dynamic-compute"})
+        nodelist = [testutil.cloud_node_mock(1, tags={"arvados-class": "dynamic-compute"})]
+        nodelist[0].name = 'compute-000000000000001-zzzzz'
+        self.driver_mock().list_nodes.return_value = nodelist
+        self.driver_mock().create_node.side_effect = IOError
+        n = driver.create_node(testutil.MockSize(1), arv_node)
+        self.assertEqual('compute-000000000000001-zzzzz', n.name)
index 41cb1aac8623ee8617f0464623aa4ce3fd3a7d28..e8b2fa36c582876359fa6e667f80e9a7cb1f3013 100644 (file)
@@ -48,6 +48,16 @@ class GCEComputeNodeDriverTestCase(testutil.DriverTestMixin, unittest.TestCase):
         metadata = self.driver_mock().create_node.call_args[1]['ex_metadata']
         self.assertIn('ping_secret=ssshh', metadata.get('arv-ping-url'))
 
+    def test_create_raises_but_actually_succeeded(self):
+        arv_node = testutil.arvados_node_mock(1, hostname=None)
+        driver = self.new_driver()
+        nodelist = [testutil.cloud_node_mock(1)]
+        nodelist[0].name = 'compute-000000000000001-zzzzz'
+        self.driver_mock().list_nodes.return_value = nodelist
+        self.driver_mock().create_node.side_effect = IOError
+        n = driver.create_node(testutil.MockSize(1), arv_node)
+        self.assertEqual('compute-000000000000001-zzzzz', n.name)
+
     def test_create_sets_default_hostname(self):
         driver = self.new_driver()
         driver.create_node(testutil.MockSize(1),
index 200919bfb819183dfa8b50b426c7d33d60d64db9..f41fa6cb1af57b6b1cb005a09bec95207ace0b14 100644 (file)
@@ -208,6 +208,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         mock_shutdown = self.node_shutdown.start(node_monitor=mock_node_monitor)
 
         self.daemon.shutdowns.get()[cloud_nodes[1].id] = mock_shutdown.proxy()
+        self.daemon.sizes_booting_shutdown.get()[cloud_nodes[1].id] = size
 
         self.assertEqual(2, self.alive_monitor_count())
         for mon_ref in self.monitor_list():
index e543c2891698c6f0626c3bf5f444cd1c0b49a86b..6cde766fa312f5b0e07ba53148a93844d26dbf47 100644 (file)
@@ -130,6 +130,14 @@ class DriverTestMixin(object):
     def driver_method_args(self, method_name):
         return getattr(self.driver_mock(), method_name).call_args
 
+    def test_driver_create_retry(self):
+        with mock.patch('time.sleep'):
+            driver_mock2 = mock.MagicMock(name='driver_mock2')
+            self.driver_mock.side_effect = (Exception("oops"), driver_mock2)
+            kwargs = {'user_id': 'foo'}
+            driver = self.new_driver(auth_kwargs=kwargs)
+            self.assertTrue(self.driver_mock.called)
+            self.assertIs(driver.real, driver_mock2)
 
 class RemotePollLoopActorTestMixin(ActorTestMixin):
     def build_monitor(self, *args, **kwargs):
index 4db8152d090ecd7b43f8be48fd2db363d03a2483..325ee59e854dfe36a83b406297d37047c7a90718 100644 (file)
@@ -1 +1,2 @@
 include agpl-3.0.txt
+include crunchstat_summary/chartjs.js
index 662d7835cc0a1878cb3fff7fa2ef468ccc1f8087..e16bd8e0a23776e7561787c6c4eca636ad2d5be6 100755 (executable)
@@ -4,9 +4,12 @@ from __future__ import print_function
 
 import crunchstat_summary.command
 import crunchstat_summary.summarizer
+import logging
 import sys
 
+logging.getLogger().addHandler(logging.StreamHandler())
+
 args = crunchstat_summary.command.ArgumentParser().parse_args(sys.argv[1:])
-s = crunchstat_summary.summarizer.Summarizer(args)
-s.run()
-print(s.report(), end='')
+cmd = crunchstat_summary.command.Command(args)
+cmd.run()
+print(cmd.report(), end='')
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..c10988db2c0609f836a2f8d210ac0ac7375d8fcd 100644 (file)
@@ -0,0 +1,4 @@
+import logging
+
+logger = logging.getLogger(__name__)
+logger.addHandler(logging.NullHandler())
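
This follows the standard library-logging pattern: the package stays silent unless the application installs a handler (as bin/crunchstat-summary now does). A minimal usage sketch (not part of this commit):

    import logging
    from crunchstat_summary import logger

    # Application-level configuration; without it, library log records are
    # swallowed by the NullHandler attached above.
    logging.getLogger().addHandler(logging.StreamHandler())
    logger.setLevel(logging.DEBUG)
    logger.debug('visible only because the application added a handler')
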
diff --git a/tools/crunchstat-summary/crunchstat_summary/chartjs.js b/tools/crunchstat-summary/crunchstat_summary/chartjs.js
new file mode 100644 (file)
index 0000000..72ff7a4
--- /dev/null
@@ -0,0 +1,27 @@
+window.onload = function() {
+    var charts = {};
+    sections.forEach(function(section, section_idx) {
+        var h1 = document.createElement('h1');
+        h1.appendChild(document.createTextNode(section.label));
+        document.body.appendChild(h1);
+        section.charts.forEach(function(data, chart_idx) {
+            // Skip chart if every series has zero data points
+            if (0 == data.data.reduce(function(len, series) {
+                return len + series.dataPoints.length;
+            }, 0)) {
+                return;
+            }
+            var id = 'chart-'+section_idx+'-'+chart_idx;
+            var div = document.createElement('div');
+            div.setAttribute('id', id);
+            div.setAttribute('style', 'width: 100%; height: 150px');
+            document.body.appendChild(div);
+            charts[id] = new CanvasJS.Chart(id, data);
+            charts[id].render();
+        });
+    });
+
+    if (typeof window.debug === 'undefined')
+        window.debug = {};
+    window.debug.charts = charts;
+};
diff --git a/tools/crunchstat-summary/crunchstat_summary/chartjs.py b/tools/crunchstat-summary/crunchstat_summary/chartjs.py
new file mode 100644 (file)
index 0000000..fb30041
--- /dev/null
@@ -0,0 +1,70 @@
+from __future__ import print_function
+
+import cgi
+import json
+import pkg_resources
+
+from crunchstat_summary import logger
+
+
+class ChartJS(object):
+    JSLIB = 'https://cdnjs.cloudflare.com/ajax/libs/canvasjs/1.7.0/canvasjs.min.js'
+
+    def __init__(self, label, summarizers):
+        self.label = label
+        self.summarizers = summarizers
+
+    def html(self):
+        return '''<!doctype html><html><head>
+        <title>{} stats</title>
+        <script type="text/javascript" src="{}"></script>
+        <script type="text/javascript">{}</script>
+        </head><body></body></html>
+        '''.format(cgi.escape(self.label), self.JSLIB, self.js())
+
+    def js(self):
+        return 'var sections = {};\n{}'.format(
+            json.dumps(self.sections()),
+            pkg_resources.resource_string('crunchstat_summary', 'chartjs.js'))
+
+    def sections(self):
+        return [
+            {
+                'label': s.long_label(),
+                'charts': self.charts(s.label, s.tasks),
+            }
+            for s in self.summarizers]
+
+    def charts(self, label, tasks):
+        return [
+            {
+                'axisY': {
+                    'minimum': 0,
+                },
+                'data': [
+                    {
+                        'type': 'line',
+                        'markerType': 'none',
+                        'dataPoints': self._datapoints(
+                            label=uuid, task=task, series=task.series[stat]),
+                    }
+                    for uuid, task in tasks.iteritems()
+                ],
+                'title': {
+                    'text': '{}: {} {}'.format(label, stat[0], stat[1]),
+                },
+                'zoomEnabled': True,
+            }
+            for stat in (('cpu', 'user+sys__rate'),
+                         ('mem', 'rss'),
+                         ('net:eth0', 'tx+rx__rate'),
+                         ('net:keep0', 'tx+rx__rate'))]
+
+    def _datapoints(self, label, task, series):
+        points = [
+            {'x': pt[0].total_seconds(), 'y': pt[1]}
+            for pt in series]
+        if len(points) > 0:
+            points[-1]['markerType'] = 'cross'
+            points[-1]['markerSize'] = 12
+        return points
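
A minimal usage sketch (not part of this commit) for the ChartJS helper, assuming a summarizer has already parsed a log; the log file name is hypothetical:

    import crunchstat_summary.chartjs
    import crunchstat_summary.summarizer

    fh = open('logfile.txt')  # hypothetical crunchstat log
    summ = crunchstat_summary.summarizer.Summarizer(fh, label='example')
    summ.run()
    html = crunchstat_summary.chartjs.ChartJS('example', [summ]).html()
    with open('report.html', 'w') as out:
        out.write(html)
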
index 8186e5d7579e91c208e714abaeff3d7192c3ab16..78638c60e840b48e2117af842fc472e02d109daa 100644 (file)
@@ -1,4 +1,9 @@
 import argparse
+import gzip
+import logging
+import sys
+
+from crunchstat_summary import logger, summarizer
 
 
 class ArgumentParser(argparse.ArgumentParser):
@@ -8,7 +13,50 @@ class ArgumentParser(argparse.ArgumentParser):
         src = self.add_mutually_exclusive_group()
         src.add_argument(
             '--job', type=str, metavar='UUID',
-            help='Look up the specified job and read its log data from Keep')
+            help='Look up the specified job and read its log data from Keep'
+            ' (or from the Arvados event log, if the job is still running)')
+        src.add_argument(
+            '--pipeline-instance', type=str, metavar='UUID',
+            help='Summarize each component of the given pipeline instance')
         src.add_argument(
             '--log-file', type=str,
             help='Read log data from a regular file')
+        self.add_argument(
+            '--skip-child-jobs', action='store_true',
+            help='Do not include stats from child jobs')
+        self.add_argument(
+            '--format', type=str, choices=('html', 'text'), default='text',
+            help='Report format')
+        self.add_argument(
+            '--verbose', '-v', action='count', default=0,
+            help='Log more information (once for progress, twice for debug)')
+
+
+class Command(object):
+    def __init__(self, args):
+        self.args = args
+        logger.setLevel(logging.WARNING - 10 * args.verbose)
+
+    def run(self):
+        kwargs = {
+            'skip_child_jobs': self.args.skip_child_jobs,
+        }
+        if self.args.pipeline_instance:
+            self.summer = summarizer.PipelineSummarizer(self.args.pipeline_instance, **kwargs)
+        elif self.args.job:
+            self.summer = summarizer.JobSummarizer(self.args.job, **kwargs)
+        elif self.args.log_file:
+            if self.args.log_file.endswith('.gz'):
+                fh = gzip.open(self.args.log_file)
+            else:
+                fh = open(self.args.log_file)
+            self.summer = summarizer.Summarizer(fh, **kwargs)
+        else:
+            self.summer = summarizer.Summarizer(sys.stdin, **kwargs)
+        return self.summer.run()
+
+    def report(self):
+        if self.args.format == 'html':
+            return self.summer.html_report()
+        elif self.args.format == 'text':
+            return self.summer.text_report()
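
A usage sketch (not part of this commit) mirroring bin/crunchstat-summary, exercising the new Command class and the --format=html option; the log file name is hypothetical:

    from __future__ import print_function

    import crunchstat_summary.command

    args = crunchstat_summary.command.ArgumentParser().parse_args(
        ['--log-file', 'logfile.txt', '--format', 'html'])
    cmd = crunchstat_summary.command.Command(args)
    cmd.run()
    print(cmd.report(), end='')
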
diff --git a/tools/crunchstat-summary/crunchstat_summary/reader.py b/tools/crunchstat-summary/crunchstat_summary/reader.py
new file mode 100644 (file)
index 0000000..049b48f
--- /dev/null
@@ -0,0 +1,69 @@
+from __future__ import print_function
+
+import arvados
+import Queue
+import threading
+
+from crunchstat_summary import logger
+
+
+class CollectionReader(object):
+    def __init__(self, collection_id):
+        logger.debug('load collection %s', collection_id)
+        collection = arvados.collection.CollectionReader(collection_id)
+        filenames = [filename for filename in collection]
+        if len(filenames) != 1:
+            raise ValueError(
+                "collection {} has {} files; need exactly one".format(
+                    collection_id, len(filenames)))
+        self._reader = collection.open(filenames[0])
+
+    def __iter__(self):
+        return iter(self._reader)
+
+
+class LiveLogReader(object):
+    EOF = None
+
+    def __init__(self, job_uuid):
+        logger.debug('load stderr events for job %s', job_uuid)
+        self._filters = [
+            ['object_uuid', '=', job_uuid],
+            ['event_type', '=', 'stderr']]
+        self._label = job_uuid
+
+    def _get_all_pages(self):
+        got = 0
+        last_id = 0
+        while True:
+            page = arvados.api().logs().index(
+                limit=1000,
+                order=['id asc'],
+                filters=self._filters + [['id', '>', str(last_id)]],
+            ).execute(num_retries=2)
+            got += len(page['items'])
+            logger.debug(
+                '%s: received %d of %d log events',
+                self._label, got,
+                got + page['items_available'] - len(page['items']))
+            for i in page['items']:
+                for line in i['properties']['text'].split('\n'):
+                    self._queue.put(line+'\n')
+                last_id = i['id']
+            if (len(page['items']) == 0 or
+                len(page['items']) >= page['items_available']):
+                break
+        self._queue.put(self.EOF)
+
+    def __iter__(self):
+        self._queue = Queue.Queue()
+        self._thread = threading.Thread(target=self._get_all_pages)
+        self._thread.daemon = True
+        self._thread.start()
+        return self
+
+    def next(self):
+        line = self._queue.get()
+        if line is self.EOF:
+            raise StopIteration
+        return line
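
A usage sketch (not part of this commit) for the new readers; the job UUID is hypothetical and LiveLogReader requires working Arvados API credentials:

    from __future__ import print_function

    import crunchstat_summary.reader

    # Stream stderr log lines for a still-running job.
    reader = crunchstat_summary.reader.LiveLogReader('zzzzz-8i9sb-0123456789abcde')
    for line in reader:
        print(line, end='')
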
index ac0964b30e990724ecb5c7b16fb1cbd940bce25d..48bec6a9339c0740021b43735797e58a8e6a5756 100644 (file)
@@ -2,17 +2,39 @@ from __future__ import print_function
 
 import arvados
 import collections
+import crunchstat_summary.chartjs
+import crunchstat_summary.reader
+import datetime
 import functools
-import gzip
+import itertools
+import math
 import re
 import sys
 
+from arvados.api import OrderedJsonModel
+from crunchstat_summary import logger
+
+# Recommend memory constraints that are this multiple of an integral
+# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
+# that have amounts like 7.5 GiB according to the kernel.)
+AVAILABLE_RAM_RATIO = 0.95
+
+
+class Task(object):
+    def __init__(self):
+        self.starttime = None
+        self.series = collections.defaultdict(list)
+
 
 class Summarizer(object):
-    def __init__(self, args):
-        self.args = args
+    def __init__(self, logdata, label=None, skip_child_jobs=False):
+        self._logdata = logdata
+
+        self.label = label
+        self.starttime = None
+        self.finishtime = None
+        self._skip_child_jobs = skip_child_jobs
 
-    def run(self):
         # stats_max: {category: {stat: val}}
         self.stats_max = collections.defaultdict(
             functools.partial(collections.defaultdict,
@@ -20,22 +42,88 @@ class Summarizer(object):
         # task_stats: {task_id: {category: {stat: val}}}
         self.task_stats = collections.defaultdict(
             functools.partial(collections.defaultdict, dict))
-        for line in self._logdata():
+
+        self.seq_to_uuid = {}
+        self.tasks = collections.defaultdict(Task)
+
+        # We won't bother recommending new runtime constraints if the
+        # constraints given when running the job are known to us and
+        # are already suitable.  If applicable, the subclass
+        # constructor will overwrite this with something useful.
+        self.existing_constraints = {}
+
+        logger.debug("%s: logdata %s", self.label, repr(logdata))
+
+    def run(self):
+        logger.debug("%s: parsing log data", self.label)
+        for line in self._logdata:
+            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
+            if m:
+                seq = int(m.group('seq'))
+                uuid = m.group('task_uuid')
+                self.seq_to_uuid[seq] = uuid
+                logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
+                continue
+
             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
             if m:
-                task_id = m.group('seq')
+                task_id = self.seq_to_uuid[int(m.group('seq'))]
                 elapsed = int(m.group('elapsed'))
                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                 if elapsed > self.stats_max['time']['elapsed']:
                     self.stats_max['time']['elapsed'] = elapsed
                 continue
-            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
+
+            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
+            if m:
+                uuid = m.group('uuid')
+                if self._skip_child_jobs:
+                    logger.warning('%s: omitting stats from child job %s'
+                                   ' because --skip-child-jobs flag is on',
+                                   self.label, uuid)
+                    continue
+                logger.debug('%s: follow %s', self.label, uuid)
+                child_summarizer = JobSummarizer(uuid)
+                child_summarizer.stats_max = self.stats_max
+                child_summarizer.task_stats = self.task_stats
+                child_summarizer.tasks = self.tasks
+                child_summarizer.run()
+                logger.debug('%s: done %s', self.label, uuid)
+                continue
+
+            m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
             if not m:
                 continue
+
+            if self.label is None:
+                self.label = m.group('job_uuid')
+                logger.debug('%s: using job uuid as label', self.label)
             if m.group('category').endswith(':'):
-                # "notice:" etc.
+                # "stderr crunchstat: notice: ..."
+                continue
+            elif m.group('category') in ('error', 'caught'):
                 continue
-            task_id = m.group('seq')
+            elif m.group('category') == 'read':
+                # "stderr crunchstat: read /proc/1234/net/dev: ..."
+                # (crunchstat formatting fixed, but old logs still say this)
+                continue
+            task_id = self.seq_to_uuid[int(m.group('seq'))]
+            task = self.tasks[task_id]
+
+            # Use the first and last crunchstat timestamps as
+            # approximations of starttime and finishtime.
+            timestamp = datetime.datetime.strptime(
+                m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
+            if not task.starttime:
+                task.starttime = timestamp
+                logger.debug('%s: task %s starttime %s',
+                             self.label, task_id, timestamp)
+            task.finishtime = timestamp
+
+            if not self.starttime:
+                self.starttime = timestamp
+            self.finishtime = timestamp
+
             this_interval_s = None
             for group in ['current', 'interval']:
                 if not m.group(group):
@@ -44,10 +132,15 @@ class Summarizer(object):
                 words = m.group(group).split(' ')
                 stats = {}
                 for val, stat in zip(words[::2], words[1::2]):
-                    if '.' in val:
-                        stats[stat] = float(val)
-                    else:
-                        stats[stat] = int(val)
+                    try:
+                        if '.' in val:
+                            stats[stat] = float(val)
+                        else:
+                            stats[stat] = int(val)
+                    except ValueError as e:
+                        raise ValueError(
+                            'Error parsing {} stat in "{}": {!r}'.format(
+                                stat, line, e))
                 if 'user' in stats or 'sys' in stats:
                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                 if 'tx' in stats or 'rx' in stats:
@@ -58,23 +151,26 @@ class Summarizer(object):
                             this_interval_s = val
                             continue
                         elif not (this_interval_s > 0):
-                            print("BUG? interval stat given with duration {!r}".
-                                  format(this_interval_s),
-                                  file=sys.stderr)
+                            logger.error(
+                                "BUG? interval stat given with duration {!r}".
+                                format(this_interval_s))
                             continue
                         else:
                             stat = stat + '__rate'
                             val = val / this_interval_s
+                            if stat in ['user+sys__rate', 'tx+rx__rate']:
+                                task.series[category, stat].append(
+                                    (timestamp - task.starttime, val))
                     else:
+                        if stat in ['rss']:
+                            task.series[category, stat].append(
+                                (timestamp - task.starttime, val))
                         self.task_stats[task_id][category][stat] = val
                     if val > self.stats_max[category][stat]:
                         self.stats_max[category][stat] = val
+        logger.debug('%s: done parsing', self.label)
 
-    def report(self):
-        return "\n".join(self._report_gen()) + "\n"
-
-    def _report_gen(self):
-        job_tot = collections.defaultdict(
+        self.job_tot = collections.defaultdict(
             functools.partial(collections.defaultdict, int))
         for task_id, task_stat in self.task_stats.iteritems():
             for category, stat_last in task_stat.iteritems():
@@ -82,7 +178,32 @@ class Summarizer(object):
                     if stat in ['cpus', 'cache', 'swap', 'rss']:
                         # meaningless stats like 16 cpu cores x 5 tasks = 80
                         continue
-                    job_tot[category][stat] += val
+                    self.job_tot[category][stat] += val
+        logger.debug('%s: done totals', self.label)
+
+    def long_label(self):
+        label = self.label
+        if self.finishtime:
+            label += ' -- elapsed time '
+            s = (self.finishtime - self.starttime).total_seconds()
+            if s > 86400:
+                label += '{}d'.format(int(s/86400))
+            if s > 3600:
+                label += '{}h'.format(int(s/3600) % 24)
+            if s > 60:
+                label += '{}m'.format(int(s/60) % 60)
+            label += '{}s'.format(int(s) % 60)
+        return label
+
+    def text_report(self):
+        return "\n".join(itertools.chain(
+            self._text_report_gen(),
+            self._recommend_gen())) + "\n"
+
+    def html_report(self):
+        return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()
+
+    def _text_report_gen(self):
         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
         for category, stat_max in sorted(self.stats_max.iteritems()):
             for stat, val in sorted(stat_max.iteritems()):
@@ -90,9 +211,12 @@ class Summarizer(object):
                     continue
                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                 val = self._format(val)
-                tot = self._format(job_tot[category].get(stat, '-'))
+                tot = self._format(self.job_tot[category].get(stat, '-'))
                 yield "\t".join([category, stat, str(val), max_rate, tot])
         for args in (
+                ('Number of tasks: {}',
+                 len(self.tasks),
+                 None),
                 ('Max CPU time spent by a single task: {}s',
                  self.stats_max['cpu']['user+sys'],
                  None),
@@ -100,7 +224,8 @@ class Summarizer(object):
                  self.stats_max['cpu']['user+sys__rate'],
                  lambda x: x * 100),
                 ('Overall CPU usage: {}%',
-                 job_tot['cpu']['user+sys'] / job_tot['time']['elapsed'],
+                 self.job_tot['cpu']['user+sys'] /
+                 self.job_tot['time']['elapsed'],
                  lambda x: x * 100),
                 ('Max memory used by a single task: {}GB',
                  self.stats_max['mem']['rss'],
@@ -118,6 +243,81 @@ class Summarizer(object):
                 val = transform(val)
             yield "# "+format_string.format(self._format(val))
 
+    def _recommend_gen(self):
+        return itertools.chain(
+            self._recommend_cpu(),
+            self._recommend_ram())
+
+    def _recommend_cpu(self):
+        """Recommend asking for 4 cores if max CPU usage was 333%"""
+
+        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
+        if cpu_max_rate == float('-Inf'):
+            logger.warning('%s: no CPU usage data', self.label)
+            return
+        used_cores = int(math.ceil(cpu_max_rate))
+        asked_cores = self.existing_constraints.get('min_cores_per_node')
+        if asked_cores is None or used_cores < asked_cores:
+            yield (
+                '#!! {} max CPU usage was {}% -- '
+                'try runtime_constraints "min_cores_per_node":{}'
+            ).format(
+                self.label,
+                int(math.ceil(cpu_max_rate*100)),
+                int(used_cores))
+
+    def _recommend_ram(self):
+        """Recommend an economical RAM constraint for this job.
+
+        Nodes that are advertised as "8 gibibytes" actually have what
+        we might call "8 nearlygibs" of memory available for jobs.
+        Here, we calculate a whole number of nearlygibs that would
+        have sufficed to run the job, then recommend requesting a node
+        with that number of nearlygibs (expressed as mebibytes).
+
+        Requesting a node with "nearly 8 gibibytes" is our best hope
+        of getting a node that actually has nearly 8 gibibytes
+        available.  If the node manager is smart enough to account for
+        the discrepancy itself when choosing/creating a node, we'll
+        get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
+        advertised size of the next-size-smaller node (say, 6 GiB)
+        will be too low to satisfy our request, so we will effectively
+        get rounded up to 8 GiB.
+
+        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
+        we will generally get a node that is advertised as "8 GiB" and
+        has at least 7500 MiB available.  However, asking for 8192 MiB
+        would either result in an unnecessarily expensive 12 GiB node
+        (if node manager knows about the discrepancy), or an 8 GiB
+        node which has less than 8192 MiB available and is therefore
+        considered by crunch-dispatch to be too small to meet our
+        constraint.
+
+        When node manager learns how to predict the available memory
+        for each node type such that crunch-dispatch always agrees
+        that a node is big enough to run the job it was brought up
+        for, all this will be unnecessary.  We'll just ask for exactly
+        the memory we want -- even if that happens to be 8192 MiB.
+        """
+
+        used_bytes = self.stats_max['mem']['rss']
+        if used_bytes == float('-Inf'):
+            logger.warning('%s: no memory usage data', self.label)
+            return
+        used_mib = math.ceil(float(used_bytes) / 1048576)
+        asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
+
+        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
+        if asked_mib is None or (
+                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
+            yield (
+                '#!! {} max RSS was {} MiB -- '
+                'try runtime_constraints "min_ram_mb_per_node":{}'
+            ).format(
+                self.label,
+                int(used_mib),
+                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
+
     def _format(self, val):
         """Return a string representation of a stat.
 
@@ -127,25 +327,67 @@ class Summarizer(object):
         else:
             return '{}'.format(val)
 
-    def _logdata(self):
-        if self.args.log_file:
-            if self.args.log_file.endswith('.gz'):
-                return gzip.open(self.args.log_file)
-            else:
-                return open(self.args.log_file)
-        elif self.args.job:
-            arv = arvados.api('v1')
-            job = arv.jobs().get(uuid=self.args.job).execute()
-            if not job['log']:
-                raise ValueError(
-                    "job {} has no log; live summary not implemented".format(
-                        self.args.job))
-            collection = arvados.collection.CollectionReader(job['log'])
-            filenames = [filename for filename in collection]
-            if len(filenames) != 1:
-                raise ValueError(
-                    "collection {} has {} files; need exactly one".format(
-                        job.log, len(filenames)))
-            return collection.open(filenames[0])
+
+class CollectionSummarizer(Summarizer):
+    def __init__(self, collection_id, **kwargs):
+        super(CollectionSummarizer, self).__init__(
+            crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
+        self.label = collection_id
+
+
+class JobSummarizer(Summarizer):
+    def __init__(self, job, **kwargs):
+        arv = arvados.api('v1')
+        if isinstance(job, basestring):
+            self.job = arv.jobs().get(uuid=job).execute()
+        else:
+            self.job = job
+        if self.job['log']:
+            rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
+            label = self.job['uuid']
         else:
-            return sys.stdin
+            rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
+            label = self.job['uuid'] + ' (partial)'
+        super(JobSummarizer, self).__init__(rdr, **kwargs)
+        self.label = label
+        self.existing_constraints = self.job.get('runtime_constraints', {})
+
+
+class PipelineSummarizer(object):
+    def __init__(self, pipeline_instance_uuid, **kwargs):
+        arv = arvados.api('v1', model=OrderedJsonModel())
+        instance = arv.pipeline_instances().get(
+            uuid=pipeline_instance_uuid).execute()
+        self.summarizers = collections.OrderedDict()
+        for cname, component in instance['components'].iteritems():
+            if 'job' not in component:
+                logger.warning(
+                    "%s: skipping component with no job assigned", cname)
+            elif component['job'].get('log') is None:
+                logger.warning(
+                    "%s: skipping job %s with no log available",
+                    cname, component['job'].get('uuid'))
+            else:
+                logger.info(
+                    "%s: logdata %s", cname, component['job']['log'])
+                summarizer = JobSummarizer(component['job'], **kwargs)
+                summarizer.label = cname
+                self.summarizers[cname] = summarizer
+        self.label = pipeline_instance_uuid
+
+    def run(self):
+        for summarizer in self.summarizers.itervalues():
+            summarizer.run()
+
+    def text_report(self):
+        txt = ''
+        for cname, summarizer in self.summarizers.iteritems():
+            txt += '### Summary for {} ({})\n'.format(
+                cname, summarizer.job['uuid'])
+            txt += summarizer.text_report()
+            txt += '\n'
+        return txt
+
+    def html_report(self):
+        return crunchstat_summary.chartjs.ChartJS(
+            self.label, self.summarizers.itervalues()).html()
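
A worked example (not part of this commit) of the _recommend_ram arithmetic above, using a hypothetical 334 MiB peak RSS; it reproduces the "min_ram_mb_per_node":972 recommendation that appears in the updated test reports:

    from __future__ import print_function

    import math

    AVAILABLE_RAM_RATIO = 0.95

    def nearlygibs(mebibytes):
        return mebibytes / AVAILABLE_RAM_RATIO / 1024

    used_bytes = 334 * 1048576  # hypothetical peak RSS, about 334 MiB
    used_mib = math.ceil(float(used_bytes) / 1048576)
    recommended_mib = int(math.ceil(nearlygibs(used_mib)) *
                          AVAILABLE_RAM_RATIO * 1024)
    print(int(used_mib), recommended_mib)  # 334 972
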
index aeb0fe6c3369bc30fdc125985374895461685546..f3c10bdaec0dbffd210a2e2b6a30886903be0a88 100755 (executable)
@@ -23,6 +23,7 @@ setup(name='crunchstat_summary',
       download_url="https://github.com/curoverse/arvados.git",
       license='GNU Affero General Public License, version 3.0',
       packages=['crunchstat_summary'],
+      include_package_data=True,
       scripts=[
           'bin/crunchstat-summary'
       ],
@@ -33,6 +34,7 @@ setup(name='crunchstat_summary',
           'arvados-python-client',
       ],
       test_suite='tests',
+      tests_require=['pbr<1.7.0', 'mock>=1.0'],
       zip_safe=False,
       cmdclass={'egg_info': tagger},
       )
diff --git a/tools/crunchstat-summary/tests/crunchstat_error_messages.txt b/tools/crunchstat-summary/tests/crunchstat_error_messages.txt
new file mode 100644 (file)
index 0000000..bf6dd5c
--- /dev/null
@@ -0,0 +1,9 @@
+2016-01-07_00:15:33 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr 
+2016-01-07_00:15:33 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr old error message:
+2016-01-07_00:15:33 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr crunchstat: read /proc/3305/net/dev: open /proc/3305/net/dev: no such file or directory
+2016-01-07_00:15:34 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr 
+2016-01-07_00:15:34 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr new error message:
+2016-01-07_00:15:34 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr crunchstat: error reading /proc/3305/net/dev: open /proc/3305/net/dev: no such file or directory
+2016-01-07_00:15:34 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr
+2016-01-07_00:15:34 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr cancelled job:
+2016-01-07_00:15:34 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr crunchstat: caught signal: interrupt
index ef7beb11c0057009f8465aa2836349ca32235f5a..0ba0181c1215620c7f0a4334c2eb77dc3d64d7a6 100644 (file)
@@ -22,9 +22,12 @@ net:keep0    rx      0       0.00    0
 net:keep0      tx      0       0.00    0
 net:keep0      tx+rx   0       0.00    0
 time   elapsed 80      -       80
+# Number of tasks: 1
 # Max CPU time spent by a single task: 5.75s
 # Max CPU usage in a single interval: 13.00%
 # Overall CPU usage: 7.19%
 # Max memory used by a single task: 0.35GB
 # Max network traffic in a single task: 1.79GB
 # Max network speed in a single interval: 42.58MB/s
+#!! 4xphq-8i9sb-jq0ekny1xou3zoh max CPU usage was 13% -- try runtime_constraints "min_cores_per_node":1
+#!! 4xphq-8i9sb-jq0ekny1xou3zoh max RSS was 334 MiB -- try runtime_constraints "min_ram_mb_per_node":972
index 38af3e7e8c7752eb07532f5a2f4fd54a616eab4e..0641bbac9f6c4f88c008adb279440e88be4f628a 100644 (file)
@@ -11,7 +11,9 @@ net:eth0      rx      90      -       90
 net:eth0       tx      90      -       90
 net:eth0       tx+rx   180     -       180
 time   elapsed 2       -       4
+# Number of tasks: 2
 # Max CPU time spent by a single task: 0.00s
 # Overall CPU usage: 0.00%
 # Max memory used by a single task: 0.00GB
 # Max network traffic in a single task: 0.00GB
+#!! 4xphq-8i9sb-zvb2ocfycpomrup max RSS was 1 MiB -- try runtime_constraints "min_ram_mb_per_node":972
index 7e42d612b753fadb25e4bac7cee7e941e01231e3..19fe0ed764d4fc2c229c348161c0079d9d67b1dd 100644 (file)
@@ -11,7 +11,9 @@ net:eth0      rx      90      -       90
 net:eth0       tx      90      -       90
 net:eth0       tx+rx   180     -       180
 time   elapsed 2       -       3
+# Number of tasks: 2
 # Max CPU time spent by a single task: 0.00s
 # Overall CPU usage: 0.00%
 # Max memory used by a single task: 0.00GB
 # Max network traffic in a single task: 0.00GB
+#!! 4xphq-8i9sb-v831jm2uq0g2g9x max RSS was 1 MiB -- try runtime_constraints "min_ram_mb_per_node":972
index dbc3843c698c616e9ef2f98b95b250e632b9f3c7..6c1443733c35ec9ea76bd35e49fb3c69d6f83906 100644 (file)
+import arvados
+import collections
 import crunchstat_summary.command
-import crunchstat_summary.summarizer
 import difflib
 import glob
+import gzip
+import mock
 import os
 import unittest
 
+TESTS_DIR = os.path.dirname(os.path.abspath(__file__))
 
-class ExampleLogsTestCase(unittest.TestCase):
+
+class ReportDiff(unittest.TestCase):
+    def diff_known_report(self, logfile, cmd):
+        expectfile = logfile+'.report'
+        expect = open(expectfile).readlines()
+        self.diff_report(cmd, expect, expectfile=expectfile)
+
+    def diff_report(self, cmd, expect, expectfile=None):
+        got = [x+"\n" for x in cmd.report().strip("\n").split("\n")]
+        self.assertEqual(got, expect, "\n"+"".join(difflib.context_diff(
+            expect, got, fromfile=expectfile, tofile="(generated)")))
+
+
+class SummarizeFile(ReportDiff):
     def test_example_files(self):
-        dirname = os.path.dirname(os.path.abspath(__file__))
-        for fnm in glob.glob(os.path.join(dirname, '*.txt.gz')):
-            logfile = os.path.join(dirname, fnm)
+        for fnm in glob.glob(os.path.join(TESTS_DIR, '*.txt.gz')):
+            logfile = os.path.join(TESTS_DIR, fnm)
             args = crunchstat_summary.command.ArgumentParser().parse_args(
                 ['--log-file', logfile])
-            summarizer = crunchstat_summary.summarizer.Summarizer(args)
-            summarizer.run()
-            got = [x+"\n" for x in summarizer.report().strip("\n").split("\n")]
-            expectfile = logfile+'.report'
-            expect = open(expectfile).readlines()
-            self.assertEqual(got, expect, "\n"+"".join(difflib.context_diff(
-                expect, got, fromfile=expectfile, tofile="(generated)")))
+            cmd = crunchstat_summary.command.Command(args)
+            cmd.run()
+            self.diff_known_report(logfile, cmd)
+
+
+class HTMLFromFile(ReportDiff):
+    def test_example_files(self):
+        # Note we don't test the output content at all yet; we're
+        # mainly just verifying the --format=html option isn't ignored
+        # and the HTML code path doesn't crash.
+        for fnm in glob.glob(os.path.join(TESTS_DIR, '*.txt.gz')):
+            logfile = os.path.join(TESTS_DIR, fnm)
+            args = crunchstat_summary.command.ArgumentParser().parse_args(
+                ['--format=html', '--log-file', logfile])
+            cmd = crunchstat_summary.command.Command(args)
+            cmd.run()
+            self.assertRegexpMatches(cmd.report(), r'(?is)<html>.*</html>\s*$')
+
+
+class SummarizeEdgeCases(unittest.TestCase):
+    def test_error_messages(self):
+        logfile = open(os.path.join(TESTS_DIR, 'crunchstat_error_messages.txt'))
+        s = crunchstat_summary.summarizer.Summarizer(logfile)
+        s.run()
+
+
+class SummarizeJob(ReportDiff):
+    fake_job_uuid = '4xphq-8i9sb-jq0ekny1xou3zoh'
+    fake_log_id = 'fake-log-collection-id'
+    fake_job = {
+        'uuid': fake_job_uuid,
+        'log': fake_log_id,
+    }
+    logfile = os.path.join(TESTS_DIR, 'logfile_20151204190335.txt.gz')
+
+    @mock.patch('arvados.collection.CollectionReader')
+    @mock.patch('arvados.api')
+    def test_job_report(self, mock_api, mock_cr):
+        mock_api().jobs().get().execute.return_value = self.fake_job
+        mock_cr().__iter__.return_value = ['fake-logfile.txt']
+        mock_cr().open.return_value = gzip.open(self.logfile)
+        args = crunchstat_summary.command.ArgumentParser().parse_args(
+            ['--job', self.fake_job_uuid])
+        cmd = crunchstat_summary.command.Command(args)
+        cmd.run()
+        self.diff_known_report(self.logfile, cmd)
+        mock_api().jobs().get.assert_called_with(uuid=self.fake_job_uuid)
+        mock_cr.assert_called_with(self.fake_log_id)
+        mock_cr().open.assert_called_with('fake-logfile.txt')
+
+
+class SummarizePipeline(ReportDiff):
+    fake_instance = {
+        'uuid': 'zzzzz-d1hrv-i3e77t9z5y8j9cc',
+        'owner_uuid': 'zzzzz-tpzed-xurymjxw79nv3jz',
+        'components': collections.OrderedDict([
+            ['foo', {
+                'job': {
+                    'uuid': 'zzzzz-8i9sb-000000000000000',
+                    'log': 'fake-log-pdh-0',
+                    'runtime_constraints': {
+                        'min_ram_mb_per_node': 900,
+                        'min_cores_per_node': 1,
+                    },
+                },
+            }],
+            ['bar', {
+                'job': {
+                    'uuid': 'zzzzz-8i9sb-000000000000001',
+                    'log': 'fake-log-pdh-1',
+                    'runtime_constraints': {
+                        'min_ram_mb_per_node': 900,
+                        'min_cores_per_node': 1,
+                    },
+                },
+            }],
+            ['no-job-assigned', {}],
+            ['unfinished-job', {
+                'job': {
+                    'uuid': 'zzzzz-8i9sb-xxxxxxxxxxxxxxx',
+                },
+            }],
+            ['baz', {
+                'job': {
+                    'uuid': 'zzzzz-8i9sb-000000000000002',
+                    'log': 'fake-log-pdh-2',
+                    'runtime_constraints': {
+                        'min_ram_mb_per_node': 900,
+                        'min_cores_per_node': 1,
+                    },
+                },
+            }]]),
+    }
+
+    @mock.patch('arvados.collection.CollectionReader')
+    @mock.patch('arvados.api')
+    def test_pipeline(self, mock_api, mock_cr):
+        logfile = os.path.join(TESTS_DIR, 'logfile_20151204190335.txt.gz')
+        mock_api().pipeline_instances().get().execute.return_value = (
+            self.fake_instance)
+        mock_cr().__iter__.return_value = ['fake-logfile.txt']
+        mock_cr().open.side_effect = [gzip.open(logfile) for _ in range(3)]
+        args = crunchstat_summary.command.ArgumentParser().parse_args(
+            ['--pipeline-instance', self.fake_instance['uuid']])
+        cmd = crunchstat_summary.command.Command(args)
+        cmd.run()
+
+        job_report = [
+            line for line in open(logfile+'.report').readlines()
+            if not line.startswith('#!! ')]
+        expect = (
+            ['### Summary for foo (zzzzz-8i9sb-000000000000000)\n'] +
+            job_report + ['\n'] +
+            ['### Summary for bar (zzzzz-8i9sb-000000000000001)\n'] +
+            job_report + ['\n'] +
+            ['### Summary for baz (zzzzz-8i9sb-000000000000002)\n'] +
+            job_report)
+        self.diff_report(cmd, expect)
+        mock_cr.assert_has_calls(
+            [
+                mock.call('fake-log-pdh-0'),
+                mock.call('fake-log-pdh-1'),
+                mock.call('fake-log-pdh-2'),
+            ], any_order=True)
+        mock_cr().open.assert_called_with('fake-logfile.txt')
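For reference, the code paths these tests drive through mocks can also be run against a live API; a minimal sketch, assuming Arvados credentials (ARVADOS_API_HOST / ARVADOS_API_TOKEN) are set in the environment and using the fixture UUID as a stand-in for a real pipeline instance:

    import sys
    import crunchstat_summary.command

    # Summarize every finished component of a pipeline instance, as
    # SummarizePipeline exercises with mocked API and collection clients.
    # Add '--format=html' (as in HTMLFromFile) for an HTML report instead.
    args = crunchstat_summary.command.ArgumentParser().parse_args(
        ['--pipeline-instance', 'zzzzz-d1hrv-i3e77t9z5y8j9cc'])
    cmd = crunchstat_summary.command.Command(args)
    cmd.run()
    sys.stdout.write(cmd.report())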