therubyracer
uglifier (>= 1.0.3)
wiselinks
-
-BUNDLED WITH
- 1.10.6
@api_client.ssl_config.verify_mode = OpenSSL::SSL::VERIFY_NONE
else
# Use system CA certificates
- @api_client.ssl_config.add_trust_ca('/etc/ssl/certs')
+ ["/etc/ssl/certs/ca-certificates.crt",
+ "/etc/pki/tls/certs/ca-bundle.crt"]
+ .select { |ca_path| File.readable?(ca_path) }
+ .each { |ca_path| @api_client.ssl_config.add_trust_ca(ca_path) }
end
if Rails.configuration.api_response_compression
@api_client.transparent_gzip_decompression = true
# FIXME: Remove this line after #6885 is done.
fpm_args+=(--iteration 2)
+
+# FIXME: Remove once support for llfuse 0.42+ is in place
+fpm_args+=(-v 0.41.1)
def sub_glob(v):
l = glob.glob(v)
if len(l) == 0:
- raise SubstitutionError("$(glob {}) no match fonud".format(v))
+ raise SubstitutionError("$(glob {}) no match found".format(v))
else:
return l[0]
active = 1
pids = set([s.pid for s in subprocesses])
while len(pids) > 0:
- (pid, status) = os.wait()
- pids.discard(pid)
- if not taskp.get("task.ignore_rcode"):
- rcode[pid] = (status >> 8)
+ try:
+ (pid, status) = os.wait()
+ except OSError as e:
+ if e.errno == errno.EINTR:
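+            # os.wait() was interrupted by a signal; loop around and
+            # call it again.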
+ pass
+ else:
+ raise
else:
- rcode[pid] = 0
+ pids.discard(pid)
+ if not taskp.get("task.ignore_rcode"):
+ rcode[pid] = (status >> 8)
+ else:
+ rcode[pid] = 0
if sig.sig is not None:
logger.critical("terminating on signal %s" % sig.sig)
{% include 'notebox_begin' %}
-Arvados requires git version 1.7.10 or later. If you are using an earlier version of git, please update your git version.
+The Arvados API and Git servers require Git 1.7.10 or later. You can get this version on CentOS 6 from RepoForge. "Install the repository":http://repoforge.org/use/, then run:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo yum install --enablerepo=rpmforge-extras git</span>
+</code></pre>
+</notextile>
+
{% include 'notebox_end' %}
--- /dev/null
+{% include 'notebox_begin' %}
+
+If you are installing on CentOS 6, you will need to modify PostgreSQL's configuration to allow password authentication for local users. The default configuration allows 'ident' authentication only. The following commands will make the configuration change, and restart PostgreSQL for it to take effect.
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo sed -ri -e 's/^(host +all +all +(127\.0\.0\.1\/32|::1\/128) +)ident$/\1md5/' {{pg_hba_path}}</span>
+~$ <span class="userinput">sudo service {{pg_service}} restart</span>
+</code></pre>
+</notextile>
+
+{% include 'notebox_end' %}
<pre><code><span class="userinput">sudo yum install \
libyaml-devel glibc-headers autoconf gcc-c++ glibc-devel \
patch readline-devel zlib-devel libffi-devel openssl-devel \
- automake libtool bison sqlite-devel
+ automake libtool bison sqlite-devel tar
</span></code></pre></notextile>
Install prerequisites for Ubuntu 12.04 or 14.04:
--- /dev/null
+On Debian-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo apt-get install runit</span>
+</code></pre>
+</notextile>
+
+On Red Hat-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo yum install runit</span>
+</code></pre>
+</notextile>
Enter it again: <span class="userinput">paste-password-again</span>
</code></pre></notextile>
-{% include 'notebox_begin' %}
-
-This user setup assumes that your PostgreSQL is configured to accept password authentication. Red Hat systems use ident-based authentication by default. You may need to either adapt the user creation, or reconfigure PostgreSQL (in @pg_hba.conf@) to accept password authentication.
-
-{% include 'notebox_end' %}
+{% assign pg_hba_path = "/opt/rh/postgresql92/root/var/lib/pgsql/data/pg_hba.conf" %}
+{% assign pg_service = "postgresql92-postgresql" %}
+{% include 'install_redhat_postgres_auth' %}
Create the database:
</code></pre>
</notextile>
-h2. Set up configuration files
-
-The API server package uses configuration files that you write to @/etc/arvados/api@ and ensures they're consistently deployed. Create this directory and copy the example configuration files to it:
-
-<notextile>
-<pre><code>~$ <span class="userinput">sudo mkdir -p /etc/arvados/api</span>
-~$ <span class="userinput">sudo chmod 700 /etc/arvados/api</span>
-~$ <span class="userinput">cd /var/www/arvados-api/current</span>
-/var/www/arvados-api/current$ <span class="userinput">sudo cp config/database.yml.example /etc/arvados/api/database.yml</span>
-/var/www/arvados-api/current$ <span class="userinput">sudo cp config/application.yml.example /etc/arvados/api/application.yml</span>
-</code></pre>
-</notextile>
-
h2. Configure the database connection
Edit @/etc/arvados/api/database.yml@ and replace the @xxxxxxxx@ database password placeholders with the PostgreSQL password you generated above.
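
For orientation, the production stanza of @database.yml@ looks something like the sketch below. This is only a sketch: the database name and username may differ on your site, and the installed @/etc/arvados/api/database.yml@ is authoritative.

<notextile>
<pre><code>production:
  adapter: postgresql
  encoding: utf8
  database: arvados_production
  username: arvados
  password: <span class="userinput">xxxxxxxx</span>
  host: localhost
</code></pre>
</notextile>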
h2(#configure_application). Configure the API server
-Edit @/etc/arvados/api/application.yml@ to configure the settings described in the following sections. The deployment script will consistently deploy this to the API server's configuration directory. The API server reads both @application.yml@ and its own @config/application.default.yml@ file. The settings in @application.yml@ take precedence over the defaults that are defined in @config/application.default.yml@. The @config/application.yml.example@ file is not read by the API server and is provided as a starting template only.
+Edit @/etc/arvados/api/application.yml@ to configure the settings described in the following sections. The API server reads both @application.yml@ and its own @config/application.default.yml@ file. The settings in @application.yml@ take precedence over the defaults that are defined in @config/application.default.yml@. The @config/application.yml.example@ file is not read by the API server and is provided as a starting template only.
@config/application.default.yml@ documents additional configuration settings not listed here. You can "view the current source version":https://dev.arvados.org/projects/arvados/repository/revisions/master/entry/services/api/config/application.default.yml for reference.
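
The file is YAML, keyed by Rails environment. As a rough sketch of its shape (the settings shown are placeholders for the ones described in the sections below):

<notextile>
<pre><code>production:
  uuid_prefix: <span class="userinput">zzzzz</span>
  secret_token: <span class="userinput">your-secret-token</span>
</code></pre>
</notextile>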
<ol>
<li><a href="https://www.phusionpassenger.com/library/walkthroughs/deploy/ruby/ownserver/nginx/oss/install_passenger_main.html">Install Nginx and Phusion Passenger</a>.</li>
-<li><p>Puma is already included with the API server's gems. We recommend you run it as a service under <a href="http://smarden.org/runit/">runit</a> or a similar tool. Here's a sample runit script for that:</p>
+<li><p>Install runit to supervise the Puma daemon. {% include 'install_runit' %}<notextile></p></li>
+
+<li><p>Install the script below as the run script for the Puma service, modifying it as directed by the comments.</p>
<pre><code>#!/bin/bash
h2. Install gitolite
-Check "https://github.com/sitaramc/gitolite/tags":https://github.com/sitaramc/gitolite/tags for the latest stable version. This guide was tested with @v3.6.3@. _Versions below 3.0 are missing some features needed by Arvados, and should not be used._
+Check "https://github.com/sitaramc/gitolite/tags":https://github.com/sitaramc/gitolite/tags for the latest stable version. This guide was tested with @v3.6.4@. _Versions below 3.0 are missing some features needed by Arvados, and should not be used._
Download and install the version you selected.
<notextile>
<pre><code>git@gitserver:~$ <span class="userinput">echo 'PATH=$HOME/bin:$PATH' >.profile</span>
git@gitserver:~$ <span class="userinput">source .profile</span>
-git@gitserver:~$ <span class="userinput">git clone --branch <b>v3.6.3</b> git://github.com/sitaramc/gitolite</span>
+git@gitserver:~$ <span class="userinput">git clone --branch <b>v3.6.4</b> https://github.com/sitaramc/gitolite</span>
...
Note: checking out '5d24ae666bfd2fa9093d67c840eb8d686992083f'.
...
h3. Enable arvados-git-httpd
-On Debian-based systems, install runit:
-
-<notextile>
-<pre><code>~$ <span class="userinput">sudo apt-get install runit</span>
-</code></pre>
-</notextile>
-
-On Red Hat-based systems, "install runit from source":http://smarden.org/runit/install.html or use an alternative daemon supervisor.
+Install runit to supervise the arvados-git-httpd daemon. {% include 'install_runit' %}
Configure runit to run arvados-git-httpd, making sure to update the API host to match your site:
<notextile>
-<pre><code>~$ <span class="userinput">cd /etc/sv</span>
+<pre><code>~$ <span class="userinput">sudo mkdir -p /etc/sv</span>
+~$ <span class="userinput">cd /etc/sv</span>
/etc/sv$ <span class="userinput">sudo mkdir arvados-git-httpd; cd arvados-git-httpd</span>
/etc/sv/arvados-git-httpd$ <span class="userinput">sudo mkdir log</span>
/etc/sv/arvados-git-httpd$ <span class="userinput">sudo sh -c 'cat >log/run' <<'EOF'
exec chpst -u git:git arvados-git-httpd -address=:9001 -git-command=/var/lib/arvados/git/gitolite/src/gitolite-shell -repo-root=<b>/var/lib/arvados/git</b>/repositories 2>&1
EOF</span>
/etc/sv/arvados-git-httpd$ <span class="userinput">sudo chmod +x run log/run</span>
+/etc/sv/arvados-git-httpd$ <span class="userinput">sudo ln -s "$(pwd)" /etc/service/</span>
</code></pre>
</notextile>
h2. Copy configuration files from the dispatcher (API server)
-The @/etc/slurm-llnl/slurm.conf@ and @/etc/munge/munge.key@ files need to be identicaly across the dispatcher and all compute nodes. Copy the files you created in the "Install the Crunch dispatcher":install-crunch-dispatch.html step to this compute node.
+The @slurm.conf@ and @/etc/munge/munge.key@ files need to be identical across the dispatcher and all compute nodes. Copy the files you created in the "Install the Crunch dispatcher":install-crunch-dispatch.html step to this compute node.
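+
+For example, assuming you have root SSH access from this node to the dispatcher (and adjusting @/etc/slurm-llnl@ to @/etc/slurm@ on Red Hat-based systems), something like:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo scp root@uuid_prefix.your.domain:/etc/slurm-llnl/slurm.conf /etc/slurm-llnl/slurm.conf</span>
+~$ <span class="userinput">sudo scp root@uuid_prefix.your.domain:/etc/munge/munge.key /etc/munge/munge.key</span>
+</code></pre>
+</notextile>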
h2. Configure FUSE
The arvados-docker-cleaner program removes least recently used docker images as needed to keep disk usage below a configured limit.
{% include 'notebox_begin' %}
-This also removes all containers as soon as they exit, as if they were run with `docker run --rm`. If you need to debug or inspect containers after they stop, temporarily stop arvados-docker-cleaner or run it with `--remove-stopped-containers never`.
+This also removes all containers as soon as they exit, as if they were run with @docker run --rm@. If you need to debug or inspect containers after they stop, temporarily stop arvados-docker-cleaner or run it with @--remove-stopped-containers never@.
{% include 'notebox_end' %}
-On Debian-based systems, install runit:
-
-<notextile>
-<pre><code>~$ <span class="userinput">sudo apt-get install runit</span>
-</code></pre>
-</notextile>
-
-On Red Hat-based systems, "install runit from source":http://smarden.org/runit/install.html or use an alternative daemon supervisor.
+Install runit to supervise the Docker cleaner daemon. {% include 'install_runit' %}
Configure runit to run the image cleaner using a suitable quota for your compute nodes and workload:
<notextile>
-<pre><code>~$ <span class="userinput">cd /etc/sv</span>
+<pre><code>~$ <span class="userinput">sudo mkdir -p /etc/sv</span>
+~$ <span class="userinput">cd /etc/sv</span>
/etc/sv$ <span class="userinput">sudo mkdir arvados-docker-cleaner; cd arvados-docker-cleaner</span>
/etc/sv/arvados-docker-cleaner$ <span class="userinput">sudo mkdir log log/main</span>
/etc/sv/arvados-docker-cleaner$ <span class="userinput">sudo sh -c 'cat >log/run' <<'EOF'
exec python3 -m arvados_docker.cleaner --quota <b>50G</b>
EOF</span>
/etc/sv/arvados-docker-cleaner$ <span class="userinput">sudo chmod +x run log/run</span>
+/etc/sv/arvados-docker-cleaner$ <span class="userinput">sudo ln -s "$(pwd)" /etc/service/</span>
</code></pre>
</notextile>
import arvados, json, socket
fqdn = socket.getfqdn()
hostname, _, domain = fqdn.partition('.')
-ip_address = socket.gethostbyname(fqdn)
-node = arvados.api('v1').nodes().create(body={'hostname': hostname, 'domain': domain, 'ip_address': ip_address}).execute()
+node = arvados.api('v1').nodes().create(body={'hostname': hostname, 'domain': domain}).execute()
with open('/root/node.json', 'w') as node_file:
json.dump(node, node_file, indent=2)
EOF
</code>
</pre>
</notextile>
-
</code></pre>
</notextile>
-On Red Hat-based systems, "install SLURM and munge from source following their installation guide":https://computing.llnl.gov/linux/slurm/quickstart_admin.html.
+On Red Hat-based systems:
-Now we need to give SLURM a configuration file in @/etc/slurm-llnl/slurm.conf@. Here's an example:
+<notextile>
+<pre><code>~$ <span class="userinput">sudo yum install slurm munge slurm-munge</span>
+</code></pre>
+</notextile>
+
+Now we need to give SLURM a configuration file. On Debian-based systems, this is installed at @/etc/slurm-llnl/slurm.conf@. On Red Hat-based systems, this is installed at @/etc/slurm/slurm.conf@. Here's an example @slurm.conf@:
<notextile>
<pre>
# SCHEDULING
SchedulerType=sched/backfill
SchedulerPort=7321
-SelectType=select/cons_res
-SelectTypeParameters=CR_CPU_Memory
-FastSchedule=1
+SelectType=select/linear
+FastSchedule=0
#
# LOGGING
SlurmctldDebug=3
Each hostname in @slurm.conf@ must also resolve correctly on all SLURM worker nodes as well as the controller itself. Furthermore, the hostnames used in the configuration file must match the hostnames reported by @hostname@ or @hostname -s@ on the nodes themselves. This applies to the ControlMachine as well as the worker nodes.
For example:
-* In @/etc/slurm-llnl/slurm.conf@ on control and worker nodes: @ControlMachine=uuid_prefix.your.domain@
-* In @/etc/slurm-llnl/slurm.conf@ on control and worker nodes: @NodeName=compute[0-255]@
+* In @slurm.conf@ on control and worker nodes: @ControlMachine=uuid_prefix.your.domain@
+* In @slurm.conf@ on control and worker nodes: @NodeName=compute[0-255]@
* In @/etc/resolv.conf@ on control and worker nodes: @search uuid_prefix.your.domain@
* On the control node: @hostname@ reports @uuid_prefix.your.domain@
* On worker node 123: @hostname@ reports @compute123.uuid_prefix.your.domain@
* @crunch-job@ needs the installation path of the Perl SDK in its @PERLLIB@.
* @crunch-job@ needs the @ARVADOS_API_HOST@ (and, if necessary, @ARVADOS_API_HOST_INSECURE@) environment variable set.
-We recommend you run @crunch-dispatch.rb@ under "runit":http://smarden.org/runit/ or a similar supervisor. Here's an example runit service script:
+Install runit to monitor the Crunch dispatch daemon. {% include 'install_runit' %}
+
+Install the script below as the run script for the Crunch dispatch service, modifying it as directed by the comments.
<notextile>
<pre><code>#!/bin/sh
{% assign railsout = "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz" %}
If you intend to use Keep-web to serve public data to anonymous clients, configure it with an anonymous token. You can use the same one you used when you set up your Keepproxy server, or use the following command on the <strong>API server</strong> to create another. {% include 'install_rails_command' %}
-We recommend running Keep-web under "runit":https://packages.debian.org/search?keywords=runit or a similar supervisor. The basic command to start Keep-web is:
+Install runit to supervise the Keep-web daemon. {% include 'install_runit' %}
+
+The basic command to start Keep-web in the service run script is:
<notextile>
<pre><code>export ARVADOS_API_HOST=<span class="userinput">uuid_prefix</span>.your.domain
h3. Set up the Keepproxy service
-We recommend you run Keepproxy under "runit":http://smarden.org/runit/ or a similar supervisor. Make sure the launcher sets the envirnoment variables @ARVADOS_API_TOKEN@ (with the token you just generated), @ARVADOS_API_HOST@, and, if needed, @ARVADOS_API_HOST_INSECURE@. The core keepproxy command to run is:
+Install runit to supervise the keepproxy daemon. {% include 'install_runit' %}
+
+The run script for the keepproxy service should set the environment variables @ARVADOS_API_TOKEN@ (with the token you just generated), @ARVADOS_API_HOST@, and, if needed, @ARVADOS_API_HOST_INSECURE@. The core keepproxy command to run is:
<notextile>
<pre><code>ARVADOS_API_TOKEN=<span class="userinput">{{railsout}}</span> ARVADOS_API_HOST=<span class="userinput">uuid_prefix.your.domain</span> exec keepproxy
server {
listen <span class="userinput">[your public IP address]</span>:443 ssl;
- server_name keep.<span class="userinput">uuid_prefix</span>.your.domain
+ server_name keep.<span class="userinput">uuid_prefix</span>.your.domain;
proxy_connect_timeout 90s;
proxy_read_timeout 300s;
h3. Run keepstore as a supervised service
-We recommend running Keepstore under "runit":http://smarden.org/runit/ or something similar, using a run script like the following:
+Install runit to supervise the keepstore daemon. {% include 'install_runit' %}
+
+Install this script as the run script for the keepstore service, modifying it as directed below.
<notextile>
<pre><code>#!/bin/sh
h2. Install the Ruby SDK and utilities
-If you're using RVM:
+First, install the curl development libraries necessary to build the Arvados Ruby SDK. On Debian-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo apt-get install libcurl4-openssl-dev</span>
+</code></pre>
+</notextile>
+
+On Red Hat-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo yum install libcurl-devel</span>
+</code></pre>
+</notextile>
+
+Next, install the arvados-cli Ruby gem. If you're using RVM:
<notextile>
<pre><code>~$ <span class="userinput">sudo /usr/local/rvm/bin/rvm-exec default gem install arvados-cli</span>
</code></pre>
</notextile>
-{% include 'notebox_begin' %}
-
-If you are installing on CentOS6, you will need to modify PostgreSQL's configuration to allow password authentication for local users. The default configuration allows 'ident' only. The following commands will make the configuration change, and restart PostgreSQL for it to take effect.
-<br/>
-<notextile>
-<pre><code>~$ <span class="userinput">sudo sed -i -e "s/127.0.0.1\/32 ident/127.0.0.1\/32 md5/" /var/lib/pgsql/data/pg_hba.conf</span>
-~$ <span class="userinput">sudo sed -i -e "s/::1\/128 ident/::1\/128 md5/" /var/lib/pgsql/data/pg_hba.conf</span>
-~$ <span class="userinput">sudo service postgresql restart</span>
-</code></pre>
-</notextile>
-{% include 'notebox_end' %}
-
+{% assign pg_service = "postgresql" %}
+{% assign pg_hba_path = "/var/lib/pgsql/data/pg_hba.conf" %}
+{% include 'install_redhat_postgres_auth' %}
Next, generate a new database password. Nobody ever needs to memorize it or type it, so make a strong one:
</code></pre>
</notextile>
-h2. Set up configuration files
-
-The Workbench server package uses configuration files that you write to @/etc/arvados/workbench@ and ensures they're consistently deployed. Create this directory and copy the example configuration files to it:
-
-<notextile>
-<pre><code>~$ <span class="userinput">sudo mkdir -p /etc/arvados/workbench</span>
-~$ <span class="userinput">sudo chmod 700 /etc/arvados/workbench</span>
-~$ <span class="userinput">sudo cp /var/www/arvados-workbench/current/config/application.yml.example /etc/arvados/workbench/application.yml</span>
-</code></pre>
-</notextile>
-
h2(#configure). Configure Workbench
-Edit @/etc/arvados/workbench/application.yml@ following the instructions below. The deployment script will consistently deploy this to Workbench's configuration directory. Workbench reads both @application.yml@ and its own @config/application.defaults.yml@ file. Values in @application.yml@ take precedence over the defaults that are defined in @config/application.defaults.yml@. The @config/application.yml.example@ file is not read by Workbench and is provided for installation convenience only.
+Edit @/etc/arvados/workbench/application.yml@ following the instructions below. Workbench reads both @application.yml@ and its own @config/application.defaults.yml@ file. Values in @application.yml@ take precedence over the defaults that are defined in @config/application.defaults.yml@. The @config/application.yml.example@ file is not read by Workbench and is provided for installation convenience only.
Consult @config/application.default.yml@ for a full list of configuration options. Always put your local configuration in @/etc/arvados/workbench/application.yml@—never edit @config/application.default.yml@.
<li>If you're deploying on an older Red Hat-based distribution and installed Python 2.7 from Software Collections, configure Nginx to use it:
<pre><code>~$ <span class="userinput">sudo usermod --shell /bin/bash nginx</span>
-~$ <span class="userinput">sudo -u nginx sh -c 'echo "[[ -z \$PS1 && -e /opt/rh/python27/enable ]] && source /opt/rh/python27/enable" >>~/.bash_profile'</span>
+~$ <span class="userinput">sudo -u nginx sh -c 'echo "[[ -z \$PS1 ]] && source scl_source enable python27" >>~/.bash_profile'</span>
</code></pre>
</li>
# If this job requires a Docker image, install that.
my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
if ($docker_locator = $Job->{docker_image_locator}) {
+ Log (undef, "Install docker image $docker_locator");
($docker_stream, $docker_hash) = find_docker_image($docker_locator);
if (!$docker_hash)
{
croak("No Docker image hash found from locator $docker_locator");
}
+ Log (undef, "docker image hash is $docker_hash");
$docker_stream =~ s/^\.//;
my $docker_install_script = qq{
if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
check_refresh_wanted();
check_squeue();
update_progress_stats();
- select (undef, undef, undef, 0.1);
}
elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
{
update_progress_stats();
}
+ if (!$gotsome) {
+ select (undef, undef, undef, 0.1);
+ }
$working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
$_->{node}->{hold_count} < 4 } @slot);
if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
foreach my $job (keys %reader)
{
my $buf;
- while (0 < sysread ($reader{$job}, $buf, 8192))
+ if (0 < sysread ($reader{$job}, $buf, 65536))
{
print STDERR $buf if $ENV{CRUNCH_DEBUG};
$jobstep[$job]->{stderr_at} = time;
$jobstep[$job]->{stderr} .= $buf;
+
+ # Consume everything up to the last \n
preprocess_stderr ($job);
+
if (length ($jobstep[$job]->{stderr}) > 16384)
{
- substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
+ # If we get a lot of stderr without a newline, chop off the
+ # front to avoid letting our buffer grow indefinitely.
+ substr ($jobstep[$job]->{stderr},
+ 0, length($jobstep[$job]->{stderr}) - 8192) = "";
}
$gotsome = 1;
}
# whoa.
$main::please_freeze = 1;
}
- elsif ($line =~ /srun: error: Node failure on/) {
+ elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
my $job_slot_index = $jobstep[$job]->{slotindex};
$slot[$job_slot_index]->{node}->{fail_count}++;
$jobstep[$job]->{tempfail} = 1;
_logger = logging.getLogger('arvados.events')
+
class EventClient(WebSocketClient):
def __init__(self, url, filters, on_event, last_log_id):
ssl_options = {'ca_certs': arvados.util.ca_certs_path()}
self.filters = filters
self.on_event = on_event
self.last_log_id = last_log_id
- self._closed_lock = threading.RLock()
- self._closed = False
+ self._closing_lock = threading.RLock()
+ self._closing = False
+ self._closed = threading.Event()
def opened(self):
self.subscribe(self.filters, self.last_log_id)
+ def closed(self, code, reason=None):
+ self._closed.set()
+
def received_message(self, m):
- with self._closed_lock:
- if not self._closed:
+ with self._closing_lock:
+ if not self._closing:
self.on_event(json.loads(str(m)))
- def close(self, code=1000, reason=''):
- """Close event client and wait for it to finish."""
+ def close(self, code=1000, reason='', timeout=0):
+ """Close event client and optionally wait for it to finish.
+
+ :timeout: is the number of seconds to wait for ws4py to
+ indicate that the connection has closed.
+ """
super(EventClient, self).close(code, reason)
- with self._closed_lock:
+ with self._closing_lock:
# make sure we don't process any more messages.
- self._closed = True
+ self._closing = True
+ # wait for ws4py to tell us the connection is closed.
+ self._closed.wait(timeout=timeout)
def subscribe(self, filters, last_log_id=None):
m = {"method": "subscribe", "filters": filters}
def unsubscribe(self, filters):
self.send(json.dumps({"method": "unsubscribe", "filters": filters}))
+
class PollClient(threading.Thread):
def __init__(self, api, filters, on_event, poll_time, last_log_id):
super(PollClient, self).__init__()
self.on_event = on_event
self.poll_time = poll_time
self.daemon = True
- self.stop = threading.Event()
self.last_log_id = last_log_id
+ self._closing = threading.Event()
+ self._closing_lock = threading.RLock()
def run(self):
self.id = 0
self.on_event({'status': 200})
- while not self.stop.isSet():
+ while not self._closing.is_set():
max_id = self.id
moreitems = False
for f in self.filters:
for i in items["items"]:
if i['id'] > max_id:
max_id = i['id']
- self.on_event(i)
+ with self._closing_lock:
+ if self._closing.is_set():
+ return
+ self.on_event(i)
if items["items_available"] > len(items["items"]):
moreitems = True
self.id = max_id
if not moreitems:
- self.stop.wait(self.poll_time)
+ self._closing.wait(self.poll_time)
def run_forever(self):
# Have to poll here, otherwise KeyboardInterrupt will never get processed.
- while not self.stop.is_set():
- self.stop.wait(1)
+ while not self._closing.is_set():
+ self._closing.wait(1)
+
+ def close(self, code=None, reason=None, timeout=0):
+ """Close poll client and optionally wait for it to finish.
+
+ If an :on_event: handler is running in a different thread,
+ first wait (indefinitely) for it to return.
+
+ After closing, wait up to :timeout: seconds for the thread to
+ finish the poll request in progress (if any).
- def close(self):
- """Close poll client and wait for it to finish."""
+ :code: and :reason: are ignored. They are present for
+ interface compatibility with EventClient.
+ """
- self.stop.set()
+ with self._closing_lock:
+ self._closing.set()
try:
- self.join()
+ self.join(timeout=timeout)
except RuntimeError:
# "join() raises a RuntimeError if an attempt is made to join the
# current thread as that would cause a deadlock. It is also an
gem 'themes_for_rails'
gem 'arvados', '>= 0.1.20150615153458'
-gem 'arvados-cli', '>= 0.1.20151023185755'
+gem 'arvados-cli', '>= 0.1.20151207150126'
# pg_power lets us use partial indexes in schema.rb in Rails 3
gem 'pg_power'
google-api-client (~> 0.6.3, >= 0.6.3)
json (~> 1.7, >= 1.7.7)
jwt (>= 0.1.5, < 1.0.0)
- arvados-cli (0.1.20151023190001)
+ arvados-cli (0.1.20151207150126)
activesupport (~> 3.2, >= 3.2.13)
andand (~> 1.3, >= 1.3.3)
arvados (~> 0.1, >= 0.1.20150128223554)
acts_as_api
andand
arvados (>= 0.1.20150615153458)
- arvados-cli (>= 0.1.20151023185755)
+ arvados-cli (>= 0.1.20151207150126)
coffee-rails (~> 3.2.0)
database_cleaner
factory_girl_rails
return render_not_found
end
ping_data = {
- ip: params[:local_ipv4] || request.env['REMOTE_ADDR'],
+ ip: params[:local_ipv4] || request.remote_ip,
ec2_instance_id: params[:instance_id]
}
[:ping_secret, :total_cpu_cores, :total_ram_mb, :total_scratch_mb]
["#{table_name}.modified_at desc", "#{table_name}.uuid"]
end
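+  # Columns that are sufficient on their own to identify a row
+  # uniquely. Used when assembling client-supplied "order by" clauses:
+  # ordering terms that appear after a unique column are superfluous
+  # and can be dropped.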
+ def self.unique_columns
+ ["id", "uuid"]
+ end
+
# If current user can manage the object, return an array of uuids of
# users and groups that have permission to write the object. The
# first two elements are always [self.owner_uuid, current user's
fpm_depends+=(libcurl-devel postgresql-devel)
;;
debian* | ubuntu*)
- fpm_depends+=(libcurl4-openssl-dev libpq-dev)
+ fpm_depends+=(libcurl-ssl-dev libpq-dev)
;;
esac
jobrecord = Job.find_by_uuid(job_done.uuid)
- if exit_status == EXIT_RETRY_UNLOCKED
- # The job failed because all of the nodes allocated to it
- # failed. Only this crunch-dispatch process can retry the job:
+ if exit_status == EXIT_RETRY_UNLOCKED or (exit_tempfail and @job_retry_counts.include? jobrecord.uuid)
+ # Only this crunch-dispatch process can retry the job:
# it's already locked, and there's no way to put it back in the
# Queued state. Put it in our internal todo list unless the job
# has failed this way excessively.
# (e.g., [] or ['owner_uuid desc']), fall back on the default
# orders to ensure repeating the same request (possibly with
# different limit/offset) will return records in the same order.
- @orders += model_class.default_orders
+ #
+ # Clean up the resulting list of orders such that no column
+ # uselessly appears twice (Postgres might not optimize this out
+ # for us) and no columns uselessly appear after a unique column
+ # (Postgres does not optimize this out for us; as of 9.2, "order
+ # by id, modified_at desc, uuid" is slow but "order by id" is
+ # fast).
+ orders_given_and_default = @orders + model_class.default_orders
+ order_cols_used = {}
+ @orders = []
+ orders_given_and_default.each do |order|
+ otablecol = order.split(' ')[0]
+
+ next if order_cols_used[otablecol]
+ order_cols_used[otablecol] = true
+
+ @orders << order
+
+ otable, ocol = otablecol.split('.')
+ if otable == table_name and model_class.unique_columns.include?(ocol)
+ # we already have a full ordering; subsequent entries would be
+ # superfluous
+ break
+ end
+ end
case params[:select]
when Array
job_uuid: ~
info:
ping_secret: "abcdyi0x4lb5q4gzqqtrnq30oyj08r8dtdimmanbqw49z1anz2"
+
+node_with_no_ip_address_yet:
+ uuid: zzzzz-7ekkf-nodenoipaddryet
+ owner_uuid: zzzzz-tpzed-000000000000000
+ hostname: noipaddr
+ slot_number: ~
+ last_ping_at: ~
+ first_ping_at: ~
+ job_uuid: ~
+ info:
+ ping_secret: "abcdyefg4lb5q4gzqqtrnq30oyj08r8dtdimmanbqw49z1anz2"
}
assert_response 422
end
+
+ test "first ping should set ip addr using local_ipv4 when provided" do
+ post :ping, {
+ id: 'zzzzz-7ekkf-nodenoipaddryet',
+ instance_id: 'i-0000000',
+ local_ipv4: '172.17.2.172',
+ ping_secret: 'abcdyefg4lb5q4gzqqtrnq30oyj08r8dtdimmanbqw49z1anz2'
+ }
+ assert_response :success
+ response = JSON.parse(@response.body)
+ assert_equal 'zzzzz-7ekkf-nodenoipaddryet', response['uuid']
+ assert_equal '172.17.2.172', response['ip_address']
+ end
+
+ test "first ping should set ip addr using remote_ip when local_ipv4 is not provided" do
+ post :ping, {
+ id: 'zzzzz-7ekkf-nodenoipaddryet',
+ instance_id: 'i-0000000',
+ ping_secret: 'abcdyefg4lb5q4gzqqtrnq30oyj08r8dtdimmanbqw49z1anz2'
+ }
+ assert_response :success
+ response = JSON.parse(@response.body)
+ assert_equal 'zzzzz-7ekkf-nodenoipaddryet', response['uuid']
+ assert_equal request.remote_ip, response['ip_address']
+ end
+
+ test "future pings should not change previous ip address" do
+ post :ping, {
+ id: 'zzzzz-7ekkf-2z3mc76g2q73aio',
+ instance_id: 'i-0000000',
+ local_ipv4: '172.17.2.175',
+ ping_secret: '69udawxvn3zzj45hs8bumvndricrha4lcpi23pd69e44soanc0'
+ }
+ assert_response :success
+ response = JSON.parse(@response.body)
+ assert_equal 'zzzzz-7ekkf-2z3mc76g2q73aio', response['uuid']
+ assert_equal '172.17.2.174', response['ip_address'] # original ip address is not overwritten
+ end
end
--- /dev/null
+require 'test_helper'
+
+class Arvados::V1::QueryTest < ActionController::TestCase
+ test 'no fallback orders when order is unambiguous' do
+ @controller = Arvados::V1::LogsController.new
+ authorize_with :active
+ get :index, {
+ order: ['id asc'],
+ controller: 'logs',
+ }
+ assert_response :success
+ assert_equal ['logs.id asc'], assigns(:objects).order_values
+ end
+
+ test 'fallback orders when order is ambiguous' do
+ @controller = Arvados::V1::LogsController.new
+ authorize_with :active
+ get :index, {
+ order: ['event_type asc'],
+ controller: 'logs',
+ }
+ assert_response :success
+ assert_equal('logs.event_type asc, logs.modified_at desc, logs.uuid',
+ assigns(:objects).order_values.join(', '))
+ end
+
+ test 'skip fallback orders already given by client' do
+ @controller = Arvados::V1::LogsController.new
+ authorize_with :active
+ get :index, {
+ order: ['modified_at asc'],
+ controller: 'logs',
+ }
+ assert_response :success
+ assert_equal('logs.modified_at asc, logs.uuid',
+ assigns(:objects).order_values.join(', '))
+ end
+
+ test 'eliminate superfluous orders' do
+ @controller = Arvados::V1::LogsController.new
+ authorize_with :active
+ get :index, {
+ order: ['logs.modified_at asc',
+ 'modified_at desc',
+ 'event_type desc',
+ 'logs.event_type asc'],
+ controller: 'logs',
+ }
+ assert_response :success
+ assert_equal('logs.modified_at asc, logs.event_type desc, logs.uuid',
+ assigns(:objects).order_values.join(', '))
+ end
+
+ test 'eliminate orders after the first unique column' do
+ @controller = Arvados::V1::LogsController.new
+ authorize_with :active
+ get :index, {
+ order: ['event_type asc',
+ 'id asc',
+ 'uuid asc',
+ 'modified_at desc'],
+ controller: 'logs',
+ }
+ assert_response :success
+ assert_equal('logs.event_type asc, logs.id asc',
+ assigns(:objects).order_values.join(', '))
+ end
+end
statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
stats, err := ioutil.ReadFile(statsFilename)
if err != nil {
- statLog.Printf("read %s: %s\n", statsFilename, err)
+ statLog.Printf("error reading %s: %s\n", statsFilename, err)
continue
}
return strings.NewReader(string(stats)), nil
if cmd.Process != nil {
cmd.Process.Signal(catch)
}
- statLog.Println("caught signal:", catch)
+ statLog.Println("notice: caught signal:", catch)
}(sigChan)
signal.Notify(sigChan, syscall.SIGTERM)
signal.Notify(sigChan, syscall.SIGINT)
self.add_argument('--crunchstat-interval', type=float, help="Write stats to stderr every N seconds (default disabled)", default=0)
+ self.add_argument('--unmount-timeout',
+ type=float, default=2.0,
+ help="Time to wait for graceful shutdown after --exec program exits and filesystem is unmounted")
+
self.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
dest="exec_args", metavar=('command', 'args', '...', '--'),
help="""Mount, run a command, then unmount and exit""")
def __init__(self, args, logger=logging.getLogger('arvados.arv-mount')):
self.logger = logger
self.args = args
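+        # Set to True during mount setup if any mounted directory wants
+        # to subscribe to API server events (see want_event_subscribe).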
+ self.listen_for_events = False
self.args.mountpoint = os.path.realpath(self.args.mountpoint)
if self.args.logfile:
def __enter__(self):
llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
- if self.args.mode != 'by_pdh':
+ if self.listen_for_events:
self.operations.listen_for_events()
- t = threading.Thread(None, lambda: llfuse.main())
- t.start()
+ self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
+ self.llfuse_thread.daemon = True
+ self.llfuse_thread.start()
self.operations.initlock.wait()
def __exit__(self, exc_type, exc_value, traceback):
subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
- self.operations.destroy()
+ self.llfuse_thread.join(timeout=self.args.unmount_timeout)
+ if self.llfuse_thread.is_alive():
+ self.logger.warning("Mount.__exit__:"
+ " llfuse thread still alive %fs after umount"
+ " -- abandoning and exiting anyway",
+ self.args.unmount_timeout)
def run(self):
if self.args.exec_args:
mount_readme = True
if dir_class is not None:
- self.operations.inodes.add_entry(dir_class(*dir_args))
+ ent = dir_class(*dir_args)
+ self.operations.inodes.add_entry(ent)
+ self.listen_for_events = ent.want_event_subscribe()
return
e = self.operations.inodes.add_entry(Directory(
if name in ['', '.', '..'] or '/' in name:
sys.exit("Mount point '{}' is not supported.".format(name))
tld._entries[name] = self.operations.inodes.add_entry(ent)
+ self.listen_for_events = (self.listen_for_events or ent.want_event_subscribe())
def _readme_text(self, api_host, user_email):
return '''
'''.format(api_host, user_email)
def _run_exec(self):
- # Initialize the fuse connection
- llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
-
- # Subscribe to change events from API server
- if self.args.mode != 'by_pdh':
- self.operations.listen_for_events()
-
- t = threading.Thread(None, lambda: llfuse.main())
- t.start()
-
- # wait until the driver is finished initializing
- self.operations.initlock.wait()
-
rc = 255
- try:
- sp = subprocess.Popen(self.args.exec_args, shell=False)
-
- # forward signals to the process.
- signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
- signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
- signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
-
- # wait for process to complete.
- rc = sp.wait()
-
- # restore default signal handlers.
- signal.signal(signal.SIGINT, signal.SIG_DFL)
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
- signal.signal(signal.SIGQUIT, signal.SIG_DFL)
- except Exception as e:
- self.logger.exception(
- 'arv-mount: exception during exec %s', self.args.exec_args)
+ with self:
try:
- rc = e.errno
- except AttributeError:
- pass
- finally:
- subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
- self.operations.destroy()
+ sp = subprocess.Popen(self.args.exec_args, shell=False)
+
+ # forward signals to the process.
+ signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
+ signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
+ signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
+
+ # wait for process to complete.
+ rc = sp.wait()
+
+ # restore default signal handlers.
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ signal.signal(signal.SIGQUIT, signal.SIG_DFL)
+ except Exception as e:
+ self.logger.exception(
+ 'arv-mount: exception during exec %s', self.args.exec_args)
+ try:
+ rc = e.errno
+ except AttributeError:
+ pass
exit(rc)
def _run_standalone(self):
try:
llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
- if not (self.args.exec_args or self.args.foreground):
- self.daemon_ctx = daemon.DaemonContext(working_directory=os.path.dirname(self.args.mountpoint),
- files_preserve=range(3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
+ if not self.args.foreground:
+ self.daemon_ctx = daemon.DaemonContext(
+ working_directory=os.path.dirname(self.args.mountpoint),
+ files_preserve=range(
+ 3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
self.daemon_ctx.open()
# Subscribe to change events from API server
- self.operations.listen_for_events()
+ if self.listen_for_events:
+ self.operations.listen_for_events()
- llfuse.main()
+ self._llfuse_main()
except Exception as e:
self.logger.exception('arv-mount: exception during mount: %s', e)
exit(getattr(e, 'errno', 1))
- finally:
- self.operations.destroy()
exit(0)
+
+ def _llfuse_main(self):
+ try:
+ llfuse.main()
+ except:
+ llfuse.close(unmount=False)
+ raise
+ llfuse.close()
def flush(self):
pass
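+    # Subclasses override this to indicate whether the directory wants
+    # the mount to subscribe to API server events on its behalf.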
+ def want_event_subscribe(self):
+ raise NotImplementedError()
+
def create(self, name):
raise NotImplementedError()
def writable(self):
return self.collection.writable() if self.collection is not None else self._writable
+ def want_event_subscribe(self):
+ return (uuid_pattern.match(self.collection_locator) is not None)
+
# Used by arv-web.py to switch the contents of the CollectionDirectory
def change_collection(self, new_locator):
"""Switch the contents of the CollectionDirectory.
def writable(self):
return True
+ def want_event_subscribe(self):
+ return False
+
def finalize(self):
self.collection.stop_threads()
def clear(self, force=False):
pass
+ def want_event_subscribe(self):
+ return not self.pdh_only
+
class RecursiveInvalidateDirectory(Directory):
def invalidate(self):
self._poll = True
self._poll_time = poll_time
+ def want_event_subscribe(self):
+ return True
+
@use_counter
def update(self):
with llfuse.lock_released:
self._poll = poll
self._poll_time = poll_time
+ def want_event_subscribe(self):
+ return True
+
@use_counter
def update(self):
with llfuse.lock_released:
self._updating_lock = threading.Lock()
self._current_user = None
+ def want_event_subscribe(self):
+ return True
+
def createDirectory(self, i):
if collection_uuid_pattern.match(i['uuid']):
return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
except Exception:
_logger.exception()
+
+ def want_event_subscribe(self):
+ return True
--- /dev/null
+fpm_depends+=(fuse)
],
install_requires=[
'arvados-python-client >= 0.1.20151118035730',
- 'llfuse>=0.40',
+ 'llfuse==0.41.1',
'python-daemon',
'ciso8601'
],
def wrapper(self, *args, **kwargs):
with arvados_fuse.command.Mount(
arvados_fuse.command.ArgumentParser().parse_args(
- argv + ['--foreground', self.mnt])):
+ argv + ['--foreground',
+ '--unmount-timeout=0.1',
+ self.mnt])):
return func(self, *args, **kwargs)
return wrapper
return decorator
run_test_server.authorize_with("admin")
self.api = api if api else arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
+ # This is a copy of Mount's method. TODO: Refactor MountTestBase
+ # to use a Mount instead of copying its code.
+ def _llfuse_main(self):
+ try:
+ llfuse.main()
+ except:
+ llfuse.close(unmount=False)
+ raise
+ llfuse.close()
+
def make_mount(self, root_class, **root_kwargs):
self.operations = fuse.Operations(
os.getuid(), os.getgid(),
self.operations.inodes.add_entry(root_class(
llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, **root_kwargs))
llfuse.init(self.operations, self.mounttmp, [])
- threading.Thread(None, llfuse.main).start()
+ self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
+ self.llfuse_thread.daemon = True
+ self.llfuse_thread.start()
# wait until the driver is finished initializing
self.operations.initlock.wait()
return self.operations.inodes[llfuse.ROOT_INODE]
self.pool.join()
del self.pool
- # llfuse.close is buggy, so use fusermount instead.
- #llfuse.close(unmount=True)
-
- count = 0
- success = 1
- while (count < 9 and success != 0):
- success = subprocess.call(["fusermount", "-u", self.mounttmp])
- time.sleep(0.1)
- count += 1
-
- self.operations.destroy()
+ subprocess.call(["fusermount", "-u", "-z", self.mounttmp])
+ self.llfuse_thread.join(timeout=1)
+ if self.llfuse_thread.is_alive():
+ logger.warning("MountTestBase.tearDown():"
+ " llfuse thread still alive 1s after umount"
+ " -- abandoning and exiting anyway")
os.rmdir(self.mounttmp)
if self.keeptmp:
import json
import llfuse
import logging
+import mock
import os
import run_test_server
import sys
self.mnt = arvados_fuse.command.Mount(args)
e = self.check_ent_type(arvados_fuse.MagicDirectory)
self.assertEqual(e.pdh_only, False)
+ self.assertEqual(True, self.mnt.listen_for_events)
@noexit
def test_by_pdh(self):
self.mnt = arvados_fuse.command.Mount(args)
e = self.check_ent_type(arvados_fuse.MagicDirectory)
self.assertEqual(e.pdh_only, True)
+ self.assertEqual(False, self.mnt.listen_for_events)
@noexit
def test_by_tag(self):
self.assertEqual(args.mode, 'by_tag')
self.mnt = arvados_fuse.command.Mount(args)
e = self.check_ent_type(arvados_fuse.TagsDirectory)
+ self.assertEqual(True, self.mnt.listen_for_events)
@noexit
def test_collection(self, id_type='uuid'):
self.mnt = arvados_fuse.command.Mount(args)
e = self.check_ent_type(arvados_fuse.CollectionDirectory)
self.assertEqual(e.collection_locator, cid)
+ self.assertEqual(id_type == 'uuid', self.mnt.listen_for_events)
def test_collection_pdh(self):
self.test_collection('portable_data_hash')
e = self.check_ent_type(arvados_fuse.ProjectDirectory)
self.assertEqual(e.project_object['uuid'],
run_test_server.fixture('users')['active']['uuid'])
+ self.assertEqual(True, self.mnt.listen_for_events)
def test_mutually_exclusive_args(self):
cid = run_test_server.fixture('collections')['public_text_file']['uuid']
e = self.check_ent_type(arvados_fuse.SharedDirectory)
self.assertEqual(e.current_user['uuid'],
run_test_server.fixture('users')['active']['uuid'])
+ self.assertEqual(True, self.mnt.listen_for_events)
@noexit
- def test_custom(self):
+ @mock.patch('arvados.events.subscribe')
+ def test_custom(self, mock_subscribe):
args = arvados_fuse.command.ArgumentParser().parse_args([
'--mount-tmp', 'foo',
'--mount-tmp', 'bar',
e = self.check_ent_type(arvados_fuse.ProjectDirectory, 'my_home')
self.assertEqual(e.project_object['uuid'],
run_test_server.fixture('users')['active']['uuid'])
+ self.assertEqual(True, self.mnt.listen_for_events)
+ with self.mnt:
+ pass
+ self.assertEqual(1, mock_subscribe.call_count)
+
+ @noexit
+ @mock.patch('arvados.events.subscribe')
+ def test_custom_no_listen(self, mock_subscribe):
+ args = arvados_fuse.command.ArgumentParser().parse_args([
+ '--mount-by-pdh', 'pdh',
+ '--mount-tmp', 'foo',
+ '--mount-tmp', 'bar',
+ '--foreground', self.mntdir])
+ self.mnt = arvados_fuse.command.Mount(args)
+ self.assertEqual(False, self.mnt.listen_for_events)
+ with self.mnt:
+ pass
+ self.assertEqual(0, mock_subscribe.call_count)
def test_custom_unsupported_layouts(self):
for name in ['.', '..', '', 'foo/bar', '/foo']:
--- /dev/null
+import arvados_fuse.command
+import json
+import multiprocessing
+import os
+import run_test_server
+import tempfile
+import unittest
+
+try:
+ from shlex import quote
+except:
+ from pipes import quote
+
+def try_exec(mnt, cmd):
+ try:
+ arvados_fuse.command.Mount(
+ arvados_fuse.command.ArgumentParser().parse_args([
+ '--read-write',
+ '--mount-tmp=zzz',
+ '--unmount-timeout=0.1',
+ mnt,
+ '--exec'] + cmd)).run()
+ except SystemExit:
+ pass
+ else:
+ raise AssertionError('should have exited')
+
+
+class ExecMode(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ run_test_server.run()
+ run_test_server.run_keep(enforce_permissions=True, num_servers=2)
+ run_test_server.authorize_with('active')
+
+ @classmethod
+ def tearDownClass(cls):
+ run_test_server.stop_keep(num_servers=2)
+
+ def setUp(self):
+ self.mnt = tempfile.mkdtemp()
+ _, self.okfile = tempfile.mkstemp()
+ self.pool = multiprocessing.Pool(1)
+
+ def tearDown(self):
+ self.pool.terminate()
+ self.pool.join()
+ os.rmdir(self.mnt)
+ os.unlink(self.okfile)
+
+ def test_exec(self):
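+        # Run the mount and the --exec command in a child process so the
+        # FUSE session (and the exit() at the end of Mount.run) stays out
+        # of the test process.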
+ self.pool.apply(try_exec, (self.mnt, [
+ 'sh', '-c',
+ 'echo -n foo >{}; cp {} {}'.format(
+ quote(os.path.join(self.mnt, 'zzz', 'foo.txt')),
+ quote(os.path.join(self.mnt, 'zzz', '.arvados#collection')),
+ quote(os.path.join(self.okfile)))]))
+ self.assertRegexpMatches(
+ json.load(open(self.okfile))['manifest_text'],
+ r' 0:3:foo.txt\n')
}
func (s *azureVolumeAdder) Set(containerName string) error {
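+	// Trash/untrash is not yet supported for Azure volumes, so
+	// refuse to start if a nonzero -trash-lifetime was requested.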
+ if trashLifetime != 0 {
+ return ErrNotImplemented
+ }
+
if containerName == "" {
return errors.New("no container name given")
}
}
}
-// Delete a Keep block.
-func (v *AzureBlobVolume) Delete(loc string) error {
+// Trash a Keep block.
+func (v *AzureBlobVolume) Trash(loc string) error {
if v.readonly {
return MethodDisabledError
}
+
+ if trashLifetime != 0 {
+ return ErrNotImplemented
+ }
+
// Ideally we would use If-Unmodified-Since, but that
// particular condition seems to be ignored by Azure. Instead,
// we get the Etag before checking Mtime, and use If-Match to
})
}
+// Untrash a Keep block.
+// TBD
+func (v *AzureBlobVolume) Untrash(loc string) error {
+ return ErrNotImplemented
+}
+
// Status returns a VolumeStatus struct with placeholder data.
func (v *AzureBlobVolume) Status() *VolumeStatus {
return &VolumeStatus{
t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
}
}
+
+func TestUntrashHandler(t *testing.T) {
+ defer teardown()
+
+ // Set up Keep volumes
+ KeepVM = MakeTestVolumeManager(2)
+ defer KeepVM.Close()
+ vols := KeepVM.AllWritable()
+ vols[0].Put(TestHash, TestBlock)
+
+ dataManagerToken = "DATA MANAGER TOKEN"
+
+ // unauthenticatedReq => UnauthorizedError
+ unauthenticatedReq := &RequestTester{
+ method: "PUT",
+ uri: "/untrash/" + TestHash,
+ }
+ response := IssueRequest(unauthenticatedReq)
+ ExpectStatusCode(t,
+ "Unauthenticated request",
+ UnauthorizedError.HTTPCode,
+ response)
+
+ // notDataManagerReq => UnauthorizedError
+ notDataManagerReq := &RequestTester{
+ method: "PUT",
+ uri: "/untrash/" + TestHash,
+ apiToken: knownToken,
+ }
+
+ response = IssueRequest(notDataManagerReq)
+ ExpectStatusCode(t,
+ "Non-datamanager token",
+ UnauthorizedError.HTTPCode,
+ response)
+
+ // datamanagerWithBadHashReq => StatusBadRequest
+ datamanagerWithBadHashReq := &RequestTester{
+ method: "PUT",
+ uri: "/untrash/thisisnotalocator",
+ apiToken: dataManagerToken,
+ }
+ response = IssueRequest(datamanagerWithBadHashReq)
+ ExpectStatusCode(t,
+ "Bad locator in untrash request",
+ http.StatusBadRequest,
+ response)
+
+ // datamanagerWrongMethodReq => StatusBadRequest
+ datamanagerWrongMethodReq := &RequestTester{
+ method: "GET",
+ uri: "/untrash/" + TestHash,
+ apiToken: dataManagerToken,
+ }
+ response = IssueRequest(datamanagerWrongMethodReq)
+ ExpectStatusCode(t,
+ "Only PUT method is supported for untrash",
+ http.StatusBadRequest,
+ response)
+
+ // datamanagerReq => StatusOK
+ datamanagerReq := &RequestTester{
+ method: "PUT",
+ uri: "/untrash/" + TestHash,
+ apiToken: dataManagerToken,
+ }
+ response = IssueRequest(datamanagerReq)
+ ExpectStatusCode(t,
+ "",
+ http.StatusOK,
+ response)
+ expected := "Successfully untrashed on: [MockVolume],[MockVolume]"
+ if response.Body.String() != expected {
+ t.Errorf(
+ "Untrash response mismatched: expected %s, got:\n%s",
+ expected, response.Body.String())
+ }
+}
+
+func TestUntrashHandlerWithNoWritableVolumes(t *testing.T) {
+ defer teardown()
+
+ // Set up readonly Keep volumes
+ vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
+ vols[0].Readonly = true
+ vols[1].Readonly = true
+ KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
+ defer KeepVM.Close()
+
+ dataManagerToken = "DATA MANAGER TOKEN"
+
+ // datamanagerReq => StatusOK
+ datamanagerReq := &RequestTester{
+ method: "PUT",
+ uri: "/untrash/" + TestHash,
+ apiToken: dataManagerToken,
+ }
+ response := IssueRequest(datamanagerReq)
+ ExpectStatusCode(t,
+ "No writable volumes",
+ http.StatusNotFound,
+ response)
+}
"regexp"
"runtime"
"strconv"
+ "strings"
"sync"
"time"
)
// Replace the current trash queue.
rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
+ // Untrash moves blocks from trash back into store
+ rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
+
// Any request which does not match any of these routes gets
// 400 Bad Request.
rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
Failed int `json:"copies_failed"`
}
for _, vol := range KeepVM.AllWritable() {
- if err := vol.Delete(hash); err == nil {
+ if err := vol.Trash(hash); err == nil {
result.Deleted++
} else if os.IsNotExist(err) {
continue
trashq.ReplaceQueue(tlist)
}
+// UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
+func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
+ // Reject unauthorized requests.
+ if !IsDataManagerToken(GetApiToken(req)) {
+ http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+ return
+ }
+
+ hash := mux.Vars(req)["hash"]
+
+ if len(KeepVM.AllWritable()) == 0 {
+ http.Error(resp, "No writable volumes", http.StatusNotFound)
+ return
+ }
+
+ var untrashedOn, failedOn []string
+ var numNotFound int
+ for _, vol := range KeepVM.AllWritable() {
+ err := vol.Untrash(hash)
+
+ if os.IsNotExist(err) {
+ numNotFound++
+ } else if err != nil {
+ log.Printf("Error untrashing %v on volume %v", hash, vol.String())
+ failedOn = append(failedOn, vol.String())
+ } else {
+ log.Printf("Untrashed %v on volume %v", hash, vol.String())
+ untrashedOn = append(untrashedOn, vol.String())
+ }
+ }
+
+ if numNotFound == len(KeepVM.AllWritable()) {
+ http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
+ return
+ }
+
+ if len(failedOn) == len(KeepVM.AllWritable()) {
+ http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
+ } else {
+ respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
+ if len(failedOn) > 0 {
+ respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
+ }
+ resp.Write([]byte(respBody))
+ }
+}
+
// ==============================
// GetBlock and PutBlock implement lower-level code for handling
// blocks by rooting through volumes connected to the local machine.
// actually deleting anything.
var neverDelete = true
+// trashLifetime is the time duration after a block is trashed
+// during which it can be recovered using an /untrash request
+var trashLifetime time.Duration
+
var maxBuffers = 128
var bufs *bufferPool
SizeRequiredError = &KeepError{411, "Missing Content-Length"}
TooLongError = &KeepError{413, "Block is too large"}
MethodDisabledError = &KeepError{405, "Method disabled"}
+ ErrNotImplemented = &KeepError{500, "Unsupported configuration"}
)
func (e *KeepError) Error() string {
"max-buffers",
maxBuffers,
fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
+ flag.DurationVar(
+ &trashLifetime,
+ "trash-lifetime",
+ 0*time.Second,
+ "Interval after a block is trashed during which it can be recovered using an /untrash request")
flag.Parse()
}
func (s *s3VolumeAdder) Set(bucketName string) error {
+ if trashLifetime != 0 {
+ return ErrNotImplemented
+ }
if bucketName == "" {
return fmt.Errorf("no container name given")
}
return nil
}
-func (v *S3Volume) Delete(loc string) error {
+func (v *S3Volume) Trash(loc string) error {
if v.readonly {
return MethodDisabledError
}
+ if trashLifetime != 0 {
+ return ErrNotImplemented
+ }
if t, err := v.Mtime(loc); err != nil {
return err
} else if time.Since(t) < blobSignatureTTL {
return v.Bucket.Del(loc)
}
+// TBD
+func (v *S3Volume) Untrash(loc string) error {
+ return ErrNotImplemented
+}
+
func (v *S3Volume) Status() *VolumeStatus {
return &VolumeStatus{
DeviceNum: 1,
if neverDelete {
err = errors.New("did not delete block because neverDelete is true")
} else {
- err = volume.Delete(trashRequest.Locator)
+ err = volume.Trash(trashRequest.Locator)
}
if err != nil {
// particular order.
IndexTo(prefix string, writer io.Writer) error
- // Delete deletes the block data from the underlying storage
- // device.
+ // Trash moves the block data from the underlying storage
+ // device to trash area. The block then stays in trash for
+ // -trash-lifetime interval before it is actually deleted.
//
// loc is as described in Get.
//
// If the timestamp for the given locator is newer than
- // blobSignatureTTL, Delete must not delete the data.
+ // blobSignatureTTL, Trash must not trash the data.
//
- // If a Delete operation overlaps with any Touch or Put
+ // If a Trash operation overlaps with any Touch or Put
// operations on the same locator, the implementation must
// ensure one of the following outcomes:
//
// - Touch and Put return a non-nil error, or
- // - Delete does not delete the block, or
+ // - Trash does not trash the block, or
// - Both of the above.
//
// If it is possible for the storage device to be accessed by
// reliably or fail outright.
//
// Corollary: A successful Touch or Put guarantees a block
- // will not be deleted for at least blobSignatureTTL
+ // will not be trashed for at least blobSignatureTTL
// seconds.
- Delete(loc string) error
+ Trash(loc string) error
+
+ // Untrash moves block from trash back into store
+ Untrash(loc string) error
// Status returns a *VolumeStatus representing the current
// in-use and available storage capacity and an
v.Put(TestHash, TestBlock)
- if err := v.Delete(TestHash); err != nil {
+ if err := v.Trash(TestHash); err != nil {
t.Error(err)
}
data, err := v.Get(TestHash)
v.Put(TestHash, TestBlock)
v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
- if err := v.Delete(TestHash); err != nil {
+ if err := v.Trash(TestHash); err != nil {
t.Error(err)
}
if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) {
v := factory(t)
defer v.Teardown()
- if err := v.Delete(TestHash2); err == nil {
+ if err := v.Trash(TestHash2); err == nil {
t.Errorf("Expected error when attempting to delete a non-existing block")
}
}
}
// Delete a block from a read-only volume should result in error
- err = v.Delete(TestHash)
+ err = v.Trash(TestHash)
if err == nil {
t.Errorf("Expected error when deleting block from a read-only volume")
}
return nil
}
-func (v *MockVolume) Delete(loc string) error {
+func (v *MockVolume) Trash(loc string) error {
v.gotCall("Delete")
<-v.Gate
if v.Readonly {
return os.ErrNotExist
}
+// TBD
+func (v *MockVolume) Untrash(loc string) error {
+ return nil
+}
+
func (v *MockVolume) Status() *VolumeStatus {
var used uint64
for _, block := range v.Store {
}
func (vs *unixVolumeAdder) Set(value string) error {
+ if trashLifetime != 0 {
+ return ErrNotImplemented
+ }
if dirs := strings.Split(value, ","); len(dirs) > 1 {
log.Print("DEPRECATED: using comma-separated volume list.")
for _, dir := range dirs {
}
-// Delete deletes the block data from the unix storage
-func (v *UnixVolume) Delete(loc string) error {
+// Trash moves the block data from the unix storage to the trash area.
+func (v *UnixVolume) Trash(loc string) error {
// Touch() must be called before calling Write() on a block. Touch()
// also uses lockfile(). This avoids a race condition between Write()
// and Delete() because either (a) the file will be deleted and Touch()
if v.readonly {
return MethodDisabledError
}
+ if trashLifetime != 0 {
+ return ErrNotImplemented
+ }
if v.locker != nil {
v.locker.Lock()
defer v.locker.Unlock()
return os.Remove(p)
}
+// Untrash moves a block from the trash back into the store (not yet implemented).
+func (v *UnixVolume) Untrash(loc string) error {
+ return ErrNotImplemented
+}
+
// blockDir returns the fully qualified directory name for the directory
// where loc is (or would be) stored on this volume.
func (v *UnixVolume) blockDir(loc string) string {
t.Errorf("got err %v, expected MethodDisabledError", err)
}
- err = v.Delete(TestHash)
+ err = v.Trash(TestHash)
if err != MethodDisabledError {
t.Errorf("got err %v, expected MethodDisabledError", err)
}
logins.each do |l|
next if seen[l[:username]]
seen[l[:username]] = true if not seen.has_key?(l[:username])
- @homedir = "/home/#{l[:username]}"
unless uids[l[:username]]
STDERR.puts "Creating account #{l[:username]}"
out: devnull)
end
# Create .ssh directory if necessary
+ @homedir = Etc.getpwnam(l[:username]).dir
userdotssh = File.join(@homedir, ".ssh")
Dir.mkdir(userdotssh) if !File.exists?(userdotssh)
@key = "#######################################################################################
puts bang.backtrace.join("\n")
exit 1
end
-
super(RemotePollLoopActor, self).__init__()
self._client = client
self._timer = timer_actor
- self._logger = logging.getLogger(self.LOGGER_NAME)
self._later = self.actor_ref.proxy()
self._polling_started = False
- self.log_prefix = "{} (at {})".format(self.__class__.__name__, id(self))
self.min_poll_wait = poll_wait
self.max_poll_wait = max_poll_wait
self.poll_wait = self.min_poll_wait
if hasattr(self, '_item_key'):
self.subscribe_to = self._subscribe_to
+ def on_start(self):
+ self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[9:]))
+
def _start_polling(self):
if not self._polling_started:
self._polling_started = True
def subscribe(self, subscriber):
self.all_subscribers.add(subscriber)
- self._logger.debug("%r subscribed to all events", subscriber)
+ self._logger.debug("%s subscribed to all events", subscriber.actor_ref.actor_urn)
self._start_polling()
# __init__ exposes this method to the proxy if the subclass defines
# _item_key.
def _subscribe_to(self, key, subscriber):
self.key_subscribers.setdefault(key, set()).add(subscriber)
- self._logger.debug("%r subscribed to events for '%s'", subscriber, key)
+ self._logger.debug("%s subscribed to events for '%s'", subscriber.actor_ref.actor_urn, key)
self._start_polling()
def _send_request(self):
raise NotImplementedError("subclasses must implement request method")
def _got_response(self, response):
- self._logger.debug("%s got response with %d items",
- self.log_prefix, len(response))
self.poll_wait = self.min_poll_wait
_notify_subscribers(response, self.all_subscribers)
if hasattr(self, '_item_key'):
def _got_error(self, error):
self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait)
- return "{} got error: {} - waiting {} seconds".format(
- self.log_prefix, error, self.poll_wait)
+ return "got error: {} - will try again in {} seconds".format(
+ error, self.poll_wait)
def is_common_error(self, exception):
return False
def poll(self, scheduled_start=None):
- self._logger.debug("%s sending poll", self.log_prefix)
+ self._logger.debug("sending request")
start_time = time.time()
if scheduled_start is None:
scheduled_start = start_time
else:
self._got_response(response)
next_poll = scheduled_start + self.poll_wait
+ self._logger.info("got response with %d items in %s seconds, next poll at %s",
+ len(response), (time.time() - scheduled_start),
+ time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_poll)))
end_time = time.time()
if next_poll < end_time: # We've drifted too much; start fresh.
next_poll = end_time + self.poll_wait
from __future__ import absolute_import, print_function
import calendar
+import functools
import itertools
import re
import time
else:
return not timestamp_fresh(arvados_timestamp(arvados_node["last_ping_at"]), fresh_time)
+class RetryMixin(object):
+ """Retry decorator for an method that makes remote requests.
+
+ Use this function to decorate method, and pass in a tuple of exceptions to
+ catch. If the original method raises a known cloud driver error, or any of
+ the given exception types, this decorator will either go into a
+ sleep-and-retry loop with exponential backoff either by sleeping (if
+ self._timer is None) or by scheduling retries of the method (if self._timer
+ is a timer actor.)
+
+ """
+ def __init__(self, retry_wait, max_retry_wait,
+ logger, cloud, timer=None):
+ self.min_retry_wait = retry_wait
+ self.max_retry_wait = max_retry_wait
+ self.retry_wait = retry_wait
+ self._logger = logger
+ self._cloud = cloud
+ self._timer = timer
+
+ @staticmethod
+ def _retry(errors=()):
+ def decorator(orig_func):
+ @functools.wraps(orig_func)
+ def retry_wrapper(self, *args, **kwargs):
+ while True:
+ try:
+ ret = orig_func(self, *args, **kwargs)
+ except Exception as error:
+ if not (isinstance(error, errors) or
+ self._cloud.is_cloud_exception(error)):
+ self.retry_wait = self.min_retry_wait
+ self._logger.warning(
+ "Re-raising unknown error (no retry): %s",
+ error, exc_info=error)
+ raise
+
+ self._logger.warning(
+ "Client error: %s - waiting %s seconds",
+ error, self.retry_wait, exc_info=error)
+
+ if self._timer:
+ start_time = time.time()
+ # reschedule to be called again
+ self._timer.schedule(start_time + self.retry_wait,
+ getattr(self._later,
+ orig_func.__name__),
+ *args, **kwargs)
+ else:
+ # sleep on it.
+ time.sleep(self.retry_wait)
+
+ self.retry_wait = min(self.retry_wait * 2,
+ self.max_retry_wait)
+ if self._timer:
+ # expect to be called again by timer so don't loop
+ return
+ else:
+ self.retry_wait = self.min_retry_wait
+ return ret
+ return retry_wrapper
+ return decorator
+
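As a quick illustration of how the mixin is meant to be used, here is a sketch (not part of the patch; FlakyClient, fetch_status and get_status are invented names, and the cloud client is assumed to expose is_cloud_exception() the way the node manager drivers do):

import logging

from arvnodeman.computenode import RetryMixin

class FlakyClient(RetryMixin):
    """Illustrative only: a non-actor user of RetryMixin."""
    def __init__(self, cloud_client):
        # cloud_client is assumed to provide is_cloud_exception().
        super(FlakyClient, self).__init__(
            retry_wait=1, max_retry_wait=180,
            logger=logging.getLogger('FlakyClient'),
            cloud=cloud_client, timer=None)

    @RetryMixin._retry((IOError,))
    def fetch_status(self):
        # Hypothetical remote call; any IOError or cloud driver error raised
        # here is retried with exponential backoff (1s doubling up to 180s),
        # because timer is None the decorator sleeps between attempts.
        return self._cloud.get_status()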
class ShutdownTimer(object):
"""Keep track of a cloud node's shutdown windows.
import pykka
from .. import \
- arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, arvados_node_missing
+ arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
+ arvados_node_missing, RetryMixin
from ...clientactor import _notify_subscribers
from ... import config
-class ComputeNodeStateChangeBase(config.actor_class):
+class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
"""Base class for actors that change a compute node's state.
This base class takes care of retrying changes and notifying
subscribers when the change is finished.
"""
- def __init__(self, logger_name, cloud_client, arvados_client, timer_actor,
+ def __init__(self, cloud_client, arvados_client, timer_actor,
retry_wait, max_retry_wait):
super(ComputeNodeStateChangeBase, self).__init__()
+ RetryMixin.__init__(self, retry_wait, max_retry_wait,
+ None, cloud_client, timer_actor)
self._later = self.actor_ref.proxy()
- self._logger = logging.getLogger(logger_name)
- self._cloud = cloud_client
self._arvados = arvados_client
- self._timer = timer_actor
- self.min_retry_wait = retry_wait
- self.max_retry_wait = max_retry_wait
- self.retry_wait = retry_wait
self.subscribers = set()
- @staticmethod
- def _retry(errors=()):
- """Retry decorator for an actor method that makes remote requests.
-
- Use this function to decorator an actor method, and pass in a
- tuple of exceptions to catch. This decorator will schedule
- retries of that method with exponential backoff if the
- original method raises a known cloud driver error, or any of the
- given exception types.
- """
- def decorator(orig_func):
- @functools.wraps(orig_func)
- def retry_wrapper(self, *args, **kwargs):
- start_time = time.time()
- try:
- orig_func(self, *args, **kwargs)
- except Exception as error:
- if not (isinstance(error, errors) or
- self._cloud.is_cloud_exception(error)):
- raise
- self._logger.warning(
- "Client error: %s - waiting %s seconds",
- error, self.retry_wait)
- self._timer.schedule(start_time + self.retry_wait,
- getattr(self._later,
- orig_func.__name__),
- *args, **kwargs)
- self.retry_wait = min(self.retry_wait * 2,
- self.max_retry_wait)
- else:
- self.retry_wait = self.min_retry_wait
- return retry_wrapper
- return decorator
+ def _set_logger(self):
+ self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
+
+ def on_start(self):
+ self._set_logger()
def _finished(self):
_notify_subscribers(self._later, self.subscribers)
self.subscribers = None
+ self._logger.info("finished")
def subscribe(self, subscriber):
if self.subscribers is None:
'last_action': explanation}},
).execute()
+ @staticmethod
+ def _finish_on_exception(orig_func):
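+ # If the wrapped actor method raises, log the error and call _finished()
+ # so subscribers are still notified instead of waiting forever.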
+ @functools.wraps(orig_func)
+ def finish_wrapper(self, *args, **kwargs):
+ try:
+ return orig_func(self, *args, **kwargs)
+ except Exception as error:
+ self._logger.error("Actor error %s", error)
+ self._finished()
+ return finish_wrapper
+
class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
"""Actor to create and set up a cloud compute node.
cloud_size, arvados_node=None,
retry_wait=1, max_retry_wait=180):
super(ComputeNodeSetupActor, self).__init__(
- 'arvnodeman.nodeup', cloud_client, arvados_client, timer_actor,
+ cloud_client, arvados_client, timer_actor,
retry_wait, max_retry_wait)
self.cloud_size = cloud_size
self.arvados_node = None
else:
self._later.prepare_arvados_node(arvados_node)
- @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+ @ComputeNodeStateChangeBase._finish_on_exception
+ @RetryMixin._retry(config.ARVADOS_ERRORS)
def create_arvados_node(self):
self.arvados_node = self._arvados.nodes().create(body={}).execute()
self._later.create_cloud_node()
- @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+ @ComputeNodeStateChangeBase._finish_on_exception
+ @RetryMixin._retry(config.ARVADOS_ERRORS)
def prepare_arvados_node(self, node):
self.arvados_node = self._clean_arvados_node(
node, "Prepared by Node Manager")
self._later.create_cloud_node()
- @ComputeNodeStateChangeBase._retry()
+ @ComputeNodeStateChangeBase._finish_on_exception
+ @RetryMixin._retry()
def create_cloud_node(self):
- self._logger.info("Creating cloud node with size %s.",
+ self._logger.info("Sending create_node request for node size %s.",
self.cloud_size.name)
self.cloud_node = self._cloud.create_node(self.cloud_size,
self.arvados_node)
self._logger.info("Cloud node %s created.", self.cloud_node.id)
self._later.update_arvados_node_properties()
- @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+ @ComputeNodeStateChangeBase._finish_on_exception
+ @RetryMixin._retry(config.ARVADOS_ERRORS)
def update_arvados_node_properties(self):
"""Tell Arvados some details about the cloud node.
self._logger.info("%s updated properties.", self.arvados_node['uuid'])
self._later.post_create()
- @ComputeNodeStateChangeBase._retry()
+ @RetryMixin._retry()
def post_create(self):
self._cloud.post_create_node(self.cloud_node)
self._logger.info("%s post-create work done.", self.cloud_node.id)
# eligible. Normal shutdowns based on job demand should be
# cancellable; shutdowns based on node misbehavior should not.
super(ComputeNodeShutdownActor, self).__init__(
- 'arvnodeman.nodedown', cloud_client, arvados_client, timer_actor,
+ cloud_client, arvados_client, timer_actor,
retry_wait, max_retry_wait)
self._monitor = node_monitor.proxy()
self.cloud_node = self._monitor.cloud_node.get()
self.cancel_reason = None
self.success = None
+ def _set_logger(self):
+ self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
+
def on_start(self):
+ super(ComputeNodeShutdownActor, self).on_start()
self._later.shutdown_node()
def _arvados_node(self):
def cancel_shutdown(self, reason):
self.cancel_reason = reason
- self._logger.info("Cloud node %s shutdown cancelled: %s.",
- self.cloud_node.id, reason)
+ self._logger.info("Shutdown cancelled: %s.", reason)
self._finished(success_flag=False)
def _stop_if_window_closed(orig_func):
@functools.wraps(orig_func)
def stop_wrapper(self, *args, **kwargs):
if (self.cancellable and
- (not self._monitor.shutdown_eligible().get())):
+ (self._monitor.shutdown_eligible().get() is not True)):
self._later.cancel_shutdown(self.WINDOW_CLOSED)
return None
else:
return orig_func(self, *args, **kwargs)
return stop_wrapper
+ @ComputeNodeStateChangeBase._finish_on_exception
@_stop_if_window_closed
- @ComputeNodeStateChangeBase._retry()
+ @RetryMixin._retry()
def shutdown_node(self):
+ self._logger.info("Starting shutdown")
if not self._cloud.destroy_node(self.cloud_node):
if self._cloud.broken(self.cloud_node):
self._later.cancel_shutdown(self.NODE_BROKEN)
else:
# Force a retry.
raise cloud_types.LibcloudError("destroy_node failed")
- self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+ self._logger.info("Shutdown success")
arv_node = self._arvados_node()
if arv_node is None:
self._finished(success_flag=True)
else:
self._later.clean_arvados_node(arv_node)
- @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+ @ComputeNodeStateChangeBase._finish_on_exception
+ @RetryMixin._retry(config.ARVADOS_ERRORS)
def clean_arvados_node(self, arvados_node):
self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
self._finished(success_flag=True)
):
super(ComputeNodeMonitorActor, self).__init__()
self._later = self.actor_ref.proxy()
- self._logger = logging.getLogger('arvnodeman.computenode')
self._last_log = None
self._shutdowns = shutdown_timer
self._cloud_node_fqdn = cloud_fqdn_func
self.last_shutdown_opening = None
self._later.consider_shutdown()
+ def _set_logger(self):
+ self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
+
+ def on_start(self):
+ self._set_logger()
+ self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)
+
def subscribe(self, subscriber):
self.subscribers.add(subscriber)
return result
def shutdown_eligible(self):
+ """Return True if eligible for shutdown, or a string explaining why the node
+ is not eligible for shutdown."""
+
if not self._shutdowns.window_open():
- return False
+ return "shutdown window is not open."
if self.arvados_node is None:
# Node is unpaired.
# If it hasn't pinged Arvados after boot_fail seconds, shut it down
- return not timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after)
+ if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
+ return "node is still booting, will be considered a failed boot at %s" % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.cloud_node_start_time + self.boot_fail_after))
+ else:
+ return True
missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
if missing and self._cloud.broken(self.cloud_node):
# Node is paired, but Arvados says it is missing and the cloud says the node
# is in an error state, so shut it down.
return True
if missing is None and self._cloud.broken(self.cloud_node):
- self._logger.warning(
- "cloud reports broken node, but paired node %s never pinged "
- "(bug?) -- skipped check for node_stale_after",
+ self._logger.info(
+ "Cloud node considered 'broken' but paired node %s last_ping_at is None, " +
+ "cannot check node_stale_after (node may be shut down and we just haven't gotten the message yet).",
self.arvados_node['uuid'])
- return self.in_state('idle')
+ if self.in_state('idle'):
+ return True
+ else:
+ return "node is not idle."
def consider_shutdown(self):
- next_opening = self._shutdowns.next_opening()
- if self.shutdown_eligible():
- self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
- _notify_subscribers(self._later, self.subscribers)
- elif self._shutdowns.window_open():
- self._debug("Node %s shutdown window open but node busy.",
- self.cloud_node.id)
- elif self.last_shutdown_opening != next_opening:
- self._debug("Node %s shutdown window closed. Next at %s.",
- self.cloud_node.id, time.ctime(next_opening))
- self._timer.schedule(next_opening, self._later.consider_shutdown)
- self.last_shutdown_opening = next_opening
+ try:
+ next_opening = self._shutdowns.next_opening()
+ eligible = self.shutdown_eligible()
+ if eligible is True:
+ self._debug("Suggesting shutdown.")
+ _notify_subscribers(self._later, self.subscribers)
+ elif self._shutdowns.window_open():
+ self._debug("Cannot shut down because %s", eligible)
+ elif self.last_shutdown_opening != next_opening:
+ self._debug("Shutdown window closed. Next at %s.",
+ time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
+ self._timer.schedule(next_opening, self._later.consider_shutdown)
+ self.last_shutdown_opening = next_opening
+ except Exception:
+ self._logger.exception("Unexpected exception")
def offer_arvados_pair(self, arvados_node):
first_ping_s = arvados_node.get('first_ping_at')
from . import \
ComputeNodeSetupActor, ComputeNodeUpdateActor, ComputeNodeMonitorActor
from . import ComputeNodeShutdownActor as ShutdownActorBase
+from .. import RetryMixin
class ComputeNodeShutdownActor(ShutdownActorBase):
SLURM_END_STATES = frozenset(['down\n', 'down*\n',
self._nodename = None
return super(ComputeNodeShutdownActor, self).on_start()
else:
+ self._set_logger()
self._nodename = arv_node['hostname']
self._logger.info("Draining SLURM node %s", self._nodename)
self._later.issue_slurm_drain()
# of the excessive memory usage that result in the "Cannot allocate memory"
# error are still being investigated.
- @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
+ @RetryMixin._retry((subprocess.CalledProcessError, OSError))
def cancel_shutdown(self, reason):
if self._nodename:
if self._get_slurm_state() in self.SLURM_DRAIN_STATES:
pass
return super(ComputeNodeShutdownActor, self).cancel_shutdown(reason)
- @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
+ @RetryMixin._retry((subprocess.CalledProcessError, OSError))
@ShutdownActorBase._stop_if_window_closed
def issue_slurm_drain(self):
self._set_node_state('DRAIN', 'Reason=Node Manager shutdown')
self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
self._later.await_slurm_drain()
- @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
+ @RetryMixin._retry((subprocess.CalledProcessError, OSError))
@ShutdownActorBase._stop_if_window_closed
def await_slurm_drain(self):
output = self._get_slurm_state()
from __future__ import absolute_import, print_function
+import logging
from operator import attrgetter
import libcloud.common.types as cloud_types
from libcloud.compute.base import NodeDriver, NodeAuthSSHKey
from ...config import NETWORK_ERRORS
+from .. import RetryMixin
-class BaseComputeNodeDriver(object):
+class BaseComputeNodeDriver(RetryMixin):
"""Abstract base class for compute node drivers.
libcloud drivers abstract away many of the differences between
"""
CLOUD_ERRORS = NETWORK_ERRORS + (cloud_types.LibcloudError,)
- def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
+ @RetryMixin._retry()
+ def _create_driver(self, driver_class, **auth_kwargs):
+ return driver_class(**auth_kwargs)
+
+ @RetryMixin._retry()
+ def _set_sizes(self):
+ self.sizes = {sz.id: sz for sz in self.real.list_sizes()}
+
+ def __init__(self, auth_kwargs, list_kwargs, create_kwargs,
+ driver_class, retry_wait=1, max_retry_wait=180):
"""Base initializer for compute node drivers.
Arguments:
libcloud driver's create_node method to create a new compute node.
* driver_class: The class of a libcloud driver to use.
"""
- self.real = driver_class(**auth_kwargs)
+
+ super(BaseComputeNodeDriver, self).__init__(retry_wait, max_retry_wait,
+ logging.getLogger(self.__class__.__name__),
+ type(self),
+ None)
+ self.real = self._create_driver(driver_class, **auth_kwargs)
self.list_kwargs = list_kwargs
self.create_kwargs = create_kwargs
# Transform entries in create_kwargs. For each key K, if this class
if new_pair is not None:
self.create_kwargs[new_pair[0]] = new_pair[1]
- self.sizes = {sz.id: sz for sz in self.real.list_sizes()}
+ self._set_sizes()
def _init_ping_host(self, ping_host):
self.ping_host = ping_host
self.ping_host, arvados_node['uuid'],
arvados_node['info']['ping_secret'])
+ def find_node(self, name):
+ node = [n for n in self.list_nodes() if n.name == name]
+ if node:
+ return node[0]
+ else:
+ return None
+
def create_node(self, size, arvados_node):
- kwargs = self.create_kwargs.copy()
- kwargs.update(self.arvados_create_kwargs(size, arvados_node))
- kwargs['size'] = size
- return self.real.create_node(**kwargs)
+ try:
+ kwargs = self.create_kwargs.copy()
+ kwargs.update(self.arvados_create_kwargs(size, arvados_node))
+ kwargs['size'] = size
+ return self.real.create_node(**kwargs)
+ except self.CLOUD_ERRORS:
+ # Workaround for bug #6702: sometimes the create node request
+ # succeeds but times out and raises an exception instead of
+ # returning a result. If this happens, we get stuck in a retry
+ # loop forever because subsequent create_node attempts will fail
+ # due to node name collision. So check if the node we intended to
+ # create shows up in the cloud node list and return it if found.
+ try:
+ node = self.find_node(kwargs['name'])
+ if node:
+ return node
+ except:
+ # Ignore possible exception from find_node in favor of
+ # re-raising the original create_node exception.
+ pass
+ raise
def post_create_node(self, cloud_node):
# ComputeNodeSetupActor calls this method after the cloud node is
self.real.ex_create_tags(cloud_node,
{'Name': arvados_node_fqdn(arvados_node)})
+ def find_node(self, name):
+ raise NotImplementedError("ec2.ComputeNodeDriver.find_node")
+
def list_nodes(self):
# Need to populate Node.size
nodes = super(ComputeNodeDriver, self).list_nodes()
})
return result
+
def list_nodes(self):
# The GCE libcloud driver only supports filtering node lists by zone.
# Do our own filtering based on tag list.
self._new_arvados = arvados_factory
self._new_cloud = cloud_factory
self._cloud_driver = self._new_cloud()
- self._logger = logging.getLogger('arvnodeman.daemon')
self._later = self.actor_ref.proxy()
self.shutdown_windows = shutdown_windows
self.server_calculator = server_calculator
self.booting = {} # Actor IDs to ComputeNodeSetupActors
self.booted = {} # Cloud node IDs to _ComputeNodeRecords
self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
- self._logger.debug("Daemon initialized")
+ self.sizes_booting_shutdown = {} # Actor IDs or Cloud node IDs to node size
+
+ def on_start(self):
+ self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
+ self._logger.debug("Daemon started")
def _update_poll_time(self, poll_key):
self.last_polls[poll_key] = time.time()
def _pair_nodes(self, node_record, arvados_node):
- self._logger.info("Cloud node %s has associated with Arvados node %s",
- node_record.cloud_node.id, arvados_node['uuid'])
+ self._logger.info("Cloud node %s is now paired with Arvados node %s",
+ node_record.cloud_node.name, arvados_node['uuid'])
self._arvados_nodes_actor.subscribe_to(
arvados_node['uuid'], node_record.actor.update_arvados_node)
node_record.arvados_node = arvados_node
except pykka.ActorDeadError:
pass
del self.shutdowns[key]
+ del self.sizes_booting_shutdown[key]
record.actor.stop()
record.cloud_node = None
self._pair_nodes(cloud_rec, arv_node)
break
- def _nodes_up(self, size):
- up = 0
- up += sum(1
- for c in self.booting.itervalues()
- if size is None or c.cloud_size.get().id == size.id)
- up += sum(1
- for i in (self.booted, self.cloud_nodes.nodes)
- for c in i.itervalues()
+ def _nodes_booting(self, size):
+ s = sum(1
+ for c in self.booting.iterkeys()
+ if size is None or self.sizes_booting_shutdown[c].id == size.id)
+ s += sum(1
+ for c in self.booted.itervalues()
+ if size is None or c.cloud_node.size.id == size.id)
+ return s
+
+ def _nodes_unpaired(self, size):
+ return sum(1
+ for c in self.cloud_nodes.unpaired()
+ if size is None or c.cloud_node.size.id == size.id)
+
+ def _nodes_booted(self, size):
+ return sum(1
+ for c in self.cloud_nodes.nodes.itervalues()
if size is None or c.cloud_node.size.id == size.id)
+
+ def _nodes_up(self, size):
+ up = self._nodes_booting(size) + self._nodes_booted(size)
return up
def _total_price(self):
cost = 0
- cost += sum(self.server_calculator.find_size(c.cloud_size.get().id).price
- for c in self.booting.itervalues())
+ cost += sum(self.server_calculator.find_size(self.sizes_booting_shutdown[c].id).price
+ for c in self.booting.iterkeys())
cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
for i in (self.booted, self.cloud_nodes.nodes)
for c in i.itervalues())
def _size_shutdowns(self, size):
sh = 0
- for c in self.shutdowns.itervalues():
+ for c in self.shutdowns.iterkeys():
try:
- if c.cloud_node.get().size.id == size.id:
+ if self.sizes_booting_shutdown[c].id == size.id:
sh += 1
except pykka.ActorDeadError:
pass
elif under_min > 0 and size.id == self.min_cloud_size.id:
return under_min
- up_count = self._nodes_up(size) - (self._size_shutdowns(size) +
- self._nodes_busy(size) +
- self._nodes_missing(size))
+ booting_count = self._nodes_booting(size) + self._nodes_unpaired(size)
+ shutdown_count = self._size_shutdowns(size)
+ busy_count = self._nodes_busy(size)
+ up_count = self._nodes_up(size) - (shutdown_count + busy_count + self._nodes_missing(size))
- self._logger.debug("%s: idle nodes %i, wishlist size %i", size.name, up_count, self._size_wishlist(size))
+ self._logger.info("%s: wishlist %i, up %i (booting %i, idle %i, busy %i), shutting down %i", size.name,
+ self._size_wishlist(size),
+ up_count + busy_count,
+ booting_count,
+ up_count - booting_count,
+ busy_count,
+ shutdown_count)
wanted = self._size_wishlist(size) - up_count
if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
self._update_poll_time('server_wishlist')
self.last_wishlist = wishlist
for size in reversed(self.server_calculator.cloud_sizes):
- nodes_wanted = self._nodes_wanted(size)
- if nodes_wanted > 0:
- self._later.start_node(size)
- elif (nodes_wanted < 0) and self.booting:
- self._later.stop_booting_node(size)
+ try:
+ nodes_wanted = self._nodes_wanted(size)
+ if nodes_wanted > 0:
+ self._later.start_node(size)
+ elif (nodes_wanted < 0) and self.booting:
+ self._later.stop_booting_node(size)
+ except Exception as e:
+ self._logger.exception("while calculating nodes wanted for size %s", size)
def _check_poll_freshness(orig_func):
"""Decorator to inhibit a method when poll information is stale.
if nodes_wanted < 1:
return None
arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
- self._logger.info("Want %s more nodes. Booting a %s node.",
+ self._logger.info("Want %i more %s nodes. Booting a node.",
nodes_wanted, cloud_size.name)
new_setup = self._node_setup.start(
timer_actor=self._timer,
cloud_client=self._new_cloud(),
cloud_size=cloud_size).proxy()
self.booting[new_setup.actor_ref.actor_urn] = new_setup
+ self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
+
if arvados_node is not None:
self.arvados_nodes[arvados_node['uuid']].assignment_time = (
time.time())
def node_up(self, setup_proxy):
cloud_node = setup_proxy.cloud_node.get()
del self.booting[setup_proxy.actor_ref.actor_urn]
+ del self.sizes_booting_shutdown[setup_proxy.actor_ref.actor_urn]
+
setup_proxy.stop()
- record = self.cloud_nodes.get(cloud_node.id)
- if record is None:
- record = self._new_node(cloud_node)
- self.booted[cloud_node.id] = record
- self._timer.schedule(time.time() + self.boot_fail_after,
- self._later.shutdown_unpaired_node, cloud_node.id)
+ if cloud_node is not None:
+ record = self.cloud_nodes.get(cloud_node.id)
+ if record is None:
+ record = self._new_node(cloud_node)
+ self.booted[cloud_node.id] = record
+ self._timer.schedule(time.time() + self.boot_fail_after,
+ self._later.shutdown_unpaired_node, cloud_node.id)
@_check_poll_freshness
def stop_booting_node(self, size):
for key, node in self.booting.iteritems():
if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
del self.booting[key]
+ del self.sizes_booting_shutdown[key]
+
if nodes_excess > 1:
self._later.stop_booting_node(size)
break
def _begin_node_shutdown(self, node_actor, cancellable):
- cloud_node_id = node_actor.cloud_node.get().id
+ cloud_node_obj = node_actor.cloud_node.get()
+ cloud_node_id = cloud_node_obj.id
if cloud_node_id in self.shutdowns:
return None
shutdown = self._node_shutdown.start(
arvados_client=self._new_arvados(),
node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
self.shutdowns[cloud_node_id] = shutdown
+ self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
shutdown.subscribe(self._later.node_finished_shutdown)
@_check_poll_freshness
if cancel_reason == self._node_shutdown.NODE_BROKEN:
self.cloud_nodes.blacklist(cloud_node_id)
del self.shutdowns[cloud_node_id]
+ del self.sizes_booting_shutdown[cloud_node_id]
elif cloud_node_id in self.booted:
self.booted.pop(cloud_node_id).actor.stop()
del self.shutdowns[cloud_node_id]
+ del self.sizes_booting_shutdown[cloud_node_id]
def shutdown(self):
self._logger.info("Shutting down after signal.")
"""
CLIENT_ERRORS = ARVADOS_ERRORS
- LOGGER_NAME = 'arvnodeman.jobqueue'
def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
super(JobQueueMonitorActor, self).__init__(
def _got_response(self, queue):
server_list = self._calculator.servers_for_queue(queue)
- self._logger.debug("Sending server wishlist: %s",
+ self._logger.debug("Calculated wishlist: %s",
', '.join(s.name for s in server_list) or "(empty)")
return super(JobQueueMonitorActor, self)._got_response(server_list)
for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
signal.signal(sigcode, shutdown_signal)
- setup_logging(config.get('Logging', 'file'), **config.log_levels())
- node_setup, node_shutdown, node_update, node_monitor = \
- config.dispatch_classes()
- server_calculator = build_server_calculator(config)
- timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
- launch_pollers(config, server_calculator)
- cloud_node_updater = node_update.start(config.new_cloud_client).proxy()
- node_daemon = NodeManagerDaemonActor.start(
- job_queue_poller, arvados_node_poller, cloud_node_poller,
- cloud_node_updater, timer,
- config.new_arvados_client, config.new_cloud_client,
- config.shutdown_windows(),
- server_calculator,
- config.getint('Daemon', 'min_nodes'),
- config.getint('Daemon', 'max_nodes'),
- config.getint('Daemon', 'poll_stale_after'),
- config.getint('Daemon', 'boot_fail_after'),
- config.getint('Daemon', 'node_stale_after'),
- node_setup, node_shutdown, node_monitor,
- max_total_price=config.getfloat('Daemon', 'max_total_price')).proxy()
-
- signal.pause()
- daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
- while not daemon_stopped():
- time.sleep(1)
- pykka.ActorRegistry.stop_all()
+ try:
+ setup_logging(config.get('Logging', 'file'), **config.log_levels())
+ node_setup, node_shutdown, node_update, node_monitor = \
+ config.dispatch_classes()
+ server_calculator = build_server_calculator(config)
+ timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
+ launch_pollers(config, server_calculator)
+ cloud_node_updater = node_update.start(config.new_cloud_client).proxy()
+ node_daemon = NodeManagerDaemonActor.start(
+ job_queue_poller, arvados_node_poller, cloud_node_poller,
+ cloud_node_updater, timer,
+ config.new_arvados_client, config.new_cloud_client,
+ config.shutdown_windows(),
+ server_calculator,
+ config.getint('Daemon', 'min_nodes'),
+ config.getint('Daemon', 'max_nodes'),
+ config.getint('Daemon', 'poll_stale_after'),
+ config.getint('Daemon', 'boot_fail_after'),
+ config.getint('Daemon', 'node_stale_after'),
+ node_setup, node_shutdown, node_monitor,
+ max_total_price=config.getfloat('Daemon', 'max_total_price')).proxy()
+
+ signal.pause()
+ daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
+ while not daemon_stopped():
+ time.sleep(1)
+ except Exception:
+ logging.exception("Uncaught exception during setup")
+ finally:
+ pykka.ActorRegistry.stop_all()
if __name__ == '__main__':
This actor regularly polls the list of Arvados node records, and
sends it to subscribers.
"""
- LOGGER_NAME = 'arvnodeman.arvados_nodes'
def is_common_error(self, exception):
return isinstance(exception, config.ARVADOS_ERRORS)
This actor regularly polls the cloud to get a list of running compute
nodes, and sends it to subscribers.
"""
- LOGGER_NAME = 'arvnodeman.cloud_nodes'
def is_common_error(self, exception):
return self._client.is_cloud_exception(exception)
return node.id
def _send_request(self):
- return self._client.list_nodes()
+ n = self._client.list_nodes()
+ return n
def test_late_subscribers_get_responses(self):
self.build_monitor(['pre_late_test', 'late_test'])
- self.monitor.subscribe(lambda response: None).get(self.TIMEOUT)
+ mock_subscriber = mock.Mock(name='mock_subscriber')
+ self.monitor.subscribe(mock_subscriber).get(self.TIMEOUT)
self.monitor.subscribe(self.subscriber)
self.monitor.poll().get(self.TIMEOUT)
self.stop_proxy(self.monitor)
if __name__ == '__main__':
unittest.main()
-
import httplib2
import mock
import pykka
+import threading
import arvnodeman.computenode.dispatch as dispatch
from . import testutil
def test_creation_without_arvados_node(self):
self.make_actor()
+ finished = threading.Event()
+ self.setup_actor.subscribe(lambda _: finished.set())
self.assertEqual(self.arvados_effect[-1],
self.setup_actor.arvados_node.get(self.TIMEOUT))
+ assert(finished.wait(self.TIMEOUT))
self.assertEqual(1, self.api_client.nodes().create().execute.call_count)
self.assertEqual(1, self.api_client.nodes().update().execute.call_count)
self.assert_node_properties_updated()
def test_creation_with_arvados_node(self):
self.make_mocks(arvados_effect=[testutil.arvados_node_mock()]*2)
self.make_actor(testutil.arvados_node_mock())
+ finished = threading.Event()
+ self.setup_actor.subscribe(lambda _: finished.set())
self.assertEqual(self.arvados_effect[-1],
self.setup_actor.arvados_node.get(self.TIMEOUT))
+ assert(finished.wait(self.TIMEOUT))
self.assert_node_properties_updated()
self.assertEqual(2, self.api_client.nodes().update().execute.call_count)
self.assertEqual(self.cloud_client.create_node(),
def test_no_shutdown_booting(self):
self.make_actor()
self.shutdowns._set_state(True, 600)
- self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+ self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is still booting"))
def test_shutdown_without_arvados_node(self):
self.make_actor(start_time=0)
last_ping_at='1970-01-01T01:02:03.04050607Z')
self.make_actor(10, arv_node)
self.shutdowns._set_state(True, 600)
- self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+ self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
def test_no_shutdown_running_broken(self):
arv_node = testutil.arvados_node_mock(12, job_uuid=None,
self.make_actor(12, arv_node)
self.shutdowns._set_state(True, 600)
self.cloud_client.broken.return_value = True
- self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+ self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
def test_shutdown_missing_broken(self):
arv_node = testutil.arvados_node_mock(11, job_uuid=None,
def test_no_shutdown_when_window_closed(self):
self.make_actor(3, testutil.arvados_node_mock(3, job_uuid=None))
- self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+ self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("shutdown window is not open."))
def test_no_shutdown_when_node_running_job(self):
self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True))
self.shutdowns._set_state(True, 600)
- self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+ self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
def test_no_shutdown_when_node_state_unknown(self):
self.make_actor(5, testutil.arvados_node_mock(
5, crunch_worker_state=None))
self.shutdowns._set_state(True, 600)
- self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+ self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
def test_no_shutdown_when_node_state_stale(self):
self.make_actor(6, testutil.arvados_node_mock(6, age=90000))
self.shutdowns._set_state(True, 600)
- self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+ self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
def test_arvados_node_match(self):
self.make_actor(2)
echo z1.test > /var/tmp/arv-node-data/meta-data/instance-type
""",
driver.arvados_create_kwargs(testutil.MockSize(1), arv_node)['ex_customdata'])
+
+ def test_create_raises_but_actually_succeeded(self):
+ arv_node = testutil.arvados_node_mock(1, hostname=None)
+ driver = self.new_driver(create_kwargs={"tag_arvados-class": "dynamic-compute"})
+ nodelist = [testutil.cloud_node_mock(1, tags={"arvados-class": "dynamic-compute"})]
+ nodelist[0].name = 'compute-000000000000001-zzzzz'
+ self.driver_mock().list_nodes.return_value = nodelist
+ self.driver_mock().create_node.side_effect = IOError
+ n = driver.create_node(testutil.MockSize(1), arv_node)
+ self.assertEqual('compute-000000000000001-zzzzz', n.name)
metadata = self.driver_mock().create_node.call_args[1]['ex_metadata']
self.assertIn('ping_secret=ssshh', metadata.get('arv-ping-url'))
+ def test_create_raises_but_actually_succeeded(self):
+ arv_node = testutil.arvados_node_mock(1, hostname=None)
+ driver = self.new_driver()
+ nodelist = [testutil.cloud_node_mock(1)]
+ nodelist[0].name = 'compute-000000000000001-zzzzz'
+ self.driver_mock().list_nodes.return_value = nodelist
+ self.driver_mock().create_node.side_effect = IOError
+ n = driver.create_node(testutil.MockSize(1), arv_node)
+ self.assertEqual('compute-000000000000001-zzzzz', n.name)
+
def test_create_sets_default_hostname(self):
driver = self.new_driver()
driver.create_node(testutil.MockSize(1),
mock_shutdown = self.node_shutdown.start(node_monitor=mock_node_monitor)
self.daemon.shutdowns.get()[cloud_nodes[1].id] = mock_shutdown.proxy()
+ self.daemon.sizes_booting_shutdown.get()[cloud_nodes[1].id] = size
self.assertEqual(2, self.alive_monitor_count())
for mon_ref in self.monitor_list():
def driver_method_args(self, method_name):
return getattr(self.driver_mock(), method_name).call_args
+ def test_driver_create_retry(self):
+ with mock.patch('time.sleep'):
+ driver_mock2 = mock.MagicMock(name='driver_mock2')
+ self.driver_mock.side_effect = (Exception("oops"), driver_mock2)
+ kwargs = {'user_id': 'foo'}
+ driver = self.new_driver(auth_kwargs=kwargs)
+ self.assertTrue(self.driver_mock.called)
+ self.assertIs(driver.real, driver_mock2)
class RemotePollLoopActorTestMixin(ActorTestMixin):
def build_monitor(self, *args, **kwargs):
include agpl-3.0.txt
+include crunchstat_summary/chartjs.js
import crunchstat_summary.command
import crunchstat_summary.summarizer
+import logging
import sys
+logging.getLogger().addHandler(logging.StreamHandler())
+
args = crunchstat_summary.command.ArgumentParser().parse_args(sys.argv[1:])
-s = crunchstat_summary.summarizer.Summarizer(args)
-s.run()
-print(s.report(), end='')
+cmd = crunchstat_summary.command.Command(args)
+cmd.run()
+print(cmd.report(), end='')
+import logging
+
+logger = logging.getLogger(__name__)
+logger.addHandler(logging.NullHandler())
--- /dev/null
+window.onload = function() {
+ var charts = {};
+ sections.forEach(function(section, section_idx) {
+ var h1 = document.createElement('h1');
+ h1.appendChild(document.createTextNode(section.label));
+ document.body.appendChild(h1);
+ section.charts.forEach(function(data, chart_idx) {
+ // Skip chart if every series has zero data points
+ if (0 == data.data.reduce(function(len, series) {
+ return len + series.dataPoints.length;
+ }, 0)) {
+ return;
+ }
+ var id = 'chart-'+section_idx+'-'+chart_idx;
+ var div = document.createElement('div');
+ div.setAttribute('id', id);
+ div.setAttribute('style', 'width: 100%; height: 150px');
+ document.body.appendChild(div);
+ charts[id] = new CanvasJS.Chart(id, data);
+ charts[id].render();
+ });
+ });
+
+ if (typeof window.debug === 'undefined')
+ window.debug = {};
+ window.debug.charts = charts;
+};
--- /dev/null
+from __future__ import print_function
+
+import cgi
+import json
+import pkg_resources
+
+from crunchstat_summary import logger
+
+
+class ChartJS(object):
+ JSLIB = 'https://cdnjs.cloudflare.com/ajax/libs/canvasjs/1.7.0/canvasjs.min.js'
+
+ def __init__(self, label, summarizers):
+ self.label = label
+ self.summarizers = summarizers
+
+ def html(self):
+ return '''<!doctype html><html><head>
+ <title>{} stats</title>
+ <script type="text/javascript" src="{}"></script>
+ <script type="text/javascript">{}</script>
+ </head><body></body></html>
+ '''.format(cgi.escape(self.label), self.JSLIB, self.js())
+
+ def js(self):
+ return 'var sections = {};\n{}'.format(
+ json.dumps(self.sections()),
+ pkg_resources.resource_string('crunchstat_summary', 'chartjs.js'))
+
+ def sections(self):
+ return [
+ {
+ 'label': s.long_label(),
+ 'charts': self.charts(s.label, s.tasks),
+ }
+ for s in self.summarizers]
+
+ def charts(self, label, tasks):
+ return [
+ {
+ 'axisY': {
+ 'minimum': 0,
+ },
+ 'data': [
+ {
+ 'type': 'line',
+ 'markerType': 'none',
+ 'dataPoints': self._datapoints(
+ label=uuid, task=task, series=task.series[stat]),
+ }
+ for uuid, task in tasks.iteritems()
+ ],
+ 'title': {
+ 'text': '{}: {} {}'.format(label, stat[0], stat[1]),
+ },
+ 'zoomEnabled': True,
+ }
+ for stat in (('cpu', 'user+sys__rate'),
+ ('mem', 'rss'),
+ ('net:eth0', 'tx+rx__rate'),
+ ('net:keep0', 'tx+rx__rate'))]
+
+ def _datapoints(self, label, task, series):
+ points = [
+ {'x': pt[0].total_seconds(), 'y': pt[1]}
+ for pt in series]
+ if len(points) > 0:
+ points[-1]['markerType'] = 'cross'
+ points[-1]['markerSize'] = 12
+ return points
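For reference, a rough sketch of the data contract between chartjs.py above and chartjs.js (everything here is invented for illustration):

# Illustrative only: the shape of the data that ChartJS.sections() serializes
# into the page as the global `sections` variable and that chartjs.js walks
# in window.onload. Labels and numbers are invented.
sections = [
    {
        'label': 'zzzzz-8i9sb-example -- elapsed time 1m30s',
        'charts': [
            {
                'axisY': {'minimum': 0},
                'title': {'text': 'zzzzz-8i9sb-example: cpu user+sys__rate'},
                'zoomEnabled': True,
                'data': [
                    {
                        'type': 'line',
                        'markerType': 'none',
                        # x = seconds since the task started, y = stat value;
                        # the last point is drawn as a cross marker.
                        'dataPoints': [
                            {'x': 10.0, 'y': 0.8},
                            {'x': 20.0, 'y': 1.6,
                             'markerType': 'cross', 'markerSize': 12},
                        ],
                    },
                ],
            },
        ],
    },
]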
import argparse
+import gzip
+import logging
+import sys
+
+from crunchstat_summary import logger, summarizer
class ArgumentParser(argparse.ArgumentParser):
src = self.add_mutually_exclusive_group()
src.add_argument(
'--job', type=str, metavar='UUID',
- help='Look up the specified job and read its log data from Keep')
+ help='Look up the specified job and read its log data from Keep'
+ ' (or from the Arvados event log, if the job is still running)')
+ src.add_argument(
+ '--pipeline-instance', type=str, metavar='UUID',
+ help='Summarize each component of the given pipeline instance')
src.add_argument(
'--log-file', type=str,
help='Read log data from a regular file')
+ self.add_argument(
+ '--skip-child-jobs', action='store_true',
+ help='Do not include stats from child jobs')
+ self.add_argument(
+ '--format', type=str, choices=('html', 'text'), default='text',
+ help='Report format')
+ self.add_argument(
+ '--verbose', '-v', action='count', default=0,
+ help='Log more information (once for progress, twice for debug)')
+
+
+class Command(object):
+ def __init__(self, args):
+ self.args = args
+ logger.setLevel(logging.WARNING - 10 * args.verbose)
+
+ def run(self):
+ kwargs = {
+ 'skip_child_jobs': self.args.skip_child_jobs,
+ }
+ if self.args.pipeline_instance:
+ self.summer = summarizer.PipelineSummarizer(self.args.pipeline_instance, **kwargs)
+ elif self.args.job:
+ self.summer = summarizer.JobSummarizer(self.args.job, **kwargs)
+ elif self.args.log_file:
+ if self.args.log_file.endswith('.gz'):
+ fh = gzip.open(self.args.log_file)
+ else:
+ fh = open(self.args.log_file)
+ self.summer = summarizer.Summarizer(fh, **kwargs)
+ else:
+ self.summer = summarizer.Summarizer(sys.stdin, **kwargs)
+ return self.summer.run()
+
+ def report(self):
+ if self.args.format == 'html':
+ return self.summer.html_report()
+ elif self.args.format == 'text':
+ return self.summer.text_report()
--- /dev/null
+from __future__ import print_function
+
+import arvados
+import Queue
+import threading
+
+from crunchstat_summary import logger
+
+
+class CollectionReader(object):
+ def __init__(self, collection_id):
+ logger.debug('load collection %s', collection_id)
+ collection = arvados.collection.CollectionReader(collection_id)
+ filenames = [filename for filename in collection]
+ if len(filenames) != 1:
+ raise ValueError(
+ "collection {} has {} files; need exactly one".format(
+ collection_id, len(filenames)))
+ self._reader = collection.open(filenames[0])
+
+ def __iter__(self):
+ return iter(self._reader)
+
+
+class LiveLogReader(object):
+ EOF = None
+
+ def __init__(self, job_uuid):
+ logger.debug('load stderr events for job %s', job_uuid)
+ self._filters = [
+ ['object_uuid', '=', job_uuid],
+ ['event_type', '=', 'stderr']]
+ self._label = job_uuid
+
+ def _get_all_pages(self):
+ got = 0
+ last_id = 0
+ while True:
+ page = arvados.api().logs().index(
+ limit=1000,
+ order=['id asc'],
+ filters=self._filters + [['id','>',str(last_id)]],
+ ).execute(num_retries=2)
+ got += len(page['items'])
+ logger.debug(
+ '%s: received %d of %d log events',
+ self._label, got,
+ got + page['items_available'] - len(page['items']))
+ for i in page['items']:
+ for line in i['properties']['text'].split('\n'):
+ self._queue.put(line+'\n')
+ last_id = i['id']
+ if (len(page['items']) == 0 or
+ len(page['items']) >= page['items_available']):
+ break
+ self._queue.put(self.EOF)
+
+ def __iter__(self):
+ self._queue = Queue.Queue()
+ self._thread = threading.Thread(target=self._get_all_pages)
+ self._thread.daemon = True
+ self._thread.start()
+ return self
+
+ def next(self):
+ line = self._queue.get()
+ if line is self.EOF:
+ raise StopIteration
+ return line
import arvados
import collections
+import crunchstat_summary.chartjs
+import crunchstat_summary.reader
+import datetime
import functools
-import gzip
+import itertools
+import math
import re
import sys
+from arvados.api import OrderedJsonModel
+from crunchstat_summary import logger
+
+# Recommend memory constraints that are this multiple of an integral
+# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
+# that have amounts like 7.5 GiB according to the kernel.)
+AVAILABLE_RAM_RATIO = 0.95
+
+
+class Task(object):
+ def __init__(self):
+ self.starttime = None
+ self.series = collections.defaultdict(list)
+
class Summarizer(object):
- def __init__(self, args):
- self.args = args
+ def __init__(self, logdata, label=None, skip_child_jobs=False):
+ self._logdata = logdata
+
+ self.label = label
+ self.starttime = None
+ self.finishtime = None
+ self._skip_child_jobs = skip_child_jobs
- def run(self):
# stats_max: {category: {stat: val}}
self.stats_max = collections.defaultdict(
functools.partial(collections.defaultdict,
# task_stats: {task_id: {category: {stat: val}}}
self.task_stats = collections.defaultdict(
functools.partial(collections.defaultdict, dict))
- for line in self._logdata():
+
+ self.seq_to_uuid = {}
+ self.tasks = collections.defaultdict(Task)
+
+ # We won't bother recommending new runtime constraints if the
+ # constraints given when running the job are known to us and
+ # are already suitable. If applicable, the subclass
+ # constructor will overwrite this with something useful.
+ self.existing_constraints = {}
+
+ logger.debug("%s: logdata %s", self.label, repr(logdata))
+
+ def run(self):
+ logger.debug("%s: parsing log data", self.label)
+ for line in self._logdata:
+ m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
+ if m:
+ seq = int(m.group('seq'))
+ uuid = m.group('task_uuid')
+ self.seq_to_uuid[seq] = uuid
+ logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
+ continue
+
m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
if m:
- task_id = m.group('seq')
+ task_id = self.seq_to_uuid[int(m.group('seq'))]
elapsed = int(m.group('elapsed'))
self.task_stats[task_id]['time'] = {'elapsed': elapsed}
if elapsed > self.stats_max['time']['elapsed']:
self.stats_max['time']['elapsed'] = elapsed
continue
- m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
+
+ m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
+ if m:
+ uuid = m.group('uuid')
+ if self._skip_child_jobs:
+ logger.warning('%s: omitting stats from child job %s'
+ ' because --skip-child-jobs flag is on',
+ self.label, uuid)
+ continue
+ logger.debug('%s: follow %s', self.label, uuid)
+ child_summarizer = JobSummarizer(uuid)
+ child_summarizer.stats_max = self.stats_max
+ child_summarizer.task_stats = self.task_stats
+ child_summarizer.tasks = self.tasks
+ child_summarizer.run()
+ logger.debug('%s: done %s', self.label, uuid)
+ continue
+
+ m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
if not m:
continue
+
+ if self.label is None:
+ self.label = m.group('job_uuid')
+ logger.debug('%s: using job uuid as label', self.label)
if m.group('category').endswith(':'):
- # "notice:" etc.
+ # "stderr crunchstat: notice: ..."
+ continue
+ elif m.group('category') in ('error', 'caught'):
continue
- task_id = m.group('seq')
+ elif m.group('category') == 'read':
+ # "stderr crunchstat: read /proc/1234/net/dev: ..."
+ # (crunchstat formatting fixed, but old logs still say this)
+ continue
+ task_id = self.seq_to_uuid[int(m.group('seq'))]
+ task = self.tasks[task_id]
+
+ # Use the first and last crunchstat timestamps as
+ # approximations of starttime and finishtime.
+ timestamp = datetime.datetime.strptime(
+ m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
+ if not task.starttime:
+ task.starttime = timestamp
+ logger.debug('%s: task %s starttime %s',
+ self.label, task_id, timestamp)
+ task.finishtime = timestamp
+
+ if not self.starttime:
+ self.starttime = timestamp
+ self.finishtime = timestamp
+
this_interval_s = None
for group in ['current', 'interval']:
if not m.group(group):
words = m.group(group).split(' ')
stats = {}
for val, stat in zip(words[::2], words[1::2]):
- if '.' in val:
- stats[stat] = float(val)
- else:
- stats[stat] = int(val)
+ try:
+ if '.' in val:
+ stats[stat] = float(val)
+ else:
+ stats[stat] = int(val)
+ except ValueError as e:
+ raise ValueError(
+ 'Error parsing {} stat in "{}": {!r}'.format(
+ stat, line, e))
if 'user' in stats or 'sys' in stats:
stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
if 'tx' in stats or 'rx' in stats:
this_interval_s = val
continue
elif not (this_interval_s > 0):
- print("BUG? interval stat given with duration {!r}".
- format(this_interval_s),
- file=sys.stderr)
+ logger.error(
+ "BUG? interval stat given with duration {!r}".
+ format(this_interval_s))
continue
else:
stat = stat + '__rate'
val = val / this_interval_s
+ if stat in ['user+sys__rate', 'tx+rx__rate']:
+ task.series[category, stat].append(
+ (timestamp - task.starttime, val))
else:
+ if stat in ['rss']:
+ task.series[category, stat].append(
+ (timestamp - task.starttime, val))
self.task_stats[task_id][category][stat] = val
if val > self.stats_max[category][stat]:
self.stats_max[category][stat] = val
+ logger.debug('%s: done parsing', self.label)
- def report(self):
- return "\n".join(self._report_gen()) + "\n"
-
- def _report_gen(self):
- job_tot = collections.defaultdict(
+ self.job_tot = collections.defaultdict(
functools.partial(collections.defaultdict, int))
for task_id, task_stat in self.task_stats.iteritems():
for category, stat_last in task_stat.iteritems():
if stat in ['cpus', 'cache', 'swap', 'rss']:
# meaningless stats like 16 cpu cores x 5 tasks = 80
continue
- job_tot[category][stat] += val
+ self.job_tot[category][stat] += val
+ logger.debug('%s: done totals', self.label)
+
+ def long_label(self):
+ label = self.label
+ if self.finishtime:
+ label += ' -- elapsed time '
+ s = (self.finishtime - self.starttime).total_seconds()
+ if s > 86400:
+ label += '{}d'.format(int(s/86400))
+ if s > 3600:
+ label += '{}h'.format(int(s/3600) % 24)
+ if s > 60:
+ label += '{}m'.format(int(s/60) % 60)
+ label += '{}s'.format(int(s) % 60)
+ return label
+
+ def text_report(self):
+ return "\n".join(itertools.chain(
+ self._text_report_gen(),
+ self._recommend_gen())) + "\n"
+
+ def html_report(self):
+ return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()
+
+ def _text_report_gen(self):
yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
for category, stat_max in sorted(self.stats_max.iteritems()):
for stat, val in sorted(stat_max.iteritems()):
continue
max_rate = self._format(stat_max.get(stat+'__rate', '-'))
val = self._format(val)
- tot = self._format(job_tot[category].get(stat, '-'))
+ tot = self._format(self.job_tot[category].get(stat, '-'))
yield "\t".join([category, stat, str(val), max_rate, tot])
for args in (
+ ('Number of tasks: {}',
+ len(self.tasks),
+ None),
('Max CPU time spent by a single task: {}s',
self.stats_max['cpu']['user+sys'],
None),
self.stats_max['cpu']['user+sys__rate'],
lambda x: x * 100),
('Overall CPU usage: {}%',
- job_tot['cpu']['user+sys'] / job_tot['time']['elapsed'],
+ self.job_tot['cpu']['user+sys'] /
+ self.job_tot['time']['elapsed'],
lambda x: x * 100),
('Max memory used by a single task: {}GB',
self.stats_max['mem']['rss'],
val = transform(val)
yield "# "+format_string.format(self._format(val))
+ def _recommend_gen(self):
+ return itertools.chain(
+ self._recommend_cpu(),
+ self._recommend_ram())
+
+ def _recommend_cpu(self):
+ """Recommend asking for 4 cores if max CPU usage was 333%"""
+
+ cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
+ if cpu_max_rate == float('-Inf'):
+ logger.warning('%s: no CPU usage data', self.label)
+ return
+ used_cores = int(math.ceil(cpu_max_rate))
+ asked_cores = self.existing_constraints.get('min_cores_per_node')
+ if asked_cores is None or used_cores < asked_cores:
+ yield (
+ '#!! {} max CPU usage was {}% -- '
+ 'try runtime_constraints "min_cores_per_node":{}'
+ ).format(
+ self.label,
+ int(math.ceil(cpu_max_rate*100)),
+ int(used_cores))
+
+ def _recommend_ram(self):
+ """Recommend an economical RAM constraint for this job.
+
+ Nodes that are advertised as "8 gibibytes" actually have what
+ we might call "8 nearlygibs" of memory available for jobs.
+ Here, we calculate a whole number of nearlygibs that would
+ have sufficed to run the job, then recommend requesting a node
+ with that number of nearlygibs (expressed as mebibytes).
+
+ Requesting a node with "nearly 8 gibibytes" is our best hope
+ of getting a node that actually has nearly 8 gibibytes
+ available. If the node manager is smart enough to account for
+ the discrepancy itself when choosing/creating a node, we'll
+ get an 8 GiB node with nearly 8 GiB available. Otherwise, the
+ advertised size of the next-size-smaller node (say, 6 GiB)
+ will be too low to satisfy our request, so we will effectively
+ get rounded up to 8 GiB.
+
+ For example, if we need 7500 MiB, we can ask for 7500 MiB, and
+ we will generally get a node that is advertised as "8 GiB" and
+ has at least 7500 MiB available. However, asking for 8192 MiB
+ would either result in an unnecessarily expensive 12 GiB node
+ (if node manager knows about the discrepancy), or an 8 GiB
+ node which has less than 8192 MiB available and is therefore
+ considered by crunch-dispatch to be too small to meet our
+ constraint.
+
+ When node manager learns how to predict the available memory
+ for each node type such that crunch-dispatch always agrees
+ that a node is big enough to run the job it was brought up
+ for, all this will be unnecessary. We'll just ask for exactly
+ the memory we want -- even if that happens to be 8192 MiB.
+ """
+
+ used_bytes = self.stats_max['mem']['rss']
+ if used_bytes == float('-Inf'):
+ logger.warning('%s: no memory usage data', self.label)
+ return
+ used_mib = math.ceil(float(used_bytes) / 1048576)
+ asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
+
+ nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
+ if asked_mib is None or (
+ math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
+ yield (
+ '#!! {} max RSS was {} MiB -- '
+ 'try runtime_constraints "min_ram_mb_per_node":{}'
+ ).format(
+ self.label,
+ int(used_mib),
+ int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
+
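To make the recommendation arithmetic above concrete, here is a small worked example (all numbers invented) following the same formulas:

# A worked example of the arithmetic in _recommend_cpu() and _recommend_ram().
import math

AVAILABLE_RAM_RATIO = 0.95

cpu_max_rate = 3.33                        # peak CPU usage of 333%
used_cores = int(math.ceil(cpu_max_rate))  # -> recommend "min_cores_per_node":4

used_bytes = 7.2 * 2**30                   # peak RSS of 7.2 GiB
used_mib = math.ceil(float(used_bytes) / 1048576)   # -> 7373 MiB
nearlygibs = lambda mebibytes: mebibytes / AVAILABLE_RAM_RATIO / 1024
# ceil(7373 / 0.95 / 1024) = ceil(7.58) = 8 "nearlygibs" needed
recommended_mib = int(math.ceil(nearlygibs(used_mib)) * AVAILABLE_RAM_RATIO * 1024)
# -> recommend "min_ram_mb_per_node":7782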
def _format(self, val):
"""Return a string representation of a stat.
else:
return '{}'.format(val)
- def _logdata(self):
- if self.args.log_file:
- if self.args.log_file.endswith('.gz'):
- return gzip.open(self.args.log_file)
- else:
- return open(self.args.log_file)
- elif self.args.job:
- arv = arvados.api('v1')
- job = arv.jobs().get(uuid=self.args.job).execute()
- if not job['log']:
- raise ValueError(
- "job {} has no log; live summary not implemented".format(
- self.args.job))
- collection = arvados.collection.CollectionReader(job['log'])
- filenames = [filename for filename in collection]
- if len(filenames) != 1:
- raise ValueError(
- "collection {} has {} files; need exactly one".format(
- job.log, len(filenames)))
- return collection.open(filenames[0])
+
+class CollectionSummarizer(Summarizer):
+ def __init__(self, collection_id, **kwargs):
+ super(CollectionSummarizer, self).__init__(
+ crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
+ self.label = collection_id
+
+
+class JobSummarizer(Summarizer):
+ def __init__(self, job, **kwargs):
+ arv = arvados.api('v1')
+ if isinstance(job, basestring):
+ self.job = arv.jobs().get(uuid=job).execute()
+ else:
+ self.job = job
+ if self.job['log']:
+ rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
+ label = self.job['uuid']
else:
- return sys.stdin
+ rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
+ label = self.job['uuid'] + ' (partial)'
+ super(JobSummarizer, self).__init__(rdr, **kwargs)
+ self.label = label
+ self.existing_constraints = self.job.get('runtime_constraints', {})
+
+
+class PipelineSummarizer(object):
+ def __init__(self, pipeline_instance_uuid, **kwargs):
+ arv = arvados.api('v1', model=OrderedJsonModel())
+ instance = arv.pipeline_instances().get(
+ uuid=pipeline_instance_uuid).execute()
+ self.summarizers = collections.OrderedDict()
+ for cname, component in instance['components'].iteritems():
+ if 'job' not in component:
+ logger.warning(
+ "%s: skipping component with no job assigned", cname)
+ elif component['job'].get('log') is None:
+ logger.warning(
+ "%s: skipping job %s with no log available",
+ cname, component['job'].get('uuid'))
+ else:
+ logger.info(
+ "%s: logdata %s", cname, component['job']['log'])
+ summarizer = JobSummarizer(component['job'], **kwargs)
+ summarizer.label = cname
+ self.summarizers[cname] = summarizer
+ self.label = pipeline_instance_uuid
+
+ def run(self):
+ for summarizer in self.summarizers.itervalues():
+ summarizer.run()
+
+ def text_report(self):
+ txt = ''
+ for cname, summarizer in self.summarizers.iteritems():
+ txt += '### Summary for {} ({})\n'.format(
+ cname, summarizer.job['uuid'])
+ txt += summarizer.text_report()
+ txt += '\n'
+ return txt
+
+ def html_report(self):
+ return crunchstat_summary.chartjs.ChartJS(
+ self.label, self.summarizers.itervalues()).html()
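For orientation, a minimal sketch of driving these classes directly (the UUIDs are placeholders; it assumes an Arvados API token is configured in the environment and that run() and text_report() are inherited from the base Summarizer):

from crunchstat_summary.summarizer import JobSummarizer, PipelineSummarizer

# Summarize a single job by UUID.
job_summary = JobSummarizer('zzzzz-8i9sb-000000000000000')
job_summary.run()
print(job_summary.text_report())

# Summarize every job in a pipeline instance, one section per component.
pipeline_summary = PipelineSummarizer('zzzzz-d1hrv-000000000000000')
pipeline_summary.run()
print(pipeline_summary.text_report())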
download_url="https://github.com/curoverse/arvados.git",
license='GNU Affero General Public License, version 3.0',
packages=['crunchstat_summary'],
+ include_package_data=True,
scripts=[
'bin/crunchstat-summary'
],
'arvados-python-client',
],
test_suite='tests',
+ tests_require=['pbr<1.7.0', 'mock>=1.0'],
zip_safe=False,
cmdclass={'egg_info': tagger},
)
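The packaging changes above install bin/crunchstat-summary as the command-line entry point. The same code path can also be exercised from Python, as the tests below do; a sketch with a placeholder job UUID:

import crunchstat_summary.command

# Parse arguments the same way the installed bin/crunchstat-summary script would.
args = crunchstat_summary.command.ArgumentParser().parse_args(
    ['--job', 'zzzzz-8i9sb-000000000000000'])
cmd = crunchstat_summary.command.Command(args)
cmd.run()
print(cmd.report())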
--- /dev/null
+2016-01-07_00:15:33 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr
+2016-01-07_00:15:33 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr old error message:
+2016-01-07_00:15:33 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr crunchstat: read /proc/3305/net/dev: open /proc/3305/net/dev: no such file or directory
+2016-01-07_00:15:34 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr
+2016-01-07_00:15:34 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr new error message:
+2016-01-07_00:15:34 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr crunchstat: error reading /proc/3305/net/dev: open /proc/3305/net/dev: no such file or directory
+2016-01-07_00:15:34 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr
+2016-01-07_00:15:34 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr cancelled job:
+2016-01-07_00:15:34 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr crunchstat: caught signal: interrupt
net:keep0 tx 0 0.00 0
net:keep0 tx+rx 0 0.00 0
time elapsed 80 - 80
+# Number of tasks: 1
# Max CPU time spent by a single task: 5.75s
# Max CPU usage in a single interval: 13.00%
# Overall CPU usage: 7.19%
# Max memory used by a single task: 0.35GB
# Max network traffic in a single task: 1.79GB
# Max network speed in a single interval: 42.58MB/s
+#!! 4xphq-8i9sb-jq0ekny1xou3zoh max CPU usage was 13% -- try runtime_constraints "min_cores_per_node":1
+#!! 4xphq-8i9sb-jq0ekny1xou3zoh max RSS was 334 MiB -- try runtime_constraints "min_ram_mb_per_node":972
net:eth0 tx 90 - 90
net:eth0 tx+rx 180 - 180
time elapsed 2 - 4
+# Number of tasks: 2
# Max CPU time spent by a single task: 0.00s
# Overall CPU usage: 0.00%
# Max memory used by a single task: 0.00GB
# Max network traffic in a single task: 0.00GB
+#!! 4xphq-8i9sb-zvb2ocfycpomrup max RSS was 1 MiB -- try runtime_constraints "min_ram_mb_per_node":972
net:eth0 tx 90 - 90
net:eth0 tx+rx 180 - 180
time elapsed 2 - 3
+# Number of tasks: 2
# Max CPU time spent by a single task: 0.00s
# Overall CPU usage: 0.00%
# Max memory used by a single task: 0.00GB
# Max network traffic in a single task: 0.00GB
+#!! 4xphq-8i9sb-v831jm2uq0g2g9x max RSS was 1 MiB -- try runtime_constraints "min_ram_mb_per_node":972
+import arvados
+import collections
import crunchstat_summary.command
import crunchstat_summary.summarizer
import difflib
import glob
+import gzip
+import mock
import os
import unittest
+TESTS_DIR = os.path.dirname(os.path.abspath(__file__))
-class ExampleLogsTestCase(unittest.TestCase):
+
+class ReportDiff(unittest.TestCase):
+ def diff_known_report(self, logfile, cmd):
+ expectfile = logfile+'.report'
+ expect = open(expectfile).readlines()
+ self.diff_report(cmd, expect, expectfile=expectfile)
+
+ def diff_report(self, cmd, expect, expectfile=None):
+ got = [x+"\n" for x in cmd.report().strip("\n").split("\n")]
+ self.assertEqual(got, expect, "\n"+"".join(difflib.context_diff(
+ expect, got, fromfile=expectfile, tofile="(generated)")))
+
+
+class SummarizeFile(ReportDiff):
def test_example_files(self):
- dirname = os.path.dirname(os.path.abspath(__file__))
- for fnm in glob.glob(os.path.join(dirname, '*.txt.gz')):
- logfile = os.path.join(dirname, fnm)
+ for fnm in glob.glob(os.path.join(TESTS_DIR, '*.txt.gz')):
+ logfile = os.path.join(TESTS_DIR, fnm)
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--log-file', logfile])
- summarizer = crunchstat_summary.summarizer.Summarizer(args)
- summarizer.run()
- got = [x+"\n" for x in summarizer.report().strip("\n").split("\n")]
- expectfile = logfile+'.report'
- expect = open(expectfile).readlines()
- self.assertEqual(got, expect, "\n"+"".join(difflib.context_diff(
- expect, got, fromfile=expectfile, tofile="(generated)")))
+ cmd = crunchstat_summary.command.Command(args)
+ cmd.run()
+ self.diff_known_report(logfile, cmd)
+
+
+class HTMLFromFile(ReportDiff):
+ def test_example_files(self):
+ # Note we don't test the output content at all yet; we're
+ # mainly just verifying the --format=html option isn't ignored
+ # and the HTML code path doesn't crash.
+ for fnm in glob.glob(os.path.join(TESTS_DIR, '*.txt.gz')):
+ logfile = os.path.join(TESTS_DIR, fnm)
+ args = crunchstat_summary.command.ArgumentParser().parse_args(
+ ['--format=html', '--log-file', logfile])
+ cmd = crunchstat_summary.command.Command(args)
+ cmd.run()
+ self.assertRegexpMatches(cmd.report(), r'(?is)<html>.*</html>\s*$')
+
+
+class SummarizeEdgeCases(unittest.TestCase):
+ def test_error_messages(self):
+ logfile = open(os.path.join(TESTS_DIR, 'crunchstat_error_messages.txt'))
+ s = crunchstat_summary.summarizer.Summarizer(logfile)
+ s.run()
+
+
+class SummarizeJob(ReportDiff):
+ fake_job_uuid = '4xphq-8i9sb-jq0ekny1xou3zoh'
+ fake_log_id = 'fake-log-collection-id'
+ fake_job = {
+ 'uuid': fake_job_uuid,
+ 'log': fake_log_id,
+ }
+ logfile = os.path.join(TESTS_DIR, 'logfile_20151204190335.txt.gz')
+
+ @mock.patch('arvados.collection.CollectionReader')
+ @mock.patch('arvados.api')
+ def test_job_report(self, mock_api, mock_cr):
+ mock_api().jobs().get().execute.return_value = self.fake_job
+ mock_cr().__iter__.return_value = ['fake-logfile.txt']
+ mock_cr().open.return_value = gzip.open(self.logfile)
+ args = crunchstat_summary.command.ArgumentParser().parse_args(
+ ['--job', self.fake_job_uuid])
+ cmd = crunchstat_summary.command.Command(args)
+ cmd.run()
+ self.diff_known_report(self.logfile, cmd)
+ mock_api().jobs().get.assert_called_with(uuid=self.fake_job_uuid)
+ mock_cr.assert_called_with(self.fake_log_id)
+ mock_cr().open.assert_called_with('fake-logfile.txt')
+
+
+class SummarizePipeline(ReportDiff):
+ fake_instance = {
+ 'uuid': 'zzzzz-d1hrv-i3e77t9z5y8j9cc',
+ 'owner_uuid': 'zzzzz-tpzed-xurymjxw79nv3jz',
+ 'components': collections.OrderedDict([
+ ['foo', {
+ 'job': {
+ 'uuid': 'zzzzz-8i9sb-000000000000000',
+ 'log': 'fake-log-pdh-0',
+ 'runtime_constraints': {
+ 'min_ram_mb_per_node': 900,
+ 'min_cores_per_node': 1,
+ },
+ },
+ }],
+ ['bar', {
+ 'job': {
+ 'uuid': 'zzzzz-8i9sb-000000000000001',
+ 'log': 'fake-log-pdh-1',
+ 'runtime_constraints': {
+ 'min_ram_mb_per_node': 900,
+ 'min_cores_per_node': 1,
+ },
+ },
+ }],
+ ['no-job-assigned', {}],
+ ['unfinished-job', {
+ 'job': {
+ 'uuid': 'zzzzz-8i9sb-xxxxxxxxxxxxxxx',
+ },
+ }],
+ ['baz', {
+ 'job': {
+ 'uuid': 'zzzzz-8i9sb-000000000000002',
+ 'log': 'fake-log-pdh-2',
+ 'runtime_constraints': {
+ 'min_ram_mb_per_node': 900,
+ 'min_cores_per_node': 1,
+ },
+ },
+ }]]),
+ }
+
+ @mock.patch('arvados.collection.CollectionReader')
+ @mock.patch('arvados.api')
+ def test_pipeline(self, mock_api, mock_cr):
+ logfile = os.path.join(TESTS_DIR, 'logfile_20151204190335.txt.gz')
+ mock_api().pipeline_instances().get().execute. \
+ return_value = self.fake_instance
+ mock_cr().__iter__.return_value = ['fake-logfile.txt']
+ mock_cr().open.side_effect = [gzip.open(logfile) for _ in range(3)]
+ args = crunchstat_summary.command.ArgumentParser().parse_args(
+ ['--pipeline-instance', self.fake_instance['uuid']])
+ cmd = crunchstat_summary.command.Command(args)
+ cmd.run()
+
+ job_report = [
+ line for line in open(logfile+'.report').readlines()
+ if not line.startswith('#!! ')]
+ expect = (
+ ['### Summary for foo (zzzzz-8i9sb-000000000000000)\n'] +
+ job_report + ['\n'] +
+ ['### Summary for bar (zzzzz-8i9sb-000000000000001)\n'] +
+ job_report + ['\n'] +
+ ['### Summary for baz (zzzzz-8i9sb-000000000000002)\n'] +
+ job_report)
+ self.diff_report(cmd, expect)
+ mock_cr.assert_has_calls(
+ [
+ mock.call('fake-log-pdh-0'),
+ mock.call('fake-log-pdh-1'),
+ mock.call('fake-log-pdh-2'),
+ ], any_order=True)
+ mock_cr().open.assert_called_with('fake-logfile.txt')
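These tests lean on mock's call chaining: a MagicMock returns the same memoized child mock no matter what arguments it is called with, so configuring mock_api().jobs().get().execute.return_value up front determines what JobSummarizer sees when it later calls arvados.api('v1').jobs().get(uuid=...).execute(). A small self-contained illustration (assumes the arvados and mock packages are importable; the job dictionary is a made-up stand-in):

import mock

with mock.patch('arvados.api') as mock_api:
    # Each call on a MagicMock returns the same child mock regardless of
    # arguments, so this single assignment covers the real call chain.
    mock_api().jobs().get().execute.return_value = {'uuid': 'fake', 'log': None}

    import arvados
    job = arvados.api('v1').jobs().get(uuid='fake').execute()
    assert job == {'uuid': 'fake', 'log': None}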