Merge branch '7286-nodeman-destroy-broken-nodes' closes #7286
authorPeter Amstutz <peter.amstutz@curoverse.com>
Tue, 6 Oct 2015 13:17:30 +0000 (09:17 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Tue, 6 Oct 2015 13:17:43 +0000 (09:17 -0400)
27 files changed:
doc/_includes/_install_ruby_and_bundler.liquid
doc/install/install-api-server.html.textile.liquid
doc/install/install-sso.html.textile.liquid
doc/install/install-workbench-app.html.textile.liquid
doc/sdk/cli/subcommands.html.textile.liquid
sdk/cli/bin/arv
sdk/cli/bin/crunch-job
sdk/go/keepclient/keepclient.go
sdk/go/keepclient/keepclient_test.go
services/api/script/salvage_collection.rb
services/keepproxy/keepproxy.go
services/keepproxy/keepproxy_test.go
services/keepproxy/pkg-extras/etc/default/keepproxy [new file with mode: 0644]
services/keepproxy/pkg-extras/etc/init.d/keepproxy [new file with mode: 0755]
services/keepstore/azure_blob_volume.go [new file with mode: 0644]
services/keepstore/azure_blob_volume_test.go [new file with mode: 0644]
services/keepstore/collision.go
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/handlers_with_generic_volume_test.go
services/keepstore/keepstore.go
services/keepstore/keepstore_test.go
services/keepstore/pull_worker.go
services/keepstore/volume.go
services/keepstore/volume_generic_test.go
services/keepstore/volume_test.go
services/keepstore/volume_unix.go

index 502e2bcb3a41e1bdacd8f0c55a1afde95b7a55d6..e79cffb7f86dd9adf5a622b231ceda25e057f4e4 100644 (file)
@@ -5,7 +5,6 @@ h4(#rvm). *Option 1: Install with RVM*
 <notextile>
 <pre><code><span class="userinput">sudo gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3
 \curl -sSL https://get.rvm.io | sudo bash -s stable --ruby=2.1
-sudo adduser "$USER" rvm
 </span></code></pre></notextile>
 
 Either log out and log back in to activate RVM, or explicitly load it in all open shells like this:
@@ -46,7 +45,7 @@ Install prerequisites for Ubuntu 12.04 or 14.04:
 <pre><code><span class="userinput">sudo apt-get install \
     gawk g++ gcc make libc6-dev libreadline6-dev zlib1g-dev libssl-dev \
     libyaml-dev libsqlite3-dev sqlite3 autoconf libgdbm-dev \
-    libncurses5-dev automake libtool bison pkg-config libffi-dev
+    libncurses5-dev automake libtool bison pkg-config libffi-dev curl
 </span></code></pre></notextile>
 
 Build and install Ruby:
@@ -54,8 +53,8 @@ Build and install Ruby:
 <notextile>
 <pre><code><span class="userinput">mkdir -p ~/src
 cd ~/src
-curl http://cache.ruby-lang.org/pub/ruby/2.1/ruby-2.1.6.tar.gz | tar xz
-cd ruby-2.1.6
+curl http://cache.ruby-lang.org/pub/ruby/2.1/ruby-2.1.7.tar.gz | tar xz
+cd ruby-2.1.7
 ./configure --disable-install-rdoc
 make
 sudo make install
index fb43f783f04ffc5346da7cf46224ce8856eecb02..695584fa247f2176db5d690adebf4b9002389f0d 100644 (file)
@@ -203,7 +203,7 @@ For best performance, we recommend you use Nginx as your Web server front-end, w
 
 <notextile>
 <ol>
-<li><a href="https://www.phusionpassenger.com/documentation/Users%20guide%20Nginx.html">Install Nginx and Phusion Passenger</a>.</li>
+<li><a href="https://www.phusionpassenger.com/library/walkthroughs/deploy/ruby/ownserver/nginx/oss/install_passenger_main.html">Install Nginx and Phusion Passenger</a>.</li>
 
 <li><p>Puma is already included with the API server's gems.  We recommend you run it as a service under <a href="http://smarden.org/runit/">runit</a> or a similar tool.  Here's a sample runit script for that:</p>
 
index af999e95a922b14bbda53339b5d7eb29c20bae6b..56c7a4b337eb5eb0c2b6c8fbd2aa47675e353b7c 100644 (file)
 ---
 layout: default
 navsection: installguide
-title: Install Single Sign On (SSO) server
+title: Install the Single Sign On (SSO) server
 ...
 
-h2(#dependencies). Install dependencies
+h2(#dependencies). Install prerequisites
 
-h3(#install_git_curl). Install git and curl
-
-{% include 'install_git_curl' %}
+The Arvados package repository includes an SSO server package that can help automate much of the deployment.
 
 h3(#install_ruby_and_bundler). Install Ruby and Bundler
 
 {% include 'install_ruby_and_bundler' %}
 
-h3(#install_postgres). Install PostgreSQL
+h3(#install_web_server). Set up a Web server
+
+For best performance, we recommend you use Nginx as your Web server frontend with a Passenger backend to serve the SSO server. The Passenger team provides "Nginx + Passenger installation instructions":https://www.phusionpassenger.com/library/walkthroughs/deploy/ruby/ownserver/nginx/oss/install_passenger_main.html.
 
-{% include 'install_postgres' %}
+Follow the instructions until you see the section that says you are ready to deploy your Ruby application on the production server.
 
-h2(#install). Install SSO server
+h2(#install). Install the SSO server
 
-h3. Get SSO server code and run bundle
+On a Debian-based system, install the following package:
 
 <notextile>
-<pre><code>~$ <span class="userinput">cd $HOME</span> # (or wherever you want to install)
-~$ <span class="userinput">git clone https://github.com/curoverse/sso-devise-omniauth-provider.git</span>
-~$ <span class="userinput">cd sso-devise-omniauth-provider</span>
-~/sso-devise-omniauth-provider$ <span class="userinput">bundle install --without=development</span>
-</code></pre></notextile>
+<pre><code>~$ <span class="userinput">sudo apt-get install arvados-sso-server</span>
+</code></pre>
+</notextile>
+
+On a Red Hat-based system, install the following package:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo yum install arvados-sso-server</span>
+</code></pre>
+</notextile>
 
-h2. Configure the SSO server
+h2(#configure). Configure the SSO server
 
-First, copy the example configuration file:
+The package has installed three configuration files in @/etc/arvados/sso@:
 
 <notextile>
-<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">cp -i config/application.yml.example config/application.yml</span>
-</code></pre></notextile>
+<pre><code>/etc/arvados/sso/application.yml
+/etc/arvados/sso/database.yml
+/etc/arvados/sso/production.rb
+</code></pre>
+</notextile>
+
+The SSO server runs from the @/var/www/arvados-sso/current/@ directory. The files @/var/www/arvados-sso/current/config/application.yml@, @/var/www/arvados-sso/current/config/database.yml@ and @/var/www/arvados-sso/current/config/environments/production.rb@ are symlinked to the configuration files in @/etc/arvados/sso/@.
 
 The SSO server reads the @config/application.yml@ file, as well as the @config/application.defaults.yml@ file. Values in @config/application.yml@ take precedence over the defaults that are defined in @config/application.defaults.yml@. The @config/application.yml.example@ file is not read by the SSO server and is provided for installation convenience only.
 
-Consult @config/application.default.yml@ for a full list of configuration options.  Local configuration goes in @config/application.yml@, do not edit @config/application.default.yml@.
+Consult @config/application.default.yml@ for a full list of configuration options.  Local configuration goes in @/etc/arvados/sso/application.yml@; do not edit @config/application.default.yml@.
 
 h3(#uuid_prefix). uuid_prefix
 
 Generate a uuid prefix for the single sign on service.  This prefix is used to identify user records as originating from this site.  It must be exactly 5 lowercase ASCII letters and/or digits.  You may use the following snippet to generate a uuid prefix:
 
 <notextile>
-<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">ruby -e 'puts "#{rand(2**64).to_s(36)[0,5]}"'</span>
+<pre><code>~$ <span class="userinput">ruby -e 'puts "#{rand(2**64).to_s(36)[0,5]}"'</span>
 abcde
 </code></pre></notextile>
 
-Edit @config/application.yml@ and set @uuid_prefix@ in the "common" section.
+Edit @/etc/arvados/sso/application.yml@ and set @uuid_prefix@ in the "common" section.
 
 h3(#secret_token). secret_token
 
 Generate a new secret token for signing cookies:
 
 <notextile>
-<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">ruby -e 'puts rand(2**400).to_s(36)'</span>
+<pre><code>~$ <span class="userinput">ruby -e 'puts rand(2**400).to_s(36)'</span>
 zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
 </code></pre></notextile>
 
-Edit @config/application.yml@ and set @secret_token@ in the "common" section.
+Edit @/etc/arvados/sso/application.yml@ and set @secret_token@ in the "common" section.
+
+There are other configuration options in @/etc/arvados/sso/application.yml@. See the "Authentication methods":install-sso.html#authentication_methods section below for more details.
 
 h2(#database). Set up the database
 
-Generate a new database password. Nobody ever needs to memorize it or type it, so make a strong one:
+If PostgreSQL was newly installed as a dependency of the @arvados-sso-server@ package, you will need to start the service.
+
+On a Debian-based system:
 
 <notextile>
-<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">ruby -e 'puts rand(2**128).to_s(36)'</span>
+<pre><code>~$ <span class="userinput">sudo service postgresql start</span>
+</code></pre>
+</notextile>
+
+On a Red Hat-based system, we also need to initialize the database system:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo service postgresql initdb</span>
+~$ <span class="userinput">sudo service postgresql start</span>
+</code></pre>
+</notextile>
+
+{% include 'notebox_begin' %}
+
+If you are installing on CentOS 6, you will need to modify PostgreSQL's configuration to allow password authentication for local users. The default configuration allows 'ident' only. The following commands will make the configuration change, and restart PostgreSQL for it to take effect.
+<br/>
+<notextile>
+<pre><code>~$ <span class="userinput">sudo sed -i -e "s/127.0.0.1\/32          ident/127.0.0.1\/32          md5/" /var/lib/pgsql/data/pg_hba.conf</span>
+~$ <span class="userinput">sudo sed -i -e "s/::1\/128               ident/::1\/128               md5/" /var/lib/pgsql/data/pg_hba.conf</span>
+~$ <span class="userinput">sudo service postgresql restart</span>
+</code></pre>
+</notextile>
+{% include 'notebox_end' %}
+
+
+Next, generate a new database password. Nobody ever needs to memorize it or type it, so make a strong one:
+
+<notextile>
+<pre><code>~$ <span class="userinput">ruby -e 'puts rand(2**128).to_s(36)'</span>
 abcdefghijklmnopqrstuvwxyz012345689
 </code></pre></notextile>
 
+Configure the SSO server to connect to your database by updating @/etc/arvados/sso/database.yml@. Replace the @xxxxxxxx@ database password placeholder with the new password you generated above. Be sure to update the @production@ section.
+
+<notextile>
+<pre><code>~$ <span class="userinput">editor /etc/arvados/sso/database.yml</span>
+</code></pre></notextile>
+
 Create a new database user with permission to create its own databases.
 
 <notextile>
-<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">sudo -u postgres createuser --createdb --encrypted -R -S --pwprompt arvados_sso</span>
+<pre><code>~$ <span class="userinput">sudo -u postgres createuser --createdb --encrypted -R -S --pwprompt arvados_sso</span>
 Enter password for new role: <span class="userinput">paste-database-password-you-generated</span>
 Enter it again: <span class="userinput">paste-database-password-you-generated</span>
 </code></pre></notextile>
 
-Configure SSO server to connect to your database by creating and updating @config/database.yml@. Replace the @xxxxxxxx@ database password placeholders with the new password you generated above.  If you are planning a production system, update the @production@ section, otherwise use @development@.
+Rails will take care of creating the database, based on the information from @/etc/arvados/sso/database.yml@. 
+
+Alternatively, if the database user you intend to use for the SSO server should not be allowed to create new databases, the user and the database can be created like this:
 
 <notextile>
-<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">cp -i config/database.yml.example config/database.yml</span>
-~/sso-devise-omniauth-provider$ <span class="userinput">edit config/database.yml</span>
+<pre><code>~$ <span class="userinput">sudo -u postgres createuser --encrypted -R -S --pwprompt arvados_sso</span>
+Enter password for new role: <span class="userinput">paste-database-password-you-generated</span>
+Enter it again: <span class="userinput">paste-database-password-you-generated</span>
+~$ <span class="userinput">sudo -u postgres createdb arvados_sso_production -E UTF8 -O arvados_sso -T template0</span>
 </code></pre></notextile>
 
-Create and initialize the database. If you are planning a production system, choose the @production@ rails environment, otherwise use @development@.
+h2(#reconfigure_package). Reconfigure the package
+
+Now that the @/etc/arvados/sso/application.yml@ and @/etc/arvados/sso/database.yml@ files have been updated, we need to reconfigure our installed package. Doing so will create and/or initialize the database and precompile the assets.
+
+On a Debian-based system:
 
 <notextile>
-<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">RAILS_ENV=production bundle exec rake db:setup</span>
-</code></pre></notextile>
+<pre><code>~$ <span class="userinput">sudo dpkg-reconfigure arvados-sso-server</span>
+</code></pre>
+</notextile>
 
-Alternatively, if the database user you intend to use for the SSO server is not allowed to create new databases, you can create the database first and then populate it with rake. Be sure to adjust the database name if you are using the @development@ environment. This sequence of commands is functionally equivalent to the rake db:setup command above:
+On a Red Hat-based system, we need to reinstall the package instead:
 
 <notextile>
-<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">sudo -u postgres createdb arvados_sso_production -E UTF8 -O arvados_sso -T template0</span>
-~/sso-devise-omniauth-provider$ <span class="userinput">RAILS_ENV=production bundle exec rake db:schema:load</span>
-~/sso-devise-omniauth-provider$ <span class="userinput">RAILS_ENV=production bundle exec rake db:seed</span>
-</code></pre></notextile>
+<pre><code>~$ <span class="userinput">sudo yum reinstall arvados-sso-server</span>
+</code></pre>
+</notextile>
 
 h2(#client). Create arvados-server client
 
 Use @rails console@ to create a @Client@ record that will be used by the Arvados API server.  The values of @app_id@ and @app_secret@ correspond to the values for @sso_app_id@ and @sso_app_secret@ in the "API server's SSO settings.":install-api-server.html#omniauth
 
 <notextile>
-<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">ruby -e 'puts rand(2**400).to_s(36)'</span>
+<pre><code>~$ <span class="userinput">ruby -e 'puts rand(2**400).to_s(36)'</span>
 xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
-~/sso-devise-omniauth-provider$ <span class="userinput">RAILS_ENV=production bundle exec rails console</span>
+~$ <span class="userinput">RAILS_ENV=production bundle exec rails console</span>
 :001 &gt; <span class="userinput">c = Client.new</span>
 :002 &gt; <span class="userinput">c.name = "joshid"</span>
 :003 &gt; <span class="userinput">c.app_id = "arvados-server"</span>
@@ -118,15 +174,56 @@ xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
 </code></pre>
 </notextile>
 
-h2(#assets). Precompile assets
+h2(#configure_web_server). Configure your web server
 
-If you are running in the production environment, you must precompile the assets:
+Edit the http section of your Nginx configuration to run the Passenger server and act as a frontend for it. You might add a block like the following, adding SSL and logging parameters to taste:
 
 <notextile>
-<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">RAILS_ENV=production bundle exec rake assets:precompile</span>
+<pre><code>server {
+  listen 127.0.0.1:8900;
+  server_name localhost-sso;
+
+  root   /var/www/arvados-sso/current/public;
+  index  index.html;
+
+  passenger_enabled on;
+  # If you're not using RVM, comment out the line below.
+  passenger_ruby /usr/local/rvm/wrappers/default/ruby;
+}
+
+upstream sso {
+  server     127.0.0.1:8900  fail_timeout=10s;
+}
+
+proxy_http_version 1.1;
+
+server {
+  listen       <span class="userinput">[your public IP address]</span>:443 ssl;
+  server_name  auth.<span class="userinput">your.domain</span>;
+
+  ssl on;
+  ssl_certificate     <span class="userinput">/YOUR/PATH/TO/cert.pem</span>;
+  ssl_certificate_key <span class="userinput">/YOUR/PATH/TO/cert.key</span>;
+
+  index  index.html;
+
+  location / {
+    proxy_pass            http://sso;
+    proxy_redirect        off;
+    proxy_connect_timeout 90s;
+    proxy_read_timeout    300s;
+
+    proxy_set_header      X-Forwarded-Proto https;
+    proxy_set_header      Host $http_host;
+    proxy_set_header      X-Real-IP $remote_addr;
+    proxy_set_header      X-Forwarded-For $proxy_add_x_forwarded_for;
+  }
+}
 </code></pre>
 </notextile>
 
+Finally, restart Nginx and your Arvados SSO server should be up and running. You can verify that by visiting the URL you configured your Nginx web server to listen on in the server section above (port 443). Read on if you want to configure your Arvados SSO server to use a different authentication backend.
+
 h2(#authentication_methods). Authentication methods
 
 Authentication methods are configured in @application.yml@.  Currently three authentication methods are supported: local accounts, LDAP, and Google+.  If neither Google+ nor LDAP is enabled, the SSO server defaults to local user accounts.   Only one authentication mechanism should be in use at a time.
@@ -150,7 +247,7 @@ For more information about configuring backend support for sending email (requir
 If @allow_account_registration@ is false, you may manually create local accounts on the SSO server from the rails console:
 
 <notextile>
-<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">RAILS_ENV=production bundle exec rails console</span>
+<pre><code>~$ <span class="userinput">RAILS_ENV=production bundle exec rails console</span>
 :001 &gt; <span class="userinput">user = User.new(:email =&gt; "test@example.com")</span>
 :002 &gt; <span class="userinput">user.password = "passw0rd"</span>
 :003 &gt; <span class="userinput">user.save!</span>
@@ -210,76 +307,4 @@ In order to use Google+ authentication, you must use the <a href="https://consol
   google_oauth2_client_id: <span class="userinput">"---YOUR---CLIENT---ID---HERE---"</span>
   google_oauth2_client_secret: <span class="userinput">"---YOUR---CLIENT---SECRET---HERE---"</span></code></pre></notextile>
 
-h2(#start). Set up a Web server
-
-For best performance, we recommend you use Nginx as your Web server front-end, with a Passenger backend to serve the SSO server.  To do that:
-
-<notextile>
-<ol>
-<li><a href="https://www.phusionpassenger.com/documentation/Users%20guide%20Nginx.html">Install Nginx and Phusion Passenger</a>.</li>
-
-<li><p>Edit the http section of your Nginx configuration to run the Passenger server, and act as a front-end for it.  You might add a block like the following, adding SSL and logging parameters to taste:</p>
-
-<pre><code>server {
-  listen 127.0.0.1:8900;
-  server_name localhost-sso;
-
-  root   <span class="userinput">/YOUR/PATH/TO/sso-devise-omniauth-provider/public</span>;
-  index  index.html index.htm index.php;
-
-  passenger_enabled on;
-  # If you're using RVM, uncomment the line below.
-  #passenger_ruby /usr/local/rvm/wrappers/default/ruby;
-}
-
-upstream sso {
-  server     127.0.0.1:8900  fail_timeout=10s;
-}
-
-proxy_http_version 1.1;
-
-server {
-  listen       <span class="userinput">[your public IP address]</span>:443 ssl;
-  server_name  auth.<span class="userinput">your.domain</span>;
-
-  ssl on;
-  ssl_certificate     <span class="userinput">/YOUR/PATH/TO/cert.pem</span>;
-  ssl_certificate_key <span class="userinput">/YOUR/PATH/TO/cert.key</span>;
-
-  index  index.html index.htm index.php;
-
-  location / {
-    proxy_pass            http://sso;
-    proxy_redirect        off;
-    proxy_connect_timeout 90s;
-    proxy_read_timeout    300s;
-
-    proxy_set_header      X-Forwarded-Proto https;
-    proxy_set_header      Host $http_host;
-    proxy_set_header      X-Real-IP $remote_addr;
-    proxy_set_header      X-Forwarded-For $proxy_add_x_forwarded_for;
-  }
-}
-</code></pre>
-</li>
-
-<li>Restart Nginx.</li>
-
-</ol>
-</notextile>
 
-{% include 'notebox_begin' %}
-
-If you see the following warning "you may safely ignore it":https://stackoverflow.com/questions/10374871/no-secret-option-provided-to-racksessioncookie-warning:
-
-<pre>
-SECURITY WARNING: No secret option provided to Rack::Session::Cookie.
-This poses a security threat. It is strongly recommended that you
-provide a secret to prevent exploits that may be possible from crafted
-cookies. This will not be supported in future versions of Rack, and
-future versions will even invalidate your existing user cookies.
-
-Called from: /var/lib/gems/2.1.0/gems/actionpack-3.2.8/lib/action_dispatch/middleware/session/abstract_store.rb:28:in `initialize'.
-</pre>
-
-{% include 'notebox_end' %}
index 52a69f502b1d6e65ad920230ab91599f86be0b19..22fc1557a8fae0336fff6f0f80e89e8f8d1b1262 100644 (file)
@@ -6,7 +6,7 @@ title: Install Workbench
 
 h2. Install prerequisites
 
-The Arvados package repository includes Workbench server package that can help automate much of the deployment.
+The Arvados package repository includes a Workbench server package that can help automate much of the deployment.
 
 h3(#install_ruby_and_bundler). Install Ruby and Bundler
 
@@ -93,7 +93,7 @@ For best performance, we recommend you use Nginx as your Web server front-end, w
 
 <notextile>
 <ol>
-<li><a href="https://www.phusionpassenger.com/documentation/Users%20guide%20Nginx.html">Install Nginx and Phusion Passenger</a>.</li>
+<li><a href="https://www.phusionpassenger.com/library/walkthroughs/deploy/ruby/ownserver/nginx/oss/install_passenger_main.html">Install Nginx and Phusion Passenger</a>.</li>
 
 <li>If you're deploying on CentOS and using the python27 Software Collection, configure Nginx to use it:
 
index 018213c1a1815061f3265a85040937dcb53652a7..aa7af94d5f9d4ac8527e7d3e238e15d19f177905 100644 (file)
@@ -157,7 +157,7 @@ $ <code class="userinput">arv keep --help</code>
 Usage: arv keep [method] [--parameters]
 Use 'arv keep [method] --help' to get more information about specific methods.
 
-Available methods: ls, get, put, less, check, docker
+Available methods: ls, get, put, docker
 </pre>
 </notextile>
 
index b757855b1f22653eef4a6faaa96cc2c684214e86..2bd7f4ef465d7a3427425716c11f9e3574d9f226 100755 (executable)
@@ -100,15 +100,12 @@ def check_subcommands client, arvados, subcommand, global_opts, remaining_opts
     if ['get', 'put', 'ls', 'normalize'].index @sub then
       # Native Arvados
       exec `which arv-#{@sub}`.strip, *remaining_opts
-    elsif ['less', 'check'].index @sub then
-      # wh* shims
-      exec `which wh#{@sub}`.strip, *remaining_opts
     elsif @sub == 'docker'
       exec `which arv-keepdocker`.strip, *remaining_opts
     else
       puts "Usage: arv keep [method] [--parameters]\n"
       puts "Use 'arv keep [method] --help' to get more information about specific methods.\n\n"
-      puts "Available methods: ls, get, put, less, check, docker"
+      puts "Available methods: ls, get, put, docker"
     end
     abort
   when 'pipeline'
index 423f4cc043409b0f63f96c6c84333b746e21d19a..5539012c49e19e2b9afbeba26850aabbec9405d1 100755 (executable)
@@ -980,7 +980,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     my $gotsome
        = readfrompipes ()
        + reapchildren ();
-    if (!$gotsome)
+    if (!$gotsome || ($latest_refresh + 2 < scalar time))
     {
       check_refresh_wanted();
       check_squeue();
index 70aa3746bcc117932beba6313ed1b3d9a0ac0a3d..53dfb2b5384302b0f113ec608c016760a2ca77ce 100644 (file)
@@ -2,6 +2,7 @@
 package keepclient
 
 import (
+       "bytes"
        "crypto/md5"
        "crypto/tls"
        "errors"
@@ -29,6 +30,12 @@ var MissingArvadosApiHost = errors.New("Missing required environment variable AR
 var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
 var InvalidLocatorError = errors.New("Invalid locator")
 
+// ErrNoSuchKeepServer is returned when GetIndex is invoked with a UUID with no matching keep server
+var ErrNoSuchKeepServer = errors.New("No keep server matching the given UUID is found")
+
+// ErrIncompleteIndex is returned when the Index response does not end with a new empty line
+var ErrIncompleteIndex = errors.New("Got incomplete index")
+
 const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
 const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
 
@@ -182,6 +189,58 @@ func (kc *KeepClient) Ask(locator string) (int64, string, error) {
        return 0, "", BlockNotFound
 }
 
+// GetIndex retrieves a list of blocks stored on the given server whose hashes
+// begin with the given prefix. The returned reader will return an error (other
+// than EOF) if the complete index cannot be retrieved.
+//
+// This is meant to be used only by system components and admin tools.
+// It will return an error unless the client is using a "data manager token"
+// recognized by the Keep services.
+func (kc *KeepClient) GetIndex(keepServiceUUID, prefix string) (io.Reader, error) {
+       url := kc.LocalRoots()[keepServiceUUID]
+       if url == "" {
+               return nil, ErrNoSuchKeepServer
+       }
+
+       url += "/index"
+       if prefix != "" {
+               url += "/" + prefix
+       }
+
+       req, err := http.NewRequest("GET", url, nil)
+       if err != nil {
+               return nil, err
+       }
+
+       req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
+       resp, err := kc.Client.Do(req)
+       if err != nil {
+               return nil, err
+       }
+
+       defer resp.Body.Close()
+
+       if resp.StatusCode != http.StatusOK {
+               return nil, fmt.Errorf("Got http status code: %d", resp.StatusCode)
+       }
+
+       var respBody []byte
+       respBody, err = ioutil.ReadAll(resp.Body)
+       if err != nil {
+               return nil, err
+       }
+
+       // Got index; verify that it is complete
+       // The response should be "\n" if no locators matched the prefix
+       // Else, it should be a list of locators followed by a blank line
+       if !bytes.Equal(respBody, []byte("\n")) && !bytes.HasSuffix(respBody, []byte("\n\n")) {
+               return nil, ErrIncompleteIndex
+       }
+
+       // Got complete index; strip the trailing newline and send
+       return bytes.NewReader(respBody[0 : len(respBody)-1]), nil
+}
+
 // LocalRoots() returns the map of local (i.e., disk and proxy) Keep
 // services: uuid -> baseURI.
 func (kc *KeepClient) LocalRoots() map[string]string {
index e4e459e83a51c408141c7127a098f69e955a005e..ee60d2822f2f95b439101b974d9a11001831b346 100644 (file)
@@ -948,3 +948,139 @@ func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
        c.Check(err, Equals, InsufficientReplicasError)
        c.Check(replicas, Equals, 0)
 }
+
+type StubGetIndexHandler struct {
+       c              *C
+       expectPath     string
+       expectAPIToken string
+       httpStatus     int
+       body           []byte
+}
+
+func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       h.c.Check(req.URL.Path, Equals, h.expectPath)
+       h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectAPIToken))
+       resp.WriteHeader(h.httpStatus)
+       resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
+       resp.Write(h.body)
+}
+
+func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
+       hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+       st := StubGetIndexHandler{
+               c,
+               "/index",
+               "abc123",
+               http.StatusOK,
+               []byte(hash + "+3 1443559274\n\n")}
+
+       ks := RunFakeKeepServer(st)
+       defer ks.listener.Close()
+
+       arv, err := arvadosclient.MakeArvadosClient()
+       kc, _ := MakeKeepClient(&arv)
+       arv.ApiToken = "abc123"
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+
+       r, err := kc.GetIndex("x", "")
+       c.Check(err, Equals, nil)
+
+       content, err2 := ioutil.ReadAll(r)
+       c.Check(err2, Equals, nil)
+       c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
+}
+
+func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
+       hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+       st := StubGetIndexHandler{
+               c,
+               "/index/" + hash[0:3],
+               "abc123",
+               http.StatusOK,
+               []byte(hash + "+3 1443559274\n\n")}
+
+       ks := RunFakeKeepServer(st)
+       defer ks.listener.Close()
+
+       arv, err := arvadosclient.MakeArvadosClient()
+       kc, _ := MakeKeepClient(&arv)
+       arv.ApiToken = "abc123"
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+
+       r, err := kc.GetIndex("x", hash[0:3])
+       c.Check(err, Equals, nil)
+
+       content, err2 := ioutil.ReadAll(r)
+       c.Check(err2, Equals, nil)
+       c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
+}
+
+func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
+       hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+       st := StubGetIndexHandler{
+               c,
+               "/index/" + hash[0:3],
+               "abc123",
+               http.StatusOK,
+               []byte(hash)}
+
+       ks := RunFakeKeepServer(st)
+       defer ks.listener.Close()
+
+       arv, err := arvadosclient.MakeArvadosClient()
+       kc, _ := MakeKeepClient(&arv)
+       arv.ApiToken = "abc123"
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+
+       _, err = kc.GetIndex("x", hash[0:3])
+       c.Check(err, Equals, ErrIncompleteIndex)
+}
+
+func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
+       hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+       st := StubGetIndexHandler{
+               c,
+               "/index/" + hash[0:3],
+               "abc123",
+               http.StatusOK,
+               []byte(hash)}
+
+       ks := RunFakeKeepServer(st)
+       defer ks.listener.Close()
+
+       arv, err := arvadosclient.MakeArvadosClient()
+       kc, _ := MakeKeepClient(&arv)
+       arv.ApiToken = "abc123"
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+
+       _, err = kc.GetIndex("y", hash[0:3])
+       c.Check(err, Equals, ErrNoSuchKeepServer)
+}
+
+func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
+       st := StubGetIndexHandler{
+               c,
+               "/index/abcd",
+               "abc123",
+               http.StatusOK,
+               []byte("\n")}
+
+       ks := RunFakeKeepServer(st)
+       defer ks.listener.Close()
+
+       arv, err := arvadosclient.MakeArvadosClient()
+       kc, _ := MakeKeepClient(&arv)
+       arv.ApiToken = "abc123"
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+
+       r, err := kc.GetIndex("x", "abcd")
+       c.Check(err, Equals, nil)
+
+       content, err2 := ioutil.ReadAll(r)
+       c.Check(err2, Equals, nil)
+       c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
+}
index b70807bd38acbb484da6e8f89f0c9525b4491993..7ef98bb7c8571ec8a6f64f4eb4a8906d25c52361 100755 (executable)
@@ -12,6 +12,7 @@
 
 require 'trollop'
 require './lib/salvage_collection'
+include SalvageCollection
 
 opts = Trollop::options do
   banner ''
index d0af4a58ea5e7746d8243fb6272820e8c4801307..7900096caf0ad9e80dfe13fde5970dbd0876db27 100644 (file)
@@ -241,6 +241,11 @@ type PutBlockHandler struct {
        *ApiTokenCache
 }
 
+// IndexHandler is the HTTP handler registered for the /index routes.
+// It embeds the KeepClient used to query keep servers and the
+// ApiTokenCache that is handed to CheckAuthorizationHeader.
+type IndexHandler struct {
+       *keepclient.KeepClient
+       *ApiTokenCache
+}
+
 type InvalidPathHandler struct{}
 
 type OptionsHandler struct{}
@@ -262,6 +267,12 @@ func MakeRESTRouter(
                rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`,
                        GetBlockHandler{kc, t}).Methods("GET", "HEAD")
                rest.Handle(`/{locator:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
+
+               // List all blocks
+               rest.Handle(`/index`, IndexHandler{kc, t}).Methods("GET")
+
+               // List blocks whose hash has the given prefix
+               rest.Handle(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler{kc, t}).Methods("GET")
        }
 
        if enable_put {
@@ -481,3 +492,63 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
                status = http.StatusBadGateway
        }
 }
+
+// ServeHTTP answers GET /index and GET /index/{prefix} requests.
+//
+// After checking the Authorization header it calls GetIndex, with the
+// caller's token, for every UUID in the keep client's LocalRoots and
+// streams each result to the response in turn. A final "\n" is written
+// once every server has answered. The first failure stops the loop and
+// is reported through http.Error by the deferred function.
+func (handler IndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       SetCorsHeaders(resp)
+
+       var (
+               status int
+               err    error
+       )
+
+       // Any exit path that did not reach StatusOK is reported here.
+       defer func() {
+               if status == http.StatusOK {
+                       return
+               }
+               http.Error(resp, err.Error(), status)
+       }()
+
+       prefix := mux.Vars(req)["prefix"]
+
+       // Work on a copy so the per-request token never reaches the
+       // shared client.
+       kc := *handler.KeepClient
+
+       authorized, token := CheckAuthorizationHeader(kc, handler.ApiTokenCache, req)
+       if !authorized {
+               status, err = http.StatusForbidden, BadAuthorizationHeader
+               return
+       }
+
+       // Copy the ArvadosClient struct too, and use the client's API token
+       requestArv := *kc.Arvados
+       requestArv.ApiToken = token
+       kc.Arvados = &requestArv
+
+       // Only the GET method is supported
+       if req.Method != "GET" {
+               status, err = http.StatusNotImplemented, MethodNotSupported
+               return
+       }
+
+       // Fetch the index from each local root and copy it to resp
+       for uuid := range kc.LocalRoots() {
+               var reader io.Reader
+               if reader, err = kc.GetIndex(uuid, prefix); err == nil {
+                       _, err = io.Copy(resp, reader)
+               }
+               if err != nil {
+                       status = http.StatusBadGateway
+                       return
+               }
+       }
+
+       // Every keep server answered; terminate the combined index
+       status = http.StatusOK
+       resp.Write([]byte("\n"))
+}
index 22cc72ea15ea4b23b308e355a0292cec808557d2..6fe8fe7ac3d0023c4d012be022c9ab3d50431d62 100644 (file)
@@ -406,3 +406,70 @@ func (s *ServerRequiredSuite) TestStripHint(c *C) {
                "http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+K@zzzzz-zzzzz-zzzzzzzzzzzzzzz+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73")
 
 }
+
+// Test GetIndex
+//   Put one block, with 2 replicas
+//   With no prefix (expect the block locator, twice)
+//   With an existing prefix (expect the block locator, twice)
+//   With a valid but non-existing prefix (expect "\n")
+//   With an invalid prefix (expect error)
+func (s *ServerRequiredSuite) TestGetIndex(c *C) {
+       kc := runProxy(c, []string{"keepproxy"}, 28852, false)
+       waitForListener()
+       defer closeListener()
+
+       // Put "index-data" blocks
+       data := []byte("index-data")
+       hash := fmt.Sprintf("%x", md5.Sum(data))
+
+       hash2, rep, err := kc.PutB(data)
+       c.Check(hash2, Matches, fmt.Sprintf(`^%s\+10(\+.+)?$`, hash))
+       c.Check(rep, Equals, 2)
+       c.Check(err, Equals, nil)
+
+       reader, blocklen, _, err := kc.Get(hash)
+       c.Assert(err, Equals, nil)
+       c.Check(blocklen, Equals, int64(10))
+       all, err := ioutil.ReadAll(reader)
+       c.Check(all, DeepEquals, data)
+
+       // Put some more blocks
+       _, rep, err = kc.PutB([]byte("some-more-index-data"))
+       c.Check(err, Equals, nil)
+
+       // Invoke GetIndex
+       for _, spec := range []struct {
+               prefix         string
+               expectTestHash bool
+               expectOther    bool
+       }{
+               {"", true, true},         // with no prefix
+               {hash[:3], true, false},  // with matching prefix
+               {"abcdef", false, false}, // with no such prefix
+       } {
+               indexReader, err := kc.GetIndex("proxy", spec.prefix)
+               c.Assert(err, Equals, nil)
+               indexResp, err := ioutil.ReadAll(indexReader)
+               c.Assert(err, Equals, nil)
+               locators := strings.Split(string(indexResp), "\n")
+               gotTestHash := 0
+               gotOther := 0
+               for _, locator := range locators {
+                       if locator == "" {
+                               continue
+                       }
+                       c.Check(locator[:len(spec.prefix)], Equals, spec.prefix)
+                       if locator[:32] == hash {
+                               gotTestHash++
+                       } else {
+                               gotOther++
+                       }
+               }
+               c.Check(gotTestHash == 2, Equals, spec.expectTestHash)
+               c.Check(gotOther > 0, Equals, spec.expectOther)
+       }
+
+       // GetIndex with invalid prefix
+       _, err = kc.GetIndex("proxy", "xyz")
+       c.Assert((err != nil), Equals, true)
+}
diff --git a/services/keepproxy/pkg-extras/etc/default/keepproxy b/services/keepproxy/pkg-extras/etc/default/keepproxy
new file mode 100644 (file)
index 0000000..ddcab10
--- /dev/null
@@ -0,0 +1,7 @@
+user="root"
+group="root"
+chroot="/"
+chdir="/"
+nice=""
+args="-listen=':9100'"
+
diff --git a/services/keepproxy/pkg-extras/etc/init.d/keepproxy b/services/keepproxy/pkg-extras/etc/init.d/keepproxy
new file mode 100755 (executable)
index 0000000..7bb35b9
--- /dev/null
@@ -0,0 +1,156 @@
+#!/bin/sh
+# Init script for keepproxy
+# Maintained by 
+# Generated by pleaserun.
+# Implemented based on LSB Core 3.1:
+#   * Sections: 20.2, 20.3
+#
+### BEGIN INIT INFO
+# Provides:          keepproxy
+# Required-Start:    $remote_fs $syslog
+# Required-Stop:     $remote_fs $syslog
+# Default-Start:     2 3 4 5
+# Default-Stop:      0 1 6
+# Short-Description: 
+# Description:       no description given
+### END INIT INFO
+
+PATH=/sbin:/usr/sbin:/bin:/usr/bin
+export PATH
+
+name=keepproxy
+program=/usr/bin/keepproxy
+args=''
+pidfile="/var/run/$name.pid"
+
+[ -r /etc/default/$name ] && . /etc/default/$name
+[ -r /etc/sysconfig/$name ] && . /etc/sysconfig/$name
+
+# trace: send the given message to the system log via logger(1),
+# tagged with this script's path.
+trace() {
+  logger -t "/etc/init.d/keepproxy" "$@"
+}
+
+# emit: log the given message with trace and also print it on stdout.
+emit() {
+  trace "$@"
+  echo "$@"
+}
+
+# start: launch $program in the background and record its pid.
+# Uses $user, $group, $chroot, $chdir and $args; these are not assigned
+# in this script apart from args='', so they are expected to come from
+# the files sourced above (/etc/default/$name, /etc/sysconfig/$name).
+# stdout/stderr are appended to /var/log/keepproxy.{stdout,stderr}.
+# Always returns 0; it does not wait to see whether the program stays up.
+start() {
+
+  # Ensure the log directory is setup correctly.
+  [ ! -d "/var/log/" ] && mkdir "/var/log/"
+  chown "$user":"$group" "/var/log/"
+  chmod 755 "/var/log/"
+
+
+  # Setup any environmental stuff beforehand
+  
+
+  # Run the program!
+  
+  chroot --userspec "$user":"$group" "$chroot" sh -c "
+    
+    cd \"$chdir\"
+    exec \"$program\" $args
+  " >> /var/log/keepproxy.stdout 2>> /var/log/keepproxy.stderr &
+
+  # Generate the pidfile from here. If we instead made the forked process
+  # generate it there will be a race condition between the pidfile writing
+  # and a process possibly asking for status.
+  echo $! > $pidfile
+
+  emit "$name started"
+  return 0
+}
+
+# stop: if status reports the service as running, send SIGTERM to the
+# pid from $pidfile and poll status up to five times, one second apart,
+# then report whether the process is gone. Does nothing when the
+# service is not running. The pidfile is left in place.
+stop() {
+  # Try a few times to kill TERM the program
+  if status ; then
+    pid=$(cat "$pidfile")
+    trace "Killing $name (pid $pid) with SIGTERM"
+    kill -TERM $pid
+    # Wait for it to exit.
+    for i in 1 2 3 4 5 ; do
+      trace "Waiting $name (pid $pid) to die..."
+      status || break
+      sleep 1
+    done
+    if status ; then
+      emit "$name stop failed; still running."
+    else
+      emit "$name stopped."
+    fi
+  fi
+}
+
+# status: report whether the service is running, based on $pidfile.
+# Returns 0 when a process with the recorded pid exists, 2 when the
+# pidfile exists but no such process does, and 3 when there is no
+# pidfile.
+status() {
+  if [ -f "$pidfile" ] ; then
+    pid=$(cat "$pidfile")
+    if ps -p $pid > /dev/null 2> /dev/null ; then
+      # process by this pid is running.
+      # It may not be our pid, but that's what you get with just pidfiles.
+      # TODO(sissel): Check if this process seems to be the same as the one we
+      # expect. It'd be nice to use flock here, but flock uses fork, not exec,
+      # so it makes it quite awkward to use in this case.
+      return 0
+    else
+      return 2 # program is dead but pid file exists
+    fi
+  else
+    return 3 # program is not running
+  fi
+}
+
+# force_stop: run the normal stop sequence and, if the process is still
+# reported as running afterwards, send it SIGKILL.
+force_stop() {
+  if status ; then
+    stop
+    status && kill -KILL $(cat "$pidfile")
+  fi
+}
+
+
+# Log the requested action for the commands that change service state.
+case "$1" in
+  force-start|start|stop|force-stop|restart)
+    trace "Attempting '$1' on keepproxy"
+    ;;
+esac
+
+# Dispatch on the requested action.
+case "$1" in
+  force-start)
+    PRESTART=no
+    exec "$0" start
+    ;;
+  start)
+    # Refuse to start a second copy; otherwise exit with start's status.
+    status
+    code=$?
+    if [ $code -eq 0 ]; then
+      emit "$name is already running"
+      exit $code
+    else
+      start
+      exit $?
+    fi
+    ;;
+  stop) stop ;;
+  force-stop) force_stop ;;
+  status)
+    # Exit with the code returned by status (0, 2 or 3).
+    status
+    code=$?
+    if [ $code -eq 0 ] ; then
+      emit "$name is running"
+    else
+      emit "$name is not running"
+    fi
+    exit $code
+    ;;
+  restart)
+    stop && start
+    ;;
+  *)
+    # Use $0 here: SCRIPTNAME is not assigned anywhere in this script,
+    # so the usage line would otherwise print an empty program name.
+    echo "Usage: $0 {start|force-start|stop|force-stop|status|restart}" >&2
+    exit 3
+  ;;
+esac
+
+exit $?
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
new file mode 100644 (file)
index 0000000..79123a9
--- /dev/null
@@ -0,0 +1,243 @@
+package main
+
+import (
+       "bytes"
+       "errors"
+       "flag"
+       "fmt"
+       "io"
+       "io/ioutil"
+       "log"
+       "os"
+       "strings"
+       "time"
+
+       "github.com/curoverse/azure-sdk-for-go/storage"
+)
+
+var (
+       azureStorageAccountName    string
+       azureStorageAccountKeyFile string
+       azureStorageReplication    int
+)
+
+func readKeyFromFile(file string) (string, error) {
+       buf, err := ioutil.ReadFile(file)
+       if err != nil {
+               return "", errors.New("reading key from " + file + ": " + err.Error())
+       }
+       accountKey := strings.TrimSpace(string(buf))
+       if accountKey == "" {
+               return "", errors.New("empty account key in " + file)
+       }
+       return accountKey, nil
+}
+
+// azureVolumeAdder wraps a volumeSet so that it can be registered with
+// flag.Var: each occurrence of the flag calls Set, which appends an
+// Azure blob volume to the wrapped set.
+type azureVolumeAdder struct {
+       *volumeSet
+}
+
+func (s *azureVolumeAdder) Set(containerName string) error {
+       if containerName == "" {
+               return errors.New("no container name given")
+       }
+       if azureStorageAccountName == "" || azureStorageAccountKeyFile == "" {
+               return errors.New("-azure-storage-account-name and -azure-storage-account-key-file arguments must given before -azure-storage-container-volume")
+       }
+       accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
+       if err != nil {
+               return err
+       }
+       azClient, err := storage.NewBasicClient(azureStorageAccountName, accountKey)
+       if err != nil {
+               return errors.New("creating Azure storage client: " + err.Error())
+       }
+       if flagSerializeIO {
+               log.Print("Notice: -serialize is not supported by azure-blob-container volumes.")
+       }
+       v := NewAzureBlobVolume(azClient, containerName, flagReadonly, azureStorageReplication)
+       if err := v.Check(); err != nil {
+               return err
+       }
+       *s.volumeSet = append(*s.volumeSet, v)
+       return nil
+}
+
+func init() {
+       flag.Var(&azureVolumeAdder{&volumes},
+               "azure-storage-container-volume",
+               "Use the given container as a storage volume. Can be given multiple times.")
+       flag.StringVar(
+               &azureStorageAccountName,
+               "azure-storage-account-name",
+               "",
+               "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
+       flag.StringVar(
+               &azureStorageAccountKeyFile,
+               "azure-storage-account-key-file",
+               "",
+               "File containing the account key used for subsequent --azure-storage-container-volume arguments.")
+       flag.IntVar(
+               &azureStorageReplication,
+               "azure-storage-replication",
+               3,
+               "Replication level to report to clients when data is stored in an Azure container.")
+}
+
+// An AzureBlobVolume stores and retrieves blocks in an Azure Blob
+// container.
+type AzureBlobVolume struct {
+       // azClient is the storage client the volume was created with.
+       azClient      storage.Client
+       // bsClient is the blob service obtained from azClient; all blob
+       // operations in this file go through it.
+       bsClient      storage.BlobStorageClient
+       // containerName is the container that holds the blocks.
+       containerName string
+       // readonly makes Put, Touch and Delete return MethodDisabledError.
+       readonly      bool
+       // replication is the value returned by Replication.
+       replication   int
+}
+
+func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume {
+       return &AzureBlobVolume{
+               azClient:      client,
+               bsClient:      client.GetBlobService(),
+               containerName: containerName,
+               readonly:      readonly,
+               replication:   replication,
+       }
+}
+
+// Check returns nil if the volume is usable.
+func (v *AzureBlobVolume) Check() error {
+       ok, err := v.bsClient.ContainerExists(v.containerName)
+       if err != nil {
+               return err
+       }
+       if !ok {
+               return errors.New("container does not exist")
+       }
+       return nil
+}
+
+func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
+       rdr, err := v.bsClient.GetBlob(v.containerName, loc)
+       if err != nil {
+               if strings.Contains(err.Error(), "404 Not Found") {
+                       // "storage: service returned without a response body (404 Not Found)"
+                       return nil, os.ErrNotExist
+               }
+               return nil, err
+       }
+       switch err := err.(type) {
+       case nil:
+       default:
+               log.Printf("ERROR IN Get(): %T %#v", err, err)
+               return nil, err
+       }
+       defer rdr.Close()
+       buf := bufs.Get(BlockSize)
+       n, err := io.ReadFull(rdr, buf)
+       switch err {
+       case io.EOF, io.ErrUnexpectedEOF:
+               return buf[:n], nil
+       default:
+               bufs.Put(buf)
+               return nil, err
+       }
+}
+
+func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
+       rdr, err := v.bsClient.GetBlob(v.containerName, loc)
+       if err != nil {
+               return err
+       }
+       defer rdr.Close()
+       return compareReaderWithBuf(rdr, expect, loc[:32])
+}
+
+func (v *AzureBlobVolume) Put(loc string, block []byte) error {
+       if v.readonly {
+               return MethodDisabledError
+       }
+       return v.bsClient.CreateBlockBlobFromReader(v.containerName, loc, uint64(len(block)), bytes.NewReader(block))
+}
+
+func (v *AzureBlobVolume) Touch(loc string) error {
+       if v.readonly {
+               return MethodDisabledError
+       }
+       return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
+               "touch": fmt.Sprintf("%d", time.Now()),
+       })
+}
+
+func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
+       props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+       if err != nil {
+               return time.Time{}, err
+       }
+       return time.Parse(time.RFC1123, props.LastModified)
+}
+
+func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
+       params := storage.ListBlobsParameters{
+               Prefix: prefix,
+       }
+       for {
+               resp, err := v.bsClient.ListBlobs(v.containerName, params)
+               if err != nil {
+                       return err
+               }
+               for _, b := range resp.Blobs {
+                       t, err := time.Parse(time.RFC1123, b.Properties.LastModified)
+                       if err != nil {
+                               return err
+                       }
+                       fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.Unix())
+               }
+               if resp.NextMarker == "" {
+                       return nil
+               }
+               params.Marker = resp.NextMarker
+       }
+}
+
+// Delete removes the blob named loc unless it was modified less than
+// blobSignatureTTL ago, in which case it returns nil without deleting.
+// On a read-only volume it returns MethodDisabledError.
+//
+// The order of the calls below matters: the Etag is captured first,
+// the modification time is checked second, and the delete is made
+// conditional on the captured Etag.
+func (v *AzureBlobVolume) Delete(loc string) error {
+       if v.readonly {
+               return MethodDisabledError
+       }
+       // Ideally we would use If-Unmodified-Since, but that
+       // particular condition seems to be ignored by Azure. Instead,
+       // we get the Etag before checking Mtime, and use If-Match to
+       // ensure we don't delete data if Put() or Touch() happens
+       // between our calls to Mtime() and DeleteBlob().
+       props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+       if err != nil {
+               return err
+       }
+       if t, err := v.Mtime(loc); err != nil {
+               return err
+       } else if time.Since(t) < blobSignatureTTL {
+               // Too recent to delete; report success without deleting.
+               return nil
+       }
+       return v.bsClient.DeleteBlob(v.containerName, loc, map[string]string{
+               "If-Match": props.Etag,
+       })
+}
+
+// Status returns a VolumeStatus with fixed values; nothing is queried
+// from the storage service.
+// NOTE(review): these look like placeholders rather than real usage
+// figures -- confirm before relying on them.
+func (v *AzureBlobVolume) Status() *VolumeStatus {
+       return &VolumeStatus{
+               DeviceNum: 1,
+               BytesFree: BlockSize * 1000,
+               BytesUsed: 1,
+       }
+}
+
+// String returns "azure-storage-container:" followed by the quoted
+// container name.
+func (v *AzureBlobVolume) String() string {
+       return fmt.Sprintf("azure-storage-container:%+q", v.containerName)
+}
+
+// Writable reports whether the volume was created without the
+// readonly option.
+func (v *AzureBlobVolume) Writable() bool {
+       return !v.readonly
+}
+
+// Replication returns the replication level the volume was created
+// with.
+func (v *AzureBlobVolume) Replication() int {
+       return v.replication
+}
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
new file mode 100644 (file)
index 0000000..66b0ea0
--- /dev/null
@@ -0,0 +1,353 @@
+package main
+
+import (
+       "encoding/base64"
+       "encoding/xml"
+       "flag"
+       "fmt"
+       "io/ioutil"
+       "log"
+       "math/rand"
+       "net"
+       "net/http"
+       "net/http/httptest"
+       "regexp"
+       "sort"
+       "strconv"
+       "strings"
+       "sync"
+       "testing"
+       "time"
+
+       "github.com/curoverse/azure-sdk-for-go/storage"
+)
+
+const (
+       // The same fake credentials used by Microsoft's Azure emulator
+       emulatorAccountName = "devstoreaccount1"
+       emulatorAccountKey  = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
+)
+
+var azureTestContainer string
+
+func init() {
+       flag.StringVar(
+               &azureTestContainer,
+               "test.azure-storage-container-volume",
+               "",
+               "Name of Azure container to use for testing. Do not use a container with real data! Use -azure-storage-account-name and -azure-storage-key-file arguments to supply credentials.")
+}
+
+// azBlob is the stub server's in-memory record of one blob.
+type azBlob struct {
+       // Data is the committed content.
+       Data        []byte
+       // Etag is replaced with a fresh makeEtag value by the stub's
+       // Put Blob, Put Block List and Set Metadata handlers.
+       Etag        string
+       // Metadata holds the x-ms-meta-* headers of the last metadata request.
+       Metadata    map[string]string
+       // Mtime is reported as Last-Modified.
+       Mtime       time.Time
+       // Uncommitted maps decoded block IDs to block data that has been
+       // uploaded but not yet committed by a block list.
+       Uncommitted map[string][]byte
+}
+
+// azStubHandler is an http.Handler that imitates the parts of the
+// Azure blob service used by these tests. The embedded mutex guards
+// blobs, whose keys have the form "container|blobname".
+type azStubHandler struct {
+       sync.Mutex
+       blobs map[string]*azBlob
+}
+
+// newAzStubHandler returns a stub handler with an empty blob map.
+func newAzStubHandler() *azStubHandler {
+       return &azStubHandler{
+               blobs: make(map[string]*azBlob),
+       }
+}
+
+func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
+       if blob, ok := h.blobs[container+"|"+hash]; !ok {
+               return
+       } else {
+               blob.Mtime = t
+       }
+}
+
+func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
+       h.Lock()
+       defer h.Unlock()
+       h.blobs[container+"|"+hash] = &azBlob{
+               Data:        data,
+               Mtime:       time.Now(),
+               Uncommitted: make(map[string][]byte),
+       }
+}
+
+// ServeHTTP imitates the subset of the Azure blob REST API used by
+// AzureBlobVolume: Put Blob, Put Block, Put Block List, Set Metadata,
+// Get Blob (GET/HEAD), Delete Blob and List Blobs. Anything else is
+// answered with 501 Not Implemented.
+//
+// The request path is interpreted as /container/blobname. The whole
+// request is served with the handler's mutex held.
+func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
+       h.Lock()
+       defer h.Unlock()
+       // defer log.Printf("azStubHandler: %+v", r)
+
+       path := strings.Split(r.URL.Path, "/")
+       if len(path) < 2 {
+               // No container component at all (path without a slash).
+               log.Printf("azStubHandler(%+v): bad path", r)
+               rw.WriteHeader(http.StatusBadRequest)
+               return
+       }
+       container := path[1]
+       hash := ""
+       if len(path) > 2 {
+               hash = path[2]
+       }
+
+       if err := r.ParseForm(); err != nil {
+               log.Printf("azStubHandler(%+v): %s", r, err)
+               rw.WriteHeader(http.StatusBadRequest)
+               return
+       }
+
+       body, err := ioutil.ReadAll(r.Body)
+       if err != nil {
+               // Do not fall through to an implicit 200 response.
+               log.Printf("azStubHandler(%+v): reading body: %s", r, err)
+               rw.WriteHeader(http.StatusBadRequest)
+               return
+       }
+
+       type blockListRequestBody struct {
+               XMLName     xml.Name `xml:"BlockList"`
+               Uncommitted []string
+       }
+
+       blob, blobExists := h.blobs[container+"|"+hash]
+
+       switch {
+       case r.Method == "PUT" && r.Form.Get("comp") == "":
+               // "Put Blob" API
+               h.blobs[container+"|"+hash] = &azBlob{
+                       Data:        body,
+                       Mtime:       time.Now(),
+                       Uncommitted: make(map[string][]byte),
+                       Etag:        makeEtag(),
+               }
+               rw.WriteHeader(http.StatusCreated)
+       case r.Method == "PUT" && r.Form.Get("comp") == "block":
+               // "Put Block" API
+               if !blobExists {
+                       log.Printf("Got block for nonexistent blob: %+v", r)
+                       rw.WriteHeader(http.StatusBadRequest)
+                       return
+               }
+               blockID, err := base64.StdEncoding.DecodeString(r.Form.Get("blockid"))
+               if err != nil || len(blockID) == 0 {
+                       log.Printf("Invalid blockid: %+q", r.Form.Get("blockid"))
+                       rw.WriteHeader(http.StatusBadRequest)
+                       return
+               }
+               blob.Uncommitted[string(blockID)] = body
+               rw.WriteHeader(http.StatusCreated)
+       case r.Method == "PUT" && r.Form.Get("comp") == "blocklist":
+               // "Put Block List" API
+               if !blobExists {
+                       // blob is nil here; reject before it is dereferenced.
+                       log.Printf("Got block list for nonexistent blob: %+v", r)
+                       rw.WriteHeader(http.StatusBadRequest)
+                       return
+               }
+               bl := &blockListRequestBody{}
+               if err := xml.Unmarshal(body, bl); err != nil {
+                       log.Printf("xml Unmarshal: %s", err)
+                       rw.WriteHeader(http.StatusBadRequest)
+                       return
+               }
+               for _, encBlockID := range bl.Uncommitted {
+                       blockID, err := base64.StdEncoding.DecodeString(encBlockID)
+                       if err != nil || len(blockID) == 0 || blob.Uncommitted[string(blockID)] == nil {
+                               log.Printf("Invalid blockid: %+q", encBlockID)
+                               rw.WriteHeader(http.StatusBadRequest)
+                               return
+                       }
+                       // The stub keeps only one block per blob: each
+                       // committed block replaces the blob's data.
+                       blob.Data = blob.Uncommitted[string(blockID)]
+                       blob.Etag = makeEtag()
+                       blob.Mtime = time.Now()
+                       delete(blob.Uncommitted, string(blockID))
+               }
+               rw.WriteHeader(http.StatusCreated)
+       case r.Method == "PUT" && r.Form.Get("comp") == "metadata":
+               // "Set Metadata Headers" API. We don't bother
+               // stubbing "Get Metadata Headers": AzureBlobVolume
+               // sets metadata headers only as a way to bump Etag
+               // and Last-Modified.
+               if !blobExists {
+                       log.Printf("Got metadata for nonexistent blob: %+v", r)
+                       rw.WriteHeader(http.StatusBadRequest)
+                       return
+               }
+               blob.Metadata = make(map[string]string)
+               for k, v := range r.Header {
+                       if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
+                               blob.Metadata[k] = v[0]
+                       }
+               }
+               blob.Mtime = time.Now()
+               blob.Etag = makeEtag()
+       case (r.Method == "GET" || r.Method == "HEAD") && hash != "":
+               // "Get Blob" API
+               if !blobExists {
+                       rw.WriteHeader(http.StatusNotFound)
+                       return
+               }
+               rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123))
+               rw.Header().Set("Content-Length", strconv.Itoa(len(blob.Data)))
+               if r.Method == "GET" {
+                       if _, err := rw.Write(blob.Data); err != nil {
+                               log.Printf("write %+q: %s", blob.Data, err)
+                       }
+               }
+       case r.Method == "DELETE" && hash != "":
+               // "Delete Blob" API
+               if !blobExists {
+                       rw.WriteHeader(http.StatusNotFound)
+                       return
+               }
+               delete(h.blobs, container+"|"+hash)
+               rw.WriteHeader(http.StatusAccepted)
+       case r.Method == "GET" && r.Form.Get("comp") == "list" && r.Form.Get("restype") == "container":
+               // "List Blobs" API
+               prefix := container + "|" + r.Form.Get("prefix")
+               marker := r.Form.Get("marker")
+
+               // Small default page size, so that paging is exercised
+               // even with only a few blobs.
+               maxResults := 2
+               if n, err := strconv.Atoi(r.Form.Get("maxresults")); err == nil && n >= 1 && n <= 5000 {
+                       maxResults = n
+               }
+
+               resp := storage.BlobListResponse{
+                       Marker:     marker,
+                       NextMarker: "",
+                       MaxResults: int64(maxResults),
+               }
+               var hashes sort.StringSlice
+               for k := range h.blobs {
+                       if strings.HasPrefix(k, prefix) {
+                               hashes = append(hashes, k[len(container)+1:])
+                       }
+               }
+               hashes.Sort()
+               for _, hash := range hashes {
+                       if len(resp.Blobs) == maxResults {
+                               // Page is full: the next name becomes the marker.
+                               resp.NextMarker = hash
+                               break
+                       }
+                       // Start at the marker (or at the beginning when no
+                       // marker was given), then take every following name.
+                       if len(resp.Blobs) > 0 || marker == "" || marker == hash {
+                               blob := h.blobs[container+"|"+hash]
+                               resp.Blobs = append(resp.Blobs, storage.Blob{
+                                       Name: hash,
+                                       Properties: storage.BlobProperties{
+                                               LastModified:  blob.Mtime.Format(time.RFC1123),
+                                               ContentLength: int64(len(blob.Data)),
+                                               Etag:          blob.Etag,
+                                       },
+                               })
+                       }
+               }
+               buf, err := xml.Marshal(resp)
+               if err != nil {
+                       log.Print(err)
+                       rw.WriteHeader(http.StatusInternalServerError)
+                       return
+               }
+               rw.Write(buf)
+       default:
+               log.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body)
+               rw.WriteHeader(http.StatusNotImplemented)
+       }
+}
+
+// azStubDialer is a net.Dialer that notices when the Azure driver
+// tries to connect to "devstoreaccount1.blob.127.0.0.1:46067", and
+// in such cases transparently dials "127.0.0.1:46067" instead.
+// The rewrite itself is done by its Dial method using localHostPortRe.
+type azStubDialer struct {
+       net.Dialer
+}
+
+var localHostPortRe = regexp.MustCompile(`(127\.0\.0\.1|localhost|\[::1\]):\d+`)
+
+func (d *azStubDialer) Dial(network, address string) (net.Conn, error) {
+       if hp := localHostPortRe.FindString(address); hp != "" {
+               log.Println("azStubDialer: dial", hp, "instead of", address)
+               address = hp
+       }
+       return d.Dialer.Dial(network, address)
+}
+
+// TestableAzureBlobVolume bundles an AzureBlobVolume with the stub
+// handler and httptest server created alongside it, so tests can
+// manipulate stored blobs directly and shut the server down.
+type TestableAzureBlobVolume struct {
+       *AzureBlobVolume
+       azHandler *azStubHandler
+       azStub    *httptest.Server
+       t         *testing.T
+}
+
+// NewTestableAzureBlobVolume starts a stub blob server and returns a
+// volume for testing.
+//
+// When azureTestContainer is empty the volume talks to the stub, using
+// the emulator credentials and the container name
+// "fakecontainername". Otherwise it talks to the real service, using
+// the account name and key file given on the command line. Any setup
+// error is fatal to the test.
+func NewTestableAzureBlobVolume(t *testing.T, readonly bool, replication int) TestableVolume {
+       handler := newAzStubHandler()
+       stub := httptest.NewServer(handler)
+
+       var (
+               client    storage.Client
+               err       error
+               container = azureTestContainer
+       )
+
+       if container != "" {
+               // Connect to real Azure storage service
+               var accountKey string
+               if accountKey, err = readKeyFromFile(azureStorageAccountKeyFile); err != nil {
+                       t.Fatal(err)
+               }
+               if client, err = storage.NewBasicClient(azureStorageAccountName, accountKey); err != nil {
+                       t.Fatal(err)
+               }
+       } else {
+               // Connect to stub instead of real Azure storage service:
+               // strip the scheme from the stub URL to get the base address.
+               stubAddr := strings.Split(stub.URL, "://")[1]
+               if client, err = storage.NewClient(emulatorAccountName, emulatorAccountKey, stubAddr, storage.DefaultAPIVersion, false); err != nil {
+                       t.Fatal(err)
+               }
+               container = "fakecontainername"
+       }
+
+       return &TestableAzureBlobVolume{
+               AzureBlobVolume: NewAzureBlobVolume(client, container, readonly, replication),
+               azHandler:       handler,
+               azStub:          stub,
+               t:               t,
+       }
+}
+
+func TestAzureBlobVolumeWithGeneric(t *testing.T) {
+       defer func(t http.RoundTripper) {
+               http.DefaultTransport = t
+       }(http.DefaultTransport)
+       http.DefaultTransport = &http.Transport{
+               Dial: (&azStubDialer{}).Dial,
+       }
+       DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
+               return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
+       })
+}
+
+func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
+       defer func(t http.RoundTripper) {
+               http.DefaultTransport = t
+       }(http.DefaultTransport)
+       http.DefaultTransport = &http.Transport{
+               Dial: (&azStubDialer{}).Dial,
+       }
+       DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
+               return NewTestableAzureBlobVolume(t, true, azureStorageReplication)
+       })
+}
+
+func TestAzureBlobVolumeReplication(t *testing.T) {
+       for r := 1; r <= 4; r++ {
+               v := NewTestableAzureBlobVolume(t, false, r)
+               defer v.Teardown()
+               if n := v.Replication(); n != r {
+                       t.Errorf("Got replication %d, expected %d", n, r)
+               }
+       }
+}
+
+// PutRaw stores data under locator directly in the stub handler, in
+// this volume's container, without going through the volume's Put.
+func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
+       v.azHandler.PutRaw(v.containerName, locator, data)
+}
+
+// TouchWithDate asks the stub handler to set the modification time of
+// locator, in this volume's container, to lastPut.
+func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Time) {
+       v.azHandler.TouchWithDate(v.containerName, locator, lastPut)
+}
+
+// Teardown closes the stub HTTP server that was started for this
+// volume.
+func (v *TestableAzureBlobVolume) Teardown() {
+       v.azStub.Close()
+}
+
+// makeEtag returns a random value formatted as "0x" followed by hex
+// digits, used by the stub as a blob Etag.
+func makeEtag() string {
+       return fmt.Sprintf("0x%x", rand.Int63())
+}
index be26514a00ce6ae9092bf12981f5f55818a083f1..a4af563729b3cf0e72686a2913fd3664e497e24d 100644 (file)
@@ -1,6 +1,7 @@
 package main
 
 import (
+       "bytes"
        "crypto/md5"
        "fmt"
        "io"
@@ -47,3 +48,37 @@ func collisionOrCorrupt(expectMD5 string, buf1, buf2 []byte, rdr io.Reader) erro
        }
        return <-outcome
 }
+
+func compareReaderWithBuf(rdr io.Reader, expect []byte, hash string) error {
+       bufLen := 1 << 20
+       if bufLen > len(expect) && len(expect) > 0 {
+               // No need for bufLen to be longer than
+               // expect, except that len(buf)==0 would
+               // prevent us from handling empty readers the
+               // same way as non-empty readers: reading 0
+               // bytes at a time never reaches EOF.
+               bufLen = len(expect)
+       }
+       buf := make([]byte, bufLen)
+       cmp := expect
+
+       // Loop invariants: all data read so far matched what
+       // we expected, and the first N bytes of cmp are
+       // expected to equal the next N bytes read from
+       // rdr.
+       for {
+               n, err := rdr.Read(buf)
+               if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
+                       return collisionOrCorrupt(hash, expect[:len(expect)-len(cmp)], buf[:n], rdr)
+               }
+               cmp = cmp[n:]
+               if err == io.EOF {
+                       if len(cmp) != 0 {
+                               return collisionOrCorrupt(hash, expect[:len(expect)-len(cmp)], nil, nil)
+                       }
+                       return nil
+               } else if err != nil {
+                       return err
+               }
+       }
+}
index dd8366671b6b056f8e4161cfae8169cd6a086cc6..3817ea19002d1c18f14c2479a383fb2d1601d763 100644 (file)
@@ -321,6 +321,16 @@ func TestIndexHandler(t *testing.T) {
                uri:      "/index/" + TestHash[0:3],
                apiToken: dataManagerToken,
        }
+       superuserNoSuchPrefixReq := &RequestTester{
+               method:   "GET",
+               uri:      "/index/abcd",
+               apiToken: dataManagerToken,
+       }
+       superuserInvalidPrefixReq := &RequestTester{
+               method:   "GET",
+               uri:      "/index/xyz",
+               apiToken: dataManagerToken,
+       }
 
        // -------------------------------------------------------------
        // Only the superuser should be allowed to issue /index requests.
@@ -407,6 +417,26 @@ func TestIndexHandler(t *testing.T) {
                        "permissions on, superuser /index/prefix request: expected %s, got:\n%s",
                        expected, response.Body.String())
        }
+
+       // superuser /index/{no-such-prefix} request
+       // => OK
+       response = IssueRequest(superuserNoSuchPrefixReq)
+       ExpectStatusCode(t,
+               "permissions on, superuser request",
+               http.StatusOK,
+               response)
+
+       if "\n" != response.Body.String() {
+               t.Errorf("Expected empty response for %s. Found %s", superuserNoSuchPrefixReq.uri, response.Body.String())
+       }
+
+       // superuser /index/{invalid-prefix} request
+       // => StatusBadRequest
+       response = IssueRequest(superuserInvalidPrefixReq)
+       ExpectStatusCode(t,
+               "permissions on, superuser request",
+               http.StatusBadRequest,
+               response)
 }
 
 // TestDeleteHandler
@@ -924,3 +954,19 @@ func TestGetHandlerNoBufferleak(t *testing.T) {
        case <-ok:
        }
 }
+
+// TestPutReplicationHeader issues a PUT for TestBlock and checks that
+// the response carries an X-Keep-Replicas-Stored header equal to "1".
+func TestPutReplicationHeader(t *testing.T) {
+       defer teardown()
+
+       KeepVM = MakeTestVolumeManager(2)
+       defer KeepVM.Close()
+
+       const want = "1"
+       putReq := &RequestTester{
+               method:      "PUT",
+               uri:         "/" + TestHash,
+               requestBody: TestBlock,
+       }
+       resp := IssueRequest(putReq)
+       got := resp.Header().Get("X-Keep-Replicas-Stored")
+       if got != want {
+               t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", got, want)
+       }
+}
index 2b96dbc582f8b584e731401f3e467dae635ea837..95af1b48707c6b189982dc18762cb517769bd117 100644 (file)
@@ -120,7 +120,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       err = PutBlock(buf, hash)
+       replication, err := PutBlock(buf, hash)
        bufs.Put(buf)
 
        if err != nil {
@@ -137,6 +137,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
                expiry := time.Now().Add(blobSignatureTTL)
                returnHash = SignLocator(returnHash, apiToken, expiry)
        }
+       resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
        resp.Write([]byte(returnHash + "\n"))
 }
 
@@ -517,40 +518,40 @@ func GetBlock(hash string) ([]byte, error) {
 //          all writes failed). The text of the error message should
 //          provide as much detail as possible.
 //
-func PutBlock(block []byte, hash string) error {
+func PutBlock(block []byte, hash string) (int, error) {
        // Check that BLOCK's checksum matches HASH.
        blockhash := fmt.Sprintf("%x", md5.Sum(block))
        if blockhash != hash {
                log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
-               return RequestHashError
+               return 0, RequestHashError
        }
 
        // If we already have this data, it's intact on disk, and we
        // can update its timestamp, return success. If we have
        // different data with the same hash, return failure.
-       if err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
-               return err
+       if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
+               return n, err
        }
 
        // Choose a Keep volume to write to.
        // If this volume fails, try all of the volumes in order.
        if vol := KeepVM.NextWritable(); vol != nil {
                if err := vol.Put(hash, block); err == nil {
-                       return nil // success!
+                       return vol.Replication(), nil // success!
                }
        }
 
        writables := KeepVM.AllWritable()
        if len(writables) == 0 {
                log.Print("No writable volumes.")
-               return FullError
+               return 0, FullError
        }
 
        allFull := true
        for _, vol := range writables {
                err := vol.Put(hash, block)
                if err == nil {
-                       return nil // success!
+                       return vol.Replication(), nil // success!
                }
                if err != FullError {
                        // The volume is not full but the
@@ -563,17 +564,18 @@ func PutBlock(block []byte, hash string) error {
 
        if allFull {
                log.Print("All volumes are full.")
-               return FullError
+               return 0, FullError
        }
        // Already logged the non-full errors.
-       return GenericError
+       return 0, GenericError
 }
 
-// CompareAndTouch returns nil if one of the volumes already has the
-// given content and it successfully updates the relevant block's
-// modification time in order to protect it from premature garbage
-// collection.
-func CompareAndTouch(hash string, buf []byte) error {
+// CompareAndTouch returns the current replication level if one of the
+// volumes already has the given content and it successfully updates
+// the relevant block's modification time in order to protect it from
+// premature garbage collection. Otherwise, it returns a non-nil
+// error.
+func CompareAndTouch(hash string, buf []byte) (int, error) {
        var bestErr error = NotFoundError
        for _, vol := range KeepVM.AllWritable() {
                if err := vol.Compare(hash, buf); err == CollisionError {
@@ -583,7 +585,7 @@ func CompareAndTouch(hash string, buf []byte) error {
                        // both, so there's no point writing it even
                        // on a different volume.)
                        log.Printf("%s: Compare(%s): %s", vol, hash, err)
-                       return err
+                       return 0, err
                } else if os.IsNotExist(err) {
                        // Block does not exist. This is the only
                        // "normal" error: we don't log anything.
@@ -601,9 +603,9 @@ func CompareAndTouch(hash string, buf []byte) error {
                        continue
                }
                // Compare and Touch both worked --> done.
-               return nil
+               return vol.Replication(), nil
        }
-       return bestErr
+       return 0, bestErr
 }
 
 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
index 90094f311722c1cdd71ba94f51d87a9f867a8b57..9f31f5f6fe5f31d38745046f7a771982f6b2ad00 100644 (file)
@@ -76,12 +76,12 @@ func testPutBlock(t *testing.T, factory TestableVolumeManagerFactory, testHash s
        setupHandlersWithGenericVolumeTest(t, factory)
 
        // PutBlock
-       if err := PutBlock(testBlock, testHash); err != nil {
+       if _, err := PutBlock(testBlock, testHash); err != nil {
                t.Fatalf("Error during PutBlock: %s", err)
        }
 
        // Check that PutBlock succeeds again even after CompareAndTouch
-       if err := PutBlock(testBlock, testHash); err != nil {
+       if _, err := PutBlock(testBlock, testHash); err != nil {
                t.Fatalf("Error during PutBlock: %s", err)
        }
 
@@ -104,7 +104,7 @@ func testPutBlockCorrupt(t *testing.T, factory TestableVolumeManagerFactory,
        testableVolumes[1].PutRaw(testHash, badData)
 
        // Check that PutBlock with good data succeeds
-       if err := PutBlock(testBlock, testHash); err != nil {
+       if _, err := PutBlock(testBlock, testHash); err != nil {
                t.Fatalf("Error during PutBlock for %q: %s", testHash, err)
        }
 
index 3e360e1799117e80e773e1e5c58fa3b5560b07ef..2528f6d6a6c4dbf2f4b509e670c834aa10b9e618 100644 (file)
@@ -1,9 +1,7 @@
 package main
 
 import (
-       "bufio"
        "bytes"
-       "errors"
        "flag"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
@@ -14,7 +12,6 @@ import (
        "os"
        "os/signal"
        "strings"
-       "sync"
        "syscall"
        "time"
 )
@@ -114,95 +111,16 @@ var KeepVM VolumeManager
 var pullq *WorkQueue
 var trashq *WorkQueue
 
+type volumeSet []Volume
+
 var (
        flagSerializeIO bool
        flagReadonly    bool
+       volumes         volumeSet
 )
 
-type volumeSet []Volume
-
-func (vs *volumeSet) Set(value string) error {
-       if dirs := strings.Split(value, ","); len(dirs) > 1 {
-               log.Print("DEPRECATED: using comma-separated volume list.")
-               for _, dir := range dirs {
-                       if err := vs.Set(dir); err != nil {
-                               return err
-                       }
-               }
-               return nil
-       }
-       if len(value) == 0 || value[0] != '/' {
-               return errors.New("Invalid volume: must begin with '/'.")
-       }
-       if _, err := os.Stat(value); err != nil {
-               return err
-       }
-       var locker sync.Locker
-       if flagSerializeIO {
-               locker = &sync.Mutex{}
-       }
-       *vs = append(*vs, &UnixVolume{
-               root:     value,
-               locker:   locker,
-               readonly: flagReadonly,
-       })
-       return nil
-}
-
 func (vs *volumeSet) String() string {
-       s := "["
-       for i, v := range *vs {
-               if i > 0 {
-                       s = s + " "
-               }
-               s = s + v.String()
-       }
-       return s + "]"
-}
-
-// Discover adds a volume for every directory named "keep" that is
-// located at the top level of a device- or tmpfs-backed mount point
-// other than "/". It returns the number of volumes added.
-func (vs *volumeSet) Discover() int {
-       added := 0
-       f, err := os.Open(ProcMounts)
-       if err != nil {
-               log.Fatalf("opening %s: %s", ProcMounts, err)
-       }
-       scanner := bufio.NewScanner(f)
-       for scanner.Scan() {
-               args := strings.Fields(scanner.Text())
-               if err := scanner.Err(); err != nil {
-                       log.Fatalf("reading %s: %s", ProcMounts, err)
-               }
-               dev, mount := args[0], args[1]
-               if mount == "/" {
-                       continue
-               }
-               if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
-                       continue
-               }
-               keepdir := mount + "/keep"
-               if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
-                       continue
-               }
-               // Set the -readonly flag (but only for this volume)
-               // if the filesystem is mounted readonly.
-               flagReadonlyWas := flagReadonly
-               for _, fsopt := range strings.Split(args[3], ",") {
-                       if fsopt == "ro" {
-                               flagReadonly = true
-                               break
-                       }
-                       if fsopt == "rw" {
-                               break
-                       }
-               }
-               vs.Set(keepdir)
-               flagReadonly = flagReadonlyWas
-               added++
-       }
-       return added
+       return fmt.Sprintf("%+v", (*vs)[:])
 }
 
 // TODO(twp): continue moving as much code as possible out of main
@@ -219,7 +137,6 @@ func main() {
                listen               string
                blobSigningKeyFile   string
                permissionTTLSec     int
-               volumes              volumeSet
                pidfile              string
        )
        flag.StringVar(
@@ -276,14 +193,6 @@ func main() {
                "readonly",
                false,
                "Do not write, delete, or touch anything on the following volumes.")
-       flag.Var(
-               &volumes,
-               "volumes",
-               "Deprecated synonym for -volume.")
-       flag.Var(
-               &volumes,
-               "volume",
-               "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
        flag.StringVar(
                &pidfile,
                "pid",
@@ -328,7 +237,7 @@ func main() {
        }
 
        if len(volumes) == 0 {
-               if volumes.Discover() == 0 {
+               if (&unixVolumeAdder{&volumes}).Discover() == 0 {
                        log.Fatal("No volumes found.")
                }
        }
index 0e2129cc848d4f4bc7b2b91c17d2fa3115ae7732..9b132927c6a773a1d396e53da813e306698b5242 100644 (file)
@@ -126,8 +126,8 @@ func TestPutBlockOK(t *testing.T) {
        defer KeepVM.Close()
 
        // Check that PutBlock stores the data as expected.
-       if err := PutBlock(TestBlock, TestHash); err != nil {
-               t.Fatalf("PutBlock: %v", err)
+       if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+               t.Fatalf("PutBlock: n %d err %v", n, err)
        }
 
        vols := KeepVM.AllReadable()
@@ -156,8 +156,8 @@ func TestPutBlockOneVol(t *testing.T) {
        vols[0].(*MockVolume).Bad = true
 
        // Check that PutBlock stores the data as expected.
-       if err := PutBlock(TestBlock, TestHash); err != nil {
-               t.Fatalf("PutBlock: %v", err)
+       if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+               t.Fatalf("PutBlock: n %d err %v", n, err)
        }
 
        result, err := GetBlock(TestHash)
@@ -184,7 +184,7 @@ func TestPutBlockMD5Fail(t *testing.T) {
 
        // Check that PutBlock returns the expected error when the hash does
        // not match the block.
-       if err := PutBlock(BadBlock, TestHash); err != RequestHashError {
+       if _, err := PutBlock(BadBlock, TestHash); err != RequestHashError {
                t.Error("Expected RequestHashError, got %v", err)
        }
 
@@ -209,8 +209,8 @@ func TestPutBlockCorrupt(t *testing.T) {
        // Store a corrupted block under TestHash.
        vols := KeepVM.AllWritable()
        vols[0].Put(TestHash, BadBlock)
-       if err := PutBlock(TestBlock, TestHash); err != nil {
-               t.Errorf("PutBlock: %v", err)
+       if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+               t.Errorf("PutBlock: n %d err %v", n, err)
        }
 
        // The block on disk should now match TestBlock.
@@ -239,10 +239,10 @@ func TestPutBlockCollision(t *testing.T) {
 
        // Store one block, then attempt to store the other. Confirm that
        // PutBlock reported a CollisionError.
-       if err := PutBlock(b1, locator); err != nil {
+       if _, err := PutBlock(b1, locator); err != nil {
                t.Error(err)
        }
-       if err := PutBlock(b2, locator); err == nil {
+       if _, err := PutBlock(b2, locator); err == nil {
                t.Error("PutBlock did not report a collision")
        } else if err != CollisionError {
                t.Errorf("PutBlock returned %v", err)
@@ -273,8 +273,8 @@ func TestPutBlockTouchFails(t *testing.T) {
        // vols[0].Touch will fail on the next call, so the volume
        // manager will store a copy on vols[1] instead.
        vols[0].(*MockVolume).Touchable = false
-       if err := PutBlock(TestBlock, TestHash); err != nil {
-               t.Fatalf("PutBlock: %v", err)
+       if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+               t.Fatalf("PutBlock: n %d err %v", n, err)
        }
        vols[0].(*MockVolume).Touchable = true
 
@@ -333,8 +333,8 @@ func TestDiscoverTmpfs(t *testing.T) {
        f.Close()
        ProcMounts = f.Name()
 
-       var resultVols volumeSet
-       added := resultVols.Discover()
+       resultVols := volumeSet{}
+       added := (&unixVolumeAdder{&resultVols}).Discover()
 
        if added != len(resultVols) {
                t.Errorf("Discover returned %d, but added %d volumes",
@@ -373,8 +373,8 @@ func TestDiscoverNone(t *testing.T) {
        f.Close()
        ProcMounts = f.Name()
 
-       var resultVols volumeSet
-       added := resultVols.Discover()
+       resultVols := volumeSet{}
+       added := (&unixVolumeAdder{&resultVols}).Discover()
        if added != 0 || len(resultVols) != 0 {
                t.Fatalf("got %d, %v; expected 0, []", added, resultVols)
        }
index 9f0b96fa35b1c32af4b06be92ea02632547ef249..2626d4bf68e1594f394ad4539f0f32a90fe00339 100644 (file)
@@ -95,6 +95,6 @@ func GenerateRandomAPIToken() string {
 
 // Put block
 var PutContent = func(content []byte, locator string) (err error) {
-       err = PutBlock(content, locator)
+       _, err = PutBlock(content, locator)
        return
 }
index 9bf291bdea28dce8b05baae738c8d42fbca7481c..7966c41b92bd89958308ec77765f0b7a5a1f0fd9 100644 (file)
@@ -195,6 +195,11 @@ type Volume interface {
        // will fail because it is full, but Mtime or Delete can
        // succeed -- then Writable should return false.
        Writable() bool
+
+       // Replication returns the storage redundancy of the
+       // underlying device. It will be passed on to clients in
+       // responses to PUT requests.
+       Replication() int
 }
 
 // A VolumeManager tells callers which volumes can read, which volumes
index c08c3f5f0007ac483486cb01d1e6a4bd68253d42..6dca74eaf2100e7003f9727041dbe07d999d632a 100644 (file)
@@ -70,7 +70,7 @@ func testGet(t *testing.T, factory TestableVolumeFactory) {
 
        buf, err := v.Get(TestHash)
        if err != nil {
-               t.Error(err)
+               t.Fatal(err)
        }
 
        bufs.Put(buf)
@@ -182,14 +182,14 @@ func testPutBlockWithDifferentContent(t *testing.T, factory TestableVolumeFactor
                // Put must not return a nil error unless it has
                // overwritten the existing data.
                if bytes.Compare(buf, testDataB) != 0 {
-                       t.Errorf("Put succeeded but Get returned %+v, expected %+v", buf, testDataB)
+                       t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf, testDataB)
                }
        } else {
                // It is permissible for Put to fail, but it must
                // leave us with either the original data, the new
                // data, or nothing at all.
                if getErr == nil && bytes.Compare(buf, testDataA) != 0 && bytes.Compare(buf, testDataB) != 0 {
-                       t.Errorf("Put failed but Get returned %+v, which is neither %+v nor %+v", buf, testDataA, testDataB)
+                       t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf, testDataA, testDataB)
                }
        }
        if getErr == nil {
@@ -225,26 +225,32 @@ func testPutMultipleBlocks(t *testing.T, factory TestableVolumeFactory) {
        data, err := v.Get(TestHash)
        if err != nil {
                t.Error(err)
-       } else if bytes.Compare(data, TestBlock) != 0 {
-               t.Errorf("Block present, but content is incorrect: Expected: %v  Found: %v", data, TestBlock)
+       } else {
+               if bytes.Compare(data, TestBlock) != 0 {
+                       t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock)
+               }
+               bufs.Put(data)
        }
-       bufs.Put(data)
 
        data, err = v.Get(TestHash2)
        if err != nil {
                t.Error(err)
-       } else if bytes.Compare(data, TestBlock2) != 0 {
-               t.Errorf("Block present, but content is incorrect: Expected: %v  Found: %v", data, TestBlock2)
+       } else {
+               if bytes.Compare(data, TestBlock2) != 0 {
+                       t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock2)
+               }
+               bufs.Put(data)
        }
-       bufs.Put(data)
 
        data, err = v.Get(TestHash3)
        if err != nil {
                t.Error(err)
-       } else if bytes.Compare(data, TestBlock3) != 0 {
-               t.Errorf("Block present, but content is incorrect: Expected: %v  Found: %v", data, TestBlock3)
+       } else {
+               if bytes.Compare(data, TestBlock3) != 0 {
+                       t.Errorf("Block present, but to %+q, expected %+q", data, TestBlock3)
+               }
+               bufs.Put(data)
        }
-       bufs.Put(data)
 }
 
 // testPutAndTouch
@@ -371,6 +377,7 @@ func testIndexTo(t *testing.T, factory TestableVolumeFactory) {
 func testDeleteNewBlock(t *testing.T, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
+       blobSignatureTTL = 300 * time.Second
 
        if v.Writable() == false {
                return
@@ -384,10 +391,12 @@ func testDeleteNewBlock(t *testing.T, factory TestableVolumeFactory) {
        data, err := v.Get(TestHash)
        if err != nil {
                t.Error(err)
-       } else if bytes.Compare(data, TestBlock) != 0 {
-               t.Error("Block still present, but content is incorrect: %+v != %+v", data, TestBlock)
+       } else {
+               if bytes.Compare(data, TestBlock) != 0 {
+                       t.Errorf("Got data %+q, expected %+q", data, TestBlock)
+               }
+               bufs.Put(data)
        }
-       bufs.Put(data)
 }
 
 // Calling Delete() for a block with a timestamp older than
@@ -396,19 +405,20 @@ func testDeleteNewBlock(t *testing.T, factory TestableVolumeFactory) {
 func testDeleteOldBlock(t *testing.T, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
+       blobSignatureTTL = 300 * time.Second
 
        if v.Writable() == false {
                return
        }
 
        v.Put(TestHash, TestBlock)
-       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL*time.Second))
+       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
 
        if err := v.Delete(TestHash); err != nil {
                t.Error(err)
        }
        if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) {
-               t.Errorf("os.IsNotExist(%v) should have been true", err.Error())
+               t.Errorf("os.IsNotExist(%v) should have been true", err)
        }
 }
 
index f272c84c837676b12cc45ad3b2e962f635e746b1..d6714365de5bef98ad082b93f595231993bafa48 100644 (file)
@@ -214,3 +214,7 @@ func (v *MockVolume) String() string {
 func (v *MockVolume) Writable() bool {
        return !v.Readonly
 }
+
+// Replication always reports a replication level of 1 for a
+// MockVolume.
+func (v *MockVolume) Replication() int {
+       return 1
+}
index 6c0f5c4e978d995b5c9308f52d5d2414045d64fc..98c31d1eab6d0c18f3d242daf898c3d25345e490 100644 (file)
@@ -1,7 +1,9 @@
 package main
 
 import (
-       "bytes"
+       "bufio"
+       "errors"
+       "flag"
        "fmt"
        "io"
        "io/ioutil"
@@ -16,6 +18,97 @@ import (
        "time"
 )
 
+// unixVolumeAdder wraps a *volumeSet. Its Set and Discover methods
+// append UnixVolume entries to the wrapped set.
+type unixVolumeAdder struct {
+       *volumeSet
+}
+
+func (vs *unixVolumeAdder) Set(value string) error {
+       if dirs := strings.Split(value, ","); len(dirs) > 1 {
+               log.Print("DEPRECATED: using comma-separated volume list.")
+               for _, dir := range dirs {
+                       if err := vs.Set(dir); err != nil {
+                               return err
+                       }
+               }
+               return nil
+       }
+       if len(value) == 0 || value[0] != '/' {
+               return errors.New("Invalid volume: must begin with '/'.")
+       }
+       if _, err := os.Stat(value); err != nil {
+               return err
+       }
+       var locker sync.Locker
+       if flagSerializeIO {
+               locker = &sync.Mutex{}
+       }
+       *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
+               root:     value,
+               locker:   locker,
+               readonly: flagReadonly,
+       })
+       return nil
+}
+
+// init registers the -volumes and -volume command line flags. Each
+// flag gets its own unixVolumeAdder, and both adders wrap the
+// package-level volumes set.
+func init() {
+       flag.Var(
+               &unixVolumeAdder{&volumes},
+               "volumes",
+               "Deprecated synonym for -volume.")
+       flag.Var(
+               &unixVolumeAdder{&volumes},
+               "volume",
+               "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
+}
+
+// Discover adds a UnixVolume for every directory named "keep" that is
+// located at the top level of a device- or tmpfs-backed mount point
+// other than "/". It returns the number of volumes added.
+func (vs *unixVolumeAdder) Discover() int {
+       added := 0
+       f, err := os.Open(ProcMounts)
+       if err != nil {
+               log.Fatalf("opening %s: %s", ProcMounts, err)
+       }
+       scanner := bufio.NewScanner(f)
+       for scanner.Scan() {
+               args := strings.Fields(scanner.Text())
+               if err := scanner.Err(); err != nil {
+                       log.Fatalf("reading %s: %s", ProcMounts, err)
+               }
+               dev, mount := args[0], args[1]
+               if mount == "/" {
+                       continue
+               }
+               if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
+                       continue
+               }
+               keepdir := mount + "/keep"
+               if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
+                       continue
+               }
+               // Set the -readonly flag (but only for this volume)
+               // if the filesystem is mounted readonly.
+               flagReadonlyWas := flagReadonly
+               for _, fsopt := range strings.Split(args[3], ",") {
+                       if fsopt == "ro" {
+                               flagReadonly = true
+                               break
+                       }
+                       if fsopt == "rw" {
+                               break
+                       }
+               }
+               if err := vs.Set(keepdir); err != nil {
+                       log.Printf("adding %q: %s", keepdir, err)
+               } else {
+                       added++
+               }
+               flagReadonly = flagReadonlyWas
+       }
+       return added
+}
+
 // A UnixVolume stores and retrieves blocks in a local directory.
 type UnixVolume struct {
        // path to the volume's root directory
@@ -115,43 +208,11 @@ func (v *UnixVolume) Get(loc string) ([]byte, error) {
 // bytes.Compare(), but uses less memory.
 func (v *UnixVolume) Compare(loc string, expect []byte) error {
        path := v.blockPath(loc)
-       stat, err := v.stat(path)
-       if err != nil {
+       if _, err := v.stat(path); err != nil {
                return err
        }
-       bufLen := 1 << 20
-       if int64(bufLen) > stat.Size() {
-               bufLen = int(stat.Size())
-               if bufLen < 1 {
-                       // len(buf)==0 would prevent us from handling
-                       // empty files the same way as non-empty
-                       // files, because reading 0 bytes at a time
-                       // never reaches EOF.
-                       bufLen = 1
-               }
-       }
-       cmp := expect
-       buf := make([]byte, bufLen)
        return v.getFunc(path, func(rdr io.Reader) error {
-               // Loop invariants: all data read so far matched what
-               // we expected, and the first N bytes of cmp are
-               // expected to equal the next N bytes read from
-               // reader.
-               for {
-                       n, err := rdr.Read(buf)
-                       if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
-                               return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], buf[:n], rdr)
-                       }
-                       cmp = cmp[n:]
-                       if err == io.EOF {
-                               if len(cmp) != 0 {
-                                       return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], nil, nil)
-                               }
-                               return nil
-                       } else if err != nil {
-                               return err
-                       }
-               }
+               return compareReaderWithBuf(rdr, expect, loc[:32])
        })
 }
 
@@ -406,6 +467,10 @@ func (v *UnixVolume) Writable() bool {
        return !v.readonly
 }
 
+// Replication always reports a replication level of 1 for a
+// UnixVolume.
+func (v *UnixVolume) Replication() int {
+       return 1
+}
+
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
 func lockfile(f *os.File) error {
        return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)