<notextile>
<pre><code><span class="userinput">sudo gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3
\curl -sSL https://get.rvm.io | sudo bash -s stable --ruby=2.1
-sudo adduser "$USER" rvm
</span></code></pre></notextile>
Either log out and log back in to activate RVM, or explicitly load it in all open shells with @source /usr/local/rvm/scripts/rvm@. If you prefer to build Ruby from source instead of using RVM, first install the required development packages:
<notextile>
<pre><code><span class="userinput">sudo apt-get install \
gawk g++ gcc make libc6-dev libreadline6-dev zlib1g-dev libssl-dev \
libyaml-dev libsqlite3-dev sqlite3 autoconf libgdbm-dev \
- libncurses5-dev automake libtool bison pkg-config libffi-dev
+ libncurses5-dev automake libtool bison pkg-config libffi-dev curl
</span></code></pre></notextile>
Build and install Ruby:
<notextile>
<pre><code><span class="userinput">mkdir -p ~/src
cd ~/src
-curl http://cache.ruby-lang.org/pub/ruby/2.1/ruby-2.1.6.tar.gz | tar xz
-cd ruby-2.1.6
+curl http://cache.ruby-lang.org/pub/ruby/2.1/ruby-2.1.7.tar.gz | tar xz
+cd ruby-2.1.7
./configure --disable-install-rdoc
make
sudo make install
</span></code></pre></notextile>
<notextile>
<ol>
-<li><a href="https://www.phusionpassenger.com/documentation/Users%20guide%20Nginx.html">Install Nginx and Phusion Passenger</a>.</li>
+<li><a href="https://www.phusionpassenger.com/library/walkthroughs/deploy/ruby/ownserver/nginx/oss/install_passenger_main.html">Install Nginx and Phusion Passenger</a>.</li>
<li><p>Puma is already included with the API server's gems. We recommend you run it as a service under <a href="http://smarden.org/runit/">runit</a> or a similar tool. Here's a sample runit script for that:</p>
---
layout: default
navsection: installguide
-title: Install Single Sign On (SSO) server
+title: Install the Single Sign On (SSO) server
...
-h2(#dependencies). Install dependencies
+h2(#dependencies). Install prerequisites
-h3(#install_git_curl). Install git and curl
-
-{% include 'install_git_curl' %}
+The Arvados package repository includes an SSO server package that can help automate much of the deployment.
h3(#install_ruby_and_bundler). Install Ruby and Bundler
{% include 'install_ruby_and_bundler' %}
-h3(#install_postgres). Install PostgreSQL
+h3(#install_web_server). Set up a Web server
+
+For best performance, we recommend you use Nginx as your Web server frontend with a Passenger backend to serve the SSO server. The Passenger team provides "Nginx + Passenger installation instructions":https://www.phusionpassenger.com/library/walkthroughs/deploy/ruby/ownserver/nginx/oss/install_passenger_main.html.
-{% include 'install_postgres' %}
+Follow the instructions until you see the section that says you are ready to deploy your Ruby application on the production server.
-h2(#install). Install SSO server
+h2(#install). Install the SSO server
-h3. Get SSO server code and run bundle
+On a Debian-based system, install the following package:
<notextile>
-<pre><code>~$ <span class="userinput">cd $HOME</span> # (or wherever you want to install)
-~$ <span class="userinput">git clone https://github.com/curoverse/sso-devise-omniauth-provider.git</span>
-~$ <span class="userinput">cd sso-devise-omniauth-provider</span>
-~/sso-devise-omniauth-provider$ <span class="userinput">bundle install --without=development</span>
-</code></pre></notextile>
+<pre><code>~$ <span class="userinput">sudo apt-get install arvados-sso-server</span>
+</code></pre>
+</notextile>
+
+On a Red Hat-based system, install the following package:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo yum install arvados-sso-server</span>
+</code></pre>
+</notextile>
-h2. Configure the SSO server
+h2(#configure). Configure the SSO server
-First, copy the example configuration file:
+The package has installed three configuration files in @/etc/arvados/sso@:
<notextile>
-<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">cp -i config/application.yml.example config/application.yml</span>
-</code></pre></notextile>
+<pre><code>/etc/arvados/sso/application.yml
+/etc/arvados/sso/database.yml
+/etc/arvados/sso/production.rb
+</code></pre>
+</notextile>
+
+The SSO server runs from the @/var/www/arvados-sso/current/@ directory. The files @/var/www/arvados-sso/current/config/application.yml@, @/var/www/arvados-sso/current/config/database.yml@ and @/var/www/arvados-sso/current/config/environments/production.rb@ are symlinked to the configuration files in @/etc/arvados/sso/@.
The SSO server reads the @config/application.yml@ file, as well as the @config/application.default.yml@ file. Values in @config/application.yml@ take precedence over the defaults defined in @config/application.default.yml@. The @config/application.yml.example@ file is not read by the SSO server and is provided for installation convenience only.
-Consult @config/application.default.yml@ for a full list of configuration options. Local configuration goes in @config/application.yml@, do not edit @config/application.default.yml@.
+Consult @config/application.default.yml@ for a full list of configuration options. Local configuration goes in @/etc/arvados/sso/application.yml@, do not edit @config/application.default.yml@.
h3(#uuid_prefix). uuid_prefix
Generate a uuid prefix for the single sign on service. This prefix is used to identify user records as originating from this site. It must be exactly 5 lowercase ASCII letters and/or digits. You may use the following snippet to generate a uuid prefix:
<notextile>
-<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">ruby -e 'puts "#{rand(2**64).to_s(36)[0,5]}"'</span>
+<pre><code>~$ <span class="userinput">ruby -e 'puts "#{rand(2**64).to_s(36)[0,5]}"'</span>
abcde
</code></pre></notextile>
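If you generate the prefix some other way, you can check it against the same constraint (exactly 5 lowercase ASCII letters and/or digits). The following Ruby sketch is illustrative only and not part of the Arvados tooling:

```ruby
# Generate a candidate prefix the same way as the snippet above.
prefix = rand(2**64).to_s(36)[0, 5]

# The SSO server requires exactly 5 lowercase ASCII letters and/or digits.
unless prefix =~ /\A[a-z0-9]{5}\z/
  abort "#{prefix.inspect} is not a valid uuid_prefix"
end
puts prefix
```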
-Edit @config/application.yml@ and set @uuid_prefix@ in the "common" section.
+Edit @/etc/arvados/sso/application.yml@ and set @uuid_prefix@ in the "common" section.
h3(#secret_token). secret_token
Generate a new secret token for signing cookies:
<notextile>
-<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">ruby -e 'puts rand(2**400).to_s(36)'</span>
+<pre><code>~$ <span class="userinput">ruby -e 'puts rand(2**400).to_s(36)'</span>
zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
</code></pre></notextile>
-Edit @config/application.yml@ and set @secret_token@ in the "common" section.
+Edit @/etc/arvados/sso/application.yml@ and set @secret_token@ in the "common" section.
+
+There are other configuration options in @/etc/arvados/sso/application.yml@. See the "Authentication methods":install-sso.html#authentication_methods section below for more details.
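Taken together, a minimal @/etc/arvados/sso/application.yml@ at this point might look like the following sketch. The values are placeholders; consult @config/application.default.yml@ for the authoritative key names:

```yaml
# /etc/arvados/sso/application.yml -- local overrides only.
# Values here take precedence over config/application.default.yml.
common:
  uuid_prefix: abcde       # the 5-character prefix generated above
  secret_token: zzzzzzzz   # the cookie-signing token generated above
```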
h2(#database). Set up the database
-Generate a new database password. Nobody ever needs to memorize it or type it, so make a strong one:
+If PostgreSQL was newly installed as a dependency of the @arvados-sso-server@ package, you will need to start the service.
+
+On a Debian-based system:
<notextile>
-<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">ruby -e 'puts rand(2**128).to_s(36)'</span>
+<pre><code>~$ <span class="userinput">sudo service postgresql start</span>
+</code></pre>
+</notextile>
+
+On a Red Hat-based system, we also need to initialize the database system:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo service postgresql initdb</span>
+~$ <span class="userinput">sudo service postgresql start</span>
+</code></pre>
+</notextile>
+
+{% include 'notebox_begin' %}
+
+If you are installing on CentOS6, you will need to modify PostgreSQL's configuration to allow password authentication for local users. The default configuration allows 'ident' only. The following commands will make the configuration change, and restart PostgreSQL for it to take effect.
+<br/>
+<notextile>
+<pre><code>~$ <span class="userinput">sudo sed -i -e "s/127.0.0.1\/32 ident/127.0.0.1\/32 md5/" /var/lib/pgsql/data/pg_hba.conf</span>
+~$ <span class="userinput">sudo sed -i -e "s/::1\/128 ident/::1\/128 md5/" /var/lib/pgsql/data/pg_hba.conf</span>
+~$ <span class="userinput">sudo service postgresql restart</span>
+</code></pre>
+</notextile>
+{% include 'notebox_end' %}
+
+
+Next, generate a new database password. Nobody ever needs to memorize it or type it, so make a strong one:
+
+<notextile>
+<pre><code>~$ <span class="userinput">ruby -e 'puts rand(2**128).to_s(36)'</span>
abcdefghijklmnopqrstuvwxyz012345689
</code></pre></notextile>
+Configure the SSO server to connect to your database by updating @/etc/arvados/sso/database.yml@. Replace the @xxxxxxxx@ database password placeholder with the new password you generated above. Be sure to update the @production@ section.
+
+<notextile>
+<pre><code>~$ <span class="userinput">editor /etc/arvados/sso/database.yml</span>
+</code></pre></notextile>
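The @production@ stanza will look something like the sketch below, following standard Rails conventions. The database and user names match the @createuser@/@createdb@ commands in this section; the password is the one you generated above:

```yaml
# /etc/arvados/sso/database.yml -- production section (sketch)
production:
  adapter: postgresql
  encoding: utf8
  database: arvados_sso_production
  username: arvados_sso
  password: paste-database-password-you-generated
  host: localhost
```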
+
Create a new database user with permission to create its own databases.
<notextile>
-<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">sudo -u postgres createuser --createdb --encrypted -R -S --pwprompt arvados_sso</span>
+<pre><code>~$ <span class="userinput">sudo -u postgres createuser --createdb --encrypted -R -S --pwprompt arvados_sso</span>
Enter password for new role: <span class="userinput">paste-database-password-you-generated</span>
Enter it again: <span class="userinput">paste-database-password-you-generated</span>
</code></pre></notextile>
-Configure SSO server to connect to your database by creating and updating @config/database.yml@. Replace the @xxxxxxxx@ database password placeholders with the new password you generated above. If you are planning a production system, update the @production@ section, otherwise use @development@.
+Rails will take care of creating the database, based on the information from @/etc/arvados/sso/database.yml@.
+
+Alternatively, if the database user you intend to use for the SSO server should not be allowed to create new databases, the user and the database can be created like this:
<notextile>
-<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">cp -i config/database.yml.example config/database.yml</span>
-~/sso-devise-omniauth-provider$ <span class="userinput">edit config/database.yml</span>
+<pre><code>~$ <span class="userinput">sudo -u postgres createuser --encrypted -R -S --pwprompt arvados_sso</span>
+Enter password for new role: <span class="userinput">paste-database-password-you-generated</span>
+Enter it again: <span class="userinput">paste-database-password-you-generated</span>
+~$ <span class="userinput">sudo -u postgres createdb arvados_sso_production -E UTF8 -O arvados_sso -T template0</span>
</code></pre></notextile>
-Create and initialize the database. If you are planning a production system, choose the @production@ rails environment, otherwise use @development@.
+h2(#reconfigure_package). Reconfigure the package
+
+Now that the @/etc/arvados/sso/application.yml@ and @/etc/arvados/sso/database.yml@ files have been updated, we need to reconfigure our installed package. Doing so will create and/or initialize the database and precompile the assets.
+
+On a Debian-based system:
<notextile>
-<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">RAILS_ENV=production bundle exec rake db:setup</span>
-</code></pre></notextile>
+<pre><code>~$ <span class="userinput">sudo dpkg-reconfigure arvados-sso-server</span>
+</code></pre>
+</notextile>
-Alternatively, if the database user you intend to use for the SSO server is not allowed to create new databases, you can create the database first and then populate it with rake. Be sure to adjust the database name if you are using the @development@ environment. This sequence of commands is functionally equivalent to the rake db:setup command above:
+On a Red Hat-based system, we need to reinstall the package instead:
<notextile>
-<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">sudo -u postgres createdb arvados_sso_production -E UTF8 -O arvados_sso -T template0</span>
-~/sso-devise-omniauth-provider$ <span class="userinput">RAILS_ENV=production bundle exec rake db:schema:load</span>
-~/sso-devise-omniauth-provider$ <span class="userinput">RAILS_ENV=production bundle exec rake db:seed</span>
-</code></pre></notextile>
+<pre><code>~$ <span class="userinput">sudo yum reinstall arvados-sso-server</span>
+</code></pre>
+</notextile>
h2(#client). Create arvados-server client
Use @rails console@ to create a @Client@ record that will be used by the Arvados API server. The values of @app_id@ and @app_secret@ correspond to the values for @sso_app_id@ and @sso_app_secret@ in the "API server's SSO settings.":install-api-server.html#omniauth
<notextile>
-<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">ruby -e 'puts rand(2**400).to_s(36)'</span>
+<pre><code>~$ <span class="userinput">ruby -e 'puts rand(2**400).to_s(36)'</span>
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
-~/sso-devise-omniauth-provider$ <span class="userinput">RAILS_ENV=production bundle exec rails console</span>
+~$ <span class="userinput">RAILS_ENV=production bundle exec rails console</span>
:001 > <span class="userinput">c = Client.new</span>
:002 > <span class="userinput">c.name = "joshid"</span>
:003 > <span class="userinput">c.app_id = "arvados-server"</span>
:004 > <span class="userinput">c.app_secret = "paste-the-string-you-generated-above"</span>
:005 > <span class="userinput">c.save!</span>
</code></pre>
</notextile>
-h2(#assets). Precompile assets
+h2(#configure_web_server). Configure your web server
-If you are running in the production environment, you must precompile the assets:
+Edit the http section of your Nginx configuration to run the Passenger server and act as a frontend for it. You might add a block like the following, adding SSL and logging parameters to taste:
<notextile>
-<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">RAILS_ENV=production bundle exec rake assets:precompile</span>
+<pre><code>server {
+ listen 127.0.0.1:8900;
+ server_name localhost-sso;
+
+ root /var/www/arvados-sso/current/public;
+ index index.html;
+
+ passenger_enabled on;
+ # If you're not using RVM, comment out the line below.
+ passenger_ruby /usr/local/rvm/wrappers/default/ruby;
+}
+
+upstream sso {
+ server 127.0.0.1:8900 fail_timeout=10s;
+}
+
+proxy_http_version 1.1;
+
+server {
+ listen <span class="userinput">[your public IP address]</span>:443 ssl;
+ server_name auth.<span class="userinput">your.domain</span>;
+
+ ssl on;
+ ssl_certificate <span class="userinput">/YOUR/PATH/TO/cert.pem</span>;
+ ssl_certificate_key <span class="userinput">/YOUR/PATH/TO/cert.key</span>;
+
+ index index.html;
+
+ location / {
+ proxy_pass http://sso;
+ proxy_redirect off;
+ proxy_connect_timeout 90s;
+ proxy_read_timeout 300s;
+
+ proxy_set_header X-Forwarded-Proto https;
+ proxy_set_header Host $http_host;
+ proxy_set_header X-Real-IP $remote_addr;
+ proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+ }
+}
</code></pre>
</notextile>
+Finally, restart Nginx and your Arvados SSO server should be up and running. You can verify this by visiting the address Nginx listens on in the @server@ section above (port 443). Read on if you want to configure your Arvados SSO server to use a different authentication backend.
+
h2(#authentication_methods). Authentication methods
Authentication methods are configured in @application.yml@. Three authentication methods are currently supported: local accounts, LDAP, and Google+. If neither Google+ nor LDAP is enabled, the SSO server defaults to local user accounts. Only one authentication mechanism should be in use at a time.
If @allow_account_registration@ is false, you may manually create local accounts on the SSO server from the rails console:
<notextile>
-<pre><code>~/sso-devise-omniauth-provider$ <span class="userinput">RAILS_ENV=production bundle exec rails console</span>
+<pre><code>~$ <span class="userinput">RAILS_ENV=production bundle exec rails console</span>
:001 > <span class="userinput">user = User.new(:email => "test@example.com")</span>
:002 > <span class="userinput">user.password = "passw0rd"</span>
:003 > <span class="userinput">user.save!</span>
google_oauth2_client_id: <span class="userinput">"---YOUR---CLIENT---ID---HERE--"</span>
google_oauth2_client_secret: <span class="userinput">"---YOUR---CLIENT---SECRET---HERE--"</span></code></pre></notextile>
-h2(#start). Set up a Web server
-
-For best performance, we recommend you use Nginx as your Web server front-end, with a Passenger backend to serve the SSO server. To do that:
-
-<notextile>
-<ol>
-<li><a href="https://www.phusionpassenger.com/documentation/Users%20guide%20Nginx.html">Install Nginx and Phusion Passenger</a>.</li>
-
-<li><p>Edit the http section of your Nginx configuration to run the Passenger server, and act as a front-end for it. You might add a block like the following, adding SSL and logging parameters to taste:</p>
-
-<pre><code>server {
- listen 127.0.0.1:8900;
- server_name localhost-sso;
-
- root <span class="userinput">/YOUR/PATH/TO/sso-devise-omniauth-provider/public</span>;
- index index.html index.htm index.php;
-
- passenger_enabled on;
- # If you're using RVM, uncomment the line below.
- #passenger_ruby /usr/local/rvm/wrappers/default/ruby;
-}
-
-upstream sso {
- server 127.0.0.1:8900 fail_timeout=10s;
-}
-
-proxy_http_version 1.1;
-
-server {
- listen <span class="userinput">[your public IP address]</span>:443 ssl;
- server_name auth.<span class="userinput">your.domain</span>;
-
- ssl on;
- ssl_certificate <span class="userinput">/YOUR/PATH/TO/cert.pem</span>;
- ssl_certificate_key <span class="userinput">/YOUR/PATH/TO/cert.key</span>;
-
- index index.html index.htm index.php;
-
- location / {
- proxy_pass http://sso;
- proxy_redirect off;
- proxy_connect_timeout 90s;
- proxy_read_timeout 300s;
-
- proxy_set_header X-Forwarded-Proto https;
- proxy_set_header Host $http_host;
- proxy_set_header X-Real-IP $remote_addr;
- proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
- }
-}
-</code></pre>
-</li>
-
-<li>Restart Nginx.</li>
-
-</ol>
-</notextile>
-{% include 'notebox_begin' %}
-
-If you see the following warning "you may safely ignore it":https://stackoverflow.com/questions/10374871/no-secret-option-provided-to-racksessioncookie-warning:
-
-<pre>
-SECURITY WARNING: No secret option provided to Rack::Session::Cookie.
-This poses a security threat. It is strongly recommended that you
-provide a secret to prevent exploits that may be possible from crafted
-cookies. This will not be supported in future versions of Rack, and
-future versions will even invalidate your existing user cookies.
-
-Called from: /var/lib/gems/2.1.0/gems/actionpack-3.2.8/lib/action_dispatch/middleware/session/abstract_store.rb:28:in `initialize'.
-</pre>
-
-{% include 'notebox_end' %}
h2. Install prerequisites
-The Arvados package repository includes Workbench server package that can help automate much of the deployment.
+The Arvados package repository includes a Workbench server package that can help automate much of the deployment.
h3(#install_ruby_and_bundler). Install Ruby and Bundler
<notextile>
<ol>
-<li><a href="https://www.phusionpassenger.com/documentation/Users%20guide%20Nginx.html">Install Nginx and Phusion Passenger</a>.</li>
+<li><a href="https://www.phusionpassenger.com/library/walkthroughs/deploy/ruby/ownserver/nginx/oss/install_passenger_main.html">Install Nginx and Phusion Passenger</a>.</li>
<li>If you're deploying on CentOS and using the python27 Software Collection, configure Nginx to use it:
Usage: arv keep [method] [--parameters]
Use 'arv keep [method] --help' to get more information about specific methods.
-Available methods: ls, get, put, less, check, docker
+Available methods: ls, get, put, docker
</pre>
</notextile>
if ['get', 'put', 'ls', 'normalize'].index @sub then
# Native Arvados
exec `which arv-#{@sub}`.strip, *remaining_opts
- elsif ['less', 'check'].index @sub then
- # wh* shims
- exec `which wh#{@sub}`.strip, *remaining_opts
elsif @sub == 'docker'
exec `which arv-keepdocker`.strip, *remaining_opts
else
puts "Usage: arv keep [method] [--parameters]\n"
puts "Use 'arv keep [method] --help' to get more information about specific methods.\n\n"
- puts "Available methods: ls, get, put, less, check, docker"
+ puts "Available methods: ls, get, put, docker"
end
abort
when 'pipeline'
my $gotsome
= readfrompipes ()
+ reapchildren ();
- if (!$gotsome)
+ if (!$gotsome || ($latest_refresh + 2 < scalar time))
{
check_refresh_wanted();
check_squeue();
package keepclient
import (
+ "bytes"
"crypto/md5"
"crypto/tls"
"errors"
var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
var InvalidLocatorError = errors.New("Invalid locator")
+// ErrNoSuchKeepServer is returned when GetIndex is invoked with a UUID with no matching keep server
+var ErrNoSuchKeepServer = errors.New("No keep server matching the given UUID is found")
+
+// ErrIncompleteIndex is returned when the Index response does not end with a new empty line
+var ErrIncompleteIndex = errors.New("Got incomplete index")
+
const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
return 0, "", BlockNotFound
}
+// GetIndex retrieves a list of blocks stored on the given server whose hashes
+// begin with the given prefix. The returned reader will return an error (other
+// than EOF) if the complete index cannot be retrieved.
+//
+// This is meant to be used only by system components and admin tools.
+// It will return an error unless the client is using a "data manager token"
+// recognized by the Keep services.
+func (kc *KeepClient) GetIndex(keepServiceUUID, prefix string) (io.Reader, error) {
+ url := kc.LocalRoots()[keepServiceUUID]
+ if url == "" {
+ return nil, ErrNoSuchKeepServer
+ }
+
+ url += "/index"
+ if prefix != "" {
+ url += "/" + prefix
+ }
+
+ req, err := http.NewRequest("GET", url, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
+ resp, err := kc.Client.Do(req)
+ if err != nil {
+ return nil, err
+ }
+
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ return nil, fmt.Errorf("Got http status code: %d", resp.StatusCode)
+ }
+
+ var respBody []byte
+ respBody, err = ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return nil, err
+ }
+
+ // Got index; verify that it is complete
+ // The response should be "\n" if no locators matched the prefix
+ // Else, it should be a list of locators followed by a blank line
+ if !bytes.Equal(respBody, []byte("\n")) && !bytes.HasSuffix(respBody, []byte("\n\n")) {
+ return nil, ErrIncompleteIndex
+ }
+
+ // Got complete index; strip the trailing newline and send
+ return bytes.NewReader(respBody[0 : len(respBody)-1]), nil
+}
+
// LocalRoots() returns the map of local (i.e., disk and proxy) Keep
// services: uuid -> baseURI.
func (kc *KeepClient) LocalRoots() map[string]string {
c.Check(err, Equals, InsufficientReplicasError)
c.Check(replicas, Equals, 0)
}
+
+type StubGetIndexHandler struct {
+ c *C
+ expectPath string
+ expectAPIToken string
+ httpStatus int
+ body []byte
+}
+
+func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ h.c.Check(req.URL.Path, Equals, h.expectPath)
+ h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectAPIToken))
+	resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
+	resp.WriteHeader(h.httpStatus)
+ resp.Write(h.body)
+}
+
+func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
+ hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+ st := StubGetIndexHandler{
+ c,
+ "/index",
+ "abc123",
+ http.StatusOK,
+ []byte(hash + "+3 1443559274\n\n")}
+
+ ks := RunFakeKeepServer(st)
+ defer ks.listener.Close()
+
+ arv, err := arvadosclient.MakeArvadosClient()
+ kc, _ := MakeKeepClient(&arv)
+ arv.ApiToken = "abc123"
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+
+ r, err := kc.GetIndex("x", "")
+ c.Check(err, Equals, nil)
+
+ content, err2 := ioutil.ReadAll(r)
+ c.Check(err2, Equals, nil)
+ c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
+}
+
+func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
+ hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+ st := StubGetIndexHandler{
+ c,
+ "/index/" + hash[0:3],
+ "abc123",
+ http.StatusOK,
+ []byte(hash + "+3 1443559274\n\n")}
+
+ ks := RunFakeKeepServer(st)
+ defer ks.listener.Close()
+
+ arv, err := arvadosclient.MakeArvadosClient()
+ kc, _ := MakeKeepClient(&arv)
+ arv.ApiToken = "abc123"
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+
+ r, err := kc.GetIndex("x", hash[0:3])
+ c.Check(err, Equals, nil)
+
+ content, err2 := ioutil.ReadAll(r)
+ c.Check(err2, Equals, nil)
+ c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
+}
+
+func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
+ hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+ st := StubGetIndexHandler{
+ c,
+ "/index/" + hash[0:3],
+ "abc123",
+ http.StatusOK,
+ []byte(hash)}
+
+ ks := RunFakeKeepServer(st)
+ defer ks.listener.Close()
+
+ arv, err := arvadosclient.MakeArvadosClient()
+ kc, _ := MakeKeepClient(&arv)
+ arv.ApiToken = "abc123"
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+
+ _, err = kc.GetIndex("x", hash[0:3])
+ c.Check(err, Equals, ErrIncompleteIndex)
+}
+
+func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
+ hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+ st := StubGetIndexHandler{
+ c,
+ "/index/" + hash[0:3],
+ "abc123",
+ http.StatusOK,
+ []byte(hash)}
+
+ ks := RunFakeKeepServer(st)
+ defer ks.listener.Close()
+
+ arv, err := arvadosclient.MakeArvadosClient()
+ kc, _ := MakeKeepClient(&arv)
+ arv.ApiToken = "abc123"
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+
+ _, err = kc.GetIndex("y", hash[0:3])
+ c.Check(err, Equals, ErrNoSuchKeepServer)
+}
+
+func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
+ st := StubGetIndexHandler{
+ c,
+ "/index/abcd",
+ "abc123",
+ http.StatusOK,
+ []byte("\n")}
+
+ ks := RunFakeKeepServer(st)
+ defer ks.listener.Close()
+
+ arv, err := arvadosclient.MakeArvadosClient()
+ kc, _ := MakeKeepClient(&arv)
+ arv.ApiToken = "abc123"
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+
+ r, err := kc.GetIndex("x", "abcd")
+ c.Check(err, Equals, nil)
+
+ content, err2 := ioutil.ReadAll(r)
+ c.Check(err2, Equals, nil)
+ c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
+}
require 'trollop'
require './lib/salvage_collection'
+include SalvageCollection
opts = Trollop::options do
banner ''
*ApiTokenCache
}
+type IndexHandler struct {
+ *keepclient.KeepClient
+ *ApiTokenCache
+}
+
type InvalidPathHandler struct{}
type OptionsHandler struct{}
rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`,
GetBlockHandler{kc, t}).Methods("GET", "HEAD")
rest.Handle(`/{locator:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
+
+ // List all blocks
+ rest.Handle(`/index`, IndexHandler{kc, t}).Methods("GET")
+
+ // List blocks whose hash has the given prefix
+ rest.Handle(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler{kc, t}).Methods("GET")
}
if enable_put {
status = http.StatusBadGateway
}
}
+
+// ServeHTTP implementation for IndexHandler
+// Supports only GET requests for /index/{prefix:[0-9a-f]{0,32}}
+// For each keep server found in LocalRoots:
+// Invokes GetIndex using keepclient
+// Expects "complete" response (terminating with blank new line)
+// Aborts on any errors
+// Concatenates responses from all those keep servers and returns
+func (handler IndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ SetCorsHeaders(resp)
+
+ prefix := mux.Vars(req)["prefix"]
+ var err error
+ var status int
+
+ defer func() {
+ if status != http.StatusOK {
+ http.Error(resp, err.Error(), status)
+ }
+ }()
+
+ kc := *handler.KeepClient
+
+ ok, token := CheckAuthorizationHeader(kc, handler.ApiTokenCache, req)
+ if !ok {
+ status, err = http.StatusForbidden, BadAuthorizationHeader
+ return
+ }
+
+ // Copy ArvadosClient struct and use the client's API token
+ arvclient := *kc.Arvados
+ arvclient.ApiToken = token
+ kc.Arvados = &arvclient
+
+ // Only GET method is supported
+ if req.Method != "GET" {
+ status, err = http.StatusNotImplemented, MethodNotSupported
+ return
+ }
+
+ // Get index from all LocalRoots and write to resp
+ var reader io.Reader
+ for uuid := range kc.LocalRoots() {
+ reader, err = kc.GetIndex(uuid, prefix)
+ if err != nil {
+ status = http.StatusBadGateway
+ return
+ }
+
+ _, err = io.Copy(resp, reader)
+ if err != nil {
+ status = http.StatusBadGateway
+ return
+ }
+ }
+
+ // Got index from all the keep servers and wrote to resp
+ status = http.StatusOK
+ resp.Write([]byte("\n"))
+}
"http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+K@zzzzz-zzzzz-zzzzzzzzzzzzzzz+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73")
}
+
+// Test GetIndex
+// Put one block, with 2 replicas
+// With no prefix (expect the block locator, twice)
+// With an existing prefix (expect the block locator, twice)
+// With a valid but non-existing prefix (expect "\n")
+// With an invalid prefix (expect error)
+func (s *ServerRequiredSuite) TestGetIndex(c *C) {
+ kc := runProxy(c, []string{"keepproxy"}, 28852, false)
+ waitForListener()
+ defer closeListener()
+
+ // Put "index-data" blocks
+ data := []byte("index-data")
+ hash := fmt.Sprintf("%x", md5.Sum(data))
+
+ hash2, rep, err := kc.PutB(data)
+ c.Check(hash2, Matches, fmt.Sprintf(`^%s\+10(\+.+)?$`, hash))
+ c.Check(rep, Equals, 2)
+ c.Check(err, Equals, nil)
+
+ reader, blocklen, _, err := kc.Get(hash)
+ c.Assert(err, Equals, nil)
+ c.Check(blocklen, Equals, int64(10))
+ all, err := ioutil.ReadAll(reader)
+ c.Check(all, DeepEquals, data)
+
+ // Put some more blocks
+ _, rep, err = kc.PutB([]byte("some-more-index-data"))
+ c.Check(err, Equals, nil)
+
+ // Invoke GetIndex
+ for _, spec := range []struct {
+ prefix string
+ expectTestHash bool
+ expectOther bool
+ }{
+ {"", true, true}, // with no prefix
+ {hash[:3], true, false}, // with matching prefix
+ {"abcdef", false, false}, // with no such prefix
+ } {
+ indexReader, err := kc.GetIndex("proxy", spec.prefix)
+ c.Assert(err, Equals, nil)
+ indexResp, err := ioutil.ReadAll(indexReader)
+ c.Assert(err, Equals, nil)
+ locators := strings.Split(string(indexResp), "\n")
+ gotTestHash := 0
+ gotOther := 0
+ for _, locator := range locators {
+ if locator == "" {
+ continue
+ }
+ c.Check(locator[:len(spec.prefix)], Equals, spec.prefix)
+ if locator[:32] == hash {
+ gotTestHash++
+ } else {
+ gotOther++
+ }
+ }
+ c.Check(gotTestHash == 2, Equals, spec.expectTestHash)
+ c.Check(gotOther > 0, Equals, spec.expectOther)
+ }
+
+ // GetIndex with invalid prefix
+ _, err = kc.GetIndex("proxy", "xyz")
+ c.Assert((err != nil), Equals, true)
+}
--- /dev/null
+user="root"
+group="root"
+chroot="/"
+chdir="/"
+nice=""
+args="-listen=':9100'"
+
--- /dev/null
+#!/bin/sh
+# Init script for keepproxy
+# Maintained by
+# Generated by pleaserun.
+# Implemented based on LSB Core 3.1:
+# * Sections: 20.2, 20.3
+#
+### BEGIN INIT INFO
+# Provides: keepproxy
+# Required-Start: $remote_fs $syslog
+# Required-Stop: $remote_fs $syslog
+# Default-Start: 2 3 4 5
+# Default-Stop: 0 1 6
+# Short-Description:
+# Description: no description given
+### END INIT INFO
+
+PATH=/sbin:/usr/sbin:/bin:/usr/bin
+export PATH
+
+name=keepproxy
+program=/usr/bin/keepproxy
+args=''
+pidfile="/var/run/$name.pid"
+
+[ -r /etc/default/$name ] && . /etc/default/$name
+[ -r /etc/sysconfig/$name ] && . /etc/sysconfig/$name
+
+trace() {
+ logger -t "/etc/init.d/keepproxy" "$@"
+}
+
+emit() {
+ trace "$@"
+ echo "$@"
+}
+
+start() {
+
+ # Ensure the log directory is setup correctly.
+ [ ! -d "/var/log/" ] && mkdir "/var/log/"
+ chown "$user":"$group" "/var/log/"
+ chmod 755 "/var/log/"
+
+
+ # Setup any environmental stuff beforehand
+
+
+ # Run the program!
+
+ chroot --userspec "$user":"$group" "$chroot" sh -c "
+
+ cd \"$chdir\"
+ exec \"$program\" $args
+ " >> /var/log/keepproxy.stdout 2>> /var/log/keepproxy.stderr &
+
+ # Generate the pidfile from here. If we instead made the forked process
+ # generate it there will be a race condition between the pidfile writing
+ # and a process possibly asking for status.
+ echo $! > $pidfile
+
+ emit "$name started"
+ return 0
+}
+
+stop() {
+  # Try a few times to stop the program with SIGTERM
+ if status ; then
+ pid=$(cat "$pidfile")
+ trace "Killing $name (pid $pid) with SIGTERM"
+ kill -TERM $pid
+ # Wait for it to exit.
+ for i in 1 2 3 4 5 ; do
+      trace "Waiting for $name (pid $pid) to die..."
+ status || break
+ sleep 1
+ done
+ if status ; then
+ emit "$name stop failed; still running."
+ else
+ emit "$name stopped."
+ fi
+ fi
+}
+
+status() {
+ if [ -f "$pidfile" ] ; then
+ pid=$(cat "$pidfile")
+ if ps -p $pid > /dev/null 2> /dev/null ; then
+ # process by this pid is running.
+ # It may not be our pid, but that's what you get with just pidfiles.
+ # TODO(sissel): Check if this process seems to be the same as the one we
+ # expect. It'd be nice to use flock here, but flock uses fork, not exec,
+ # so it makes it quite awkward to use in this case.
+ return 0
+ else
+ return 2 # program is dead but pid file exists
+ fi
+ else
+ return 3 # program is not running
+ fi
+}
+
+force_stop() {
+ if status ; then
+ stop
+ status && kill -KILL $(cat "$pidfile")
+ fi
+}
+
+
+case "$1" in
+ force-start|start|stop|force-stop|restart)
+ trace "Attempting '$1' on keepproxy"
+ ;;
+esac
+
+case "$1" in
+ force-start)
+ PRESTART=no
+ exec "$0" start
+ ;;
+ start)
+ status
+ code=$?
+ if [ $code -eq 0 ]; then
+ emit "$name is already running"
+ exit $code
+ else
+ start
+ exit $?
+ fi
+ ;;
+ stop) stop ;;
+ force-stop) force_stop ;;
+ status)
+ status
+ code=$?
+ if [ $code -eq 0 ] ; then
+ emit "$name is running"
+ else
+ emit "$name is not running"
+ fi
+ exit $code
+ ;;
+ restart)
+
+ stop && start
+ ;;
+ *)
+    echo "Usage: $SCRIPTNAME {start|force-start|stop|force-stop|status|restart}" >&2
+ exit 3
+ ;;
+esac
+
+exit $?
--- /dev/null
+package main
+
+import (
+ "bytes"
+ "errors"
+ "flag"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "log"
+ "os"
+ "strings"
+ "time"
+
+ "github.com/curoverse/azure-sdk-for-go/storage"
+)
+
+var (
+ azureStorageAccountName string
+ azureStorageAccountKeyFile string
+ azureStorageReplication int
+)
+
+func readKeyFromFile(file string) (string, error) {
+ buf, err := ioutil.ReadFile(file)
+ if err != nil {
+ return "", errors.New("reading key from " + file + ": " + err.Error())
+ }
+ accountKey := strings.TrimSpace(string(buf))
+ if accountKey == "" {
+ return "", errors.New("empty account key in " + file)
+ }
+ return accountKey, nil
+}
+
+type azureVolumeAdder struct {
+ *volumeSet
+}
+
+func (s *azureVolumeAdder) Set(containerName string) error {
+ if containerName == "" {
+ return errors.New("no container name given")
+ }
+ if azureStorageAccountName == "" || azureStorageAccountKeyFile == "" {
+		return errors.New("-azure-storage-account-name and -azure-storage-account-key-file arguments must be given before -azure-storage-container-volume")
+ }
+ accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
+ if err != nil {
+ return err
+ }
+ azClient, err := storage.NewBasicClient(azureStorageAccountName, accountKey)
+ if err != nil {
+ return errors.New("creating Azure storage client: " + err.Error())
+ }
+ if flagSerializeIO {
+ log.Print("Notice: -serialize is not supported by azure-blob-container volumes.")
+ }
+ v := NewAzureBlobVolume(azClient, containerName, flagReadonly, azureStorageReplication)
+ if err := v.Check(); err != nil {
+ return err
+ }
+ *s.volumeSet = append(*s.volumeSet, v)
+ return nil
+}
+
+func init() {
+ flag.Var(&azureVolumeAdder{&volumes},
+ "azure-storage-container-volume",
+ "Use the given container as a storage volume. Can be given multiple times.")
+ flag.StringVar(
+ &azureStorageAccountName,
+ "azure-storage-account-name",
+ "",
+ "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
+ flag.StringVar(
+ &azureStorageAccountKeyFile,
+ "azure-storage-account-key-file",
+ "",
+ "File containing the account key used for subsequent --azure-storage-container-volume arguments.")
+ flag.IntVar(
+ &azureStorageReplication,
+ "azure-storage-replication",
+ 3,
+ "Replication level to report to clients when data is stored in an Azure container.")
+}
+
+// An AzureBlobVolume stores and retrieves blocks in an Azure Blob
+// container.
+type AzureBlobVolume struct {
+ azClient storage.Client
+ bsClient storage.BlobStorageClient
+ containerName string
+ readonly bool
+ replication int
+}
+
+// NewAzureBlobVolume returns a new AzureBlobVolume that uses the
+// given client to store and retrieve blocks in the named container.
+func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume {
+ return &AzureBlobVolume{
+ azClient: client,
+ bsClient: client.GetBlobService(),
+ containerName: containerName,
+ readonly: readonly,
+ replication: replication,
+ }
+}
+
+// Check returns nil if the volume is usable.
+func (v *AzureBlobVolume) Check() error {
+ ok, err := v.bsClient.ContainerExists(v.containerName)
+ if err != nil {
+ return err
+ }
+ if !ok {
+ return errors.New("container does not exist")
+ }
+ return nil
+}
+
+// Get reads a Keep block that has been stored as a block blob in the
+// container.
+func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
+ rdr, err := v.bsClient.GetBlob(v.containerName, loc)
+ if err != nil {
+ if strings.Contains(err.Error(), "404 Not Found") {
+ // "storage: service returned without a response body (404 Not Found)"
+ return nil, os.ErrNotExist
+ }
+ return nil, err
+ }
+ defer rdr.Close()
+ buf := bufs.Get(BlockSize)
+ n, err := io.ReadFull(rdr, buf)
+ switch err {
+	case nil, io.EOF, io.ErrUnexpectedEOF:
+ return buf[:n], nil
+ default:
+ bufs.Put(buf)
+ return nil, err
+ }
+}
+
+func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
+ rdr, err := v.bsClient.GetBlob(v.containerName, loc)
+ if err != nil {
+ return err
+ }
+ defer rdr.Close()
+ return compareReaderWithBuf(rdr, expect, loc[:32])
+}
+
+func (v *AzureBlobVolume) Put(loc string, block []byte) error {
+ if v.readonly {
+ return MethodDisabledError
+ }
+ return v.bsClient.CreateBlockBlobFromReader(v.containerName, loc, uint64(len(block)), bytes.NewReader(block))
+}
+
+func (v *AzureBlobVolume) Touch(loc string) error {
+ if v.readonly {
+ return MethodDisabledError
+ }
+ return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
+		"touch": fmt.Sprintf("%d", time.Now().Unix()),
+ })
+}
+
+func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
+ props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+ if err != nil {
+ return time.Time{}, err
+ }
+ return time.Parse(time.RFC1123, props.LastModified)
+}
+
+func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
+ params := storage.ListBlobsParameters{
+ Prefix: prefix,
+ }
+ for {
+ resp, err := v.bsClient.ListBlobs(v.containerName, params)
+ if err != nil {
+ return err
+ }
+ for _, b := range resp.Blobs {
+ t, err := time.Parse(time.RFC1123, b.Properties.LastModified)
+ if err != nil {
+ return err
+ }
+ fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.Unix())
+ }
+ if resp.NextMarker == "" {
+ return nil
+ }
+ params.Marker = resp.NextMarker
+ }
+}
+
+func (v *AzureBlobVolume) Delete(loc string) error {
+ if v.readonly {
+ return MethodDisabledError
+ }
+ // Ideally we would use If-Unmodified-Since, but that
+ // particular condition seems to be ignored by Azure. Instead,
+ // we get the Etag before checking Mtime, and use If-Match to
+ // ensure we don't delete data if Put() or Touch() happens
+ // between our calls to Mtime() and DeleteBlob().
+ props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+ if err != nil {
+ return err
+ }
+ if t, err := v.Mtime(loc); err != nil {
+ return err
+ } else if time.Since(t) < blobSignatureTTL {
+ return nil
+ }
+ return v.bsClient.DeleteBlob(v.containerName, loc, map[string]string{
+ "If-Match": props.Etag,
+ })
+}
+
+func (v *AzureBlobVolume) Status() *VolumeStatus {
+ return &VolumeStatus{
+ DeviceNum: 1,
+ BytesFree: BlockSize * 1000,
+ BytesUsed: 1,
+ }
+}
+
+func (v *AzureBlobVolume) String() string {
+ return fmt.Sprintf("azure-storage-container:%+q", v.containerName)
+}
+
+func (v *AzureBlobVolume) Writable() bool {
+ return !v.readonly
+}
+
+func (v *AzureBlobVolume) Replication() int {
+ return v.replication
+}
--- /dev/null
+package main
+
+import (
+ "encoding/base64"
+ "encoding/xml"
+ "flag"
+ "fmt"
+ "io/ioutil"
+ "log"
+ "math/rand"
+ "net"
+ "net/http"
+ "net/http/httptest"
+ "regexp"
+ "sort"
+ "strconv"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/curoverse/azure-sdk-for-go/storage"
+)
+
+const (
+ // The same fake credentials used by Microsoft's Azure emulator
+ emulatorAccountName = "devstoreaccount1"
+ emulatorAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
+)
+
+var azureTestContainer string
+
+func init() {
+ flag.StringVar(
+ &azureTestContainer,
+ "test.azure-storage-container-volume",
+ "",
+ "Name of Azure container to use for testing. Do not use a container with real data! Use -azure-storage-account-name and -azure-storage-key-file arguments to supply credentials.")
+}
+
+type azBlob struct {
+ Data []byte
+ Etag string
+ Metadata map[string]string
+ Mtime time.Time
+ Uncommitted map[string][]byte
+}
+
+type azStubHandler struct {
+ sync.Mutex
+ blobs map[string]*azBlob
+}
+
+func newAzStubHandler() *azStubHandler {
+ return &azStubHandler{
+ blobs: make(map[string]*azBlob),
+ }
+}
+
+func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
+	h.Lock()
+	defer h.Unlock()
+	blob, ok := h.blobs[container+"|"+hash]
+	if !ok {
+		return
+	}
+	blob.Mtime = t
+}
+
+func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
+ h.Lock()
+ defer h.Unlock()
+ h.blobs[container+"|"+hash] = &azBlob{
+ Data: data,
+ Mtime: time.Now(),
+ Uncommitted: make(map[string][]byte),
+ }
+}
+
+func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
+ h.Lock()
+ defer h.Unlock()
+ // defer log.Printf("azStubHandler: %+v", r)
+
+ path := strings.Split(r.URL.Path, "/")
+ container := path[1]
+ hash := ""
+ if len(path) > 2 {
+ hash = path[2]
+ }
+
+ if err := r.ParseForm(); err != nil {
+ log.Printf("azStubHandler(%+v): %s", r, err)
+ rw.WriteHeader(http.StatusBadRequest)
+ return
+ }
+
+ body, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ return
+ }
+
+ type blockListRequestBody struct {
+ XMLName xml.Name `xml:"BlockList"`
+ Uncommitted []string
+ }
+
+ blob, blobExists := h.blobs[container+"|"+hash]
+
+ switch {
+ case r.Method == "PUT" && r.Form.Get("comp") == "":
+ // "Put Blob" API
+ h.blobs[container+"|"+hash] = &azBlob{
+ Data: body,
+ Mtime: time.Now(),
+ Uncommitted: make(map[string][]byte),
+ Etag: makeEtag(),
+ }
+ rw.WriteHeader(http.StatusCreated)
+ case r.Method == "PUT" && r.Form.Get("comp") == "block":
+ // "Put Block" API
+ if !blobExists {
+ log.Printf("Got block for nonexistent blob: %+v", r)
+ rw.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ blockID, err := base64.StdEncoding.DecodeString(r.Form.Get("blockid"))
+ if err != nil || len(blockID) == 0 {
+ log.Printf("Invalid blockid: %+q", r.Form.Get("blockid"))
+ rw.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ blob.Uncommitted[string(blockID)] = body
+ rw.WriteHeader(http.StatusCreated)
+ case r.Method == "PUT" && r.Form.Get("comp") == "blocklist":
+ // "Put Block List" API
+ bl := &blockListRequestBody{}
+ if err := xml.Unmarshal(body, bl); err != nil {
+ log.Printf("xml Unmarshal: %s", err)
+ rw.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ for _, encBlockID := range bl.Uncommitted {
+ blockID, err := base64.StdEncoding.DecodeString(encBlockID)
+ if err != nil || len(blockID) == 0 || blob.Uncommitted[string(blockID)] == nil {
+ log.Printf("Invalid blockid: %+q", encBlockID)
+ rw.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ blob.Data = blob.Uncommitted[string(blockID)]
+ blob.Etag = makeEtag()
+ blob.Mtime = time.Now()
+ delete(blob.Uncommitted, string(blockID))
+ }
+ rw.WriteHeader(http.StatusCreated)
+ case r.Method == "PUT" && r.Form.Get("comp") == "metadata":
+ // "Set Metadata Headers" API. We don't bother
+ // stubbing "Get Metadata Headers": AzureBlobVolume
+ // sets metadata headers only as a way to bump Etag
+ // and Last-Modified.
+ if !blobExists {
+ log.Printf("Got metadata for nonexistent blob: %+v", r)
+ rw.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ blob.Metadata = make(map[string]string)
+ for k, v := range r.Header {
+ if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
+ blob.Metadata[k] = v[0]
+ }
+ }
+ blob.Mtime = time.Now()
+ blob.Etag = makeEtag()
+ case (r.Method == "GET" || r.Method == "HEAD") && hash != "":
+ // "Get Blob" API
+ if !blobExists {
+ rw.WriteHeader(http.StatusNotFound)
+ return
+ }
+ rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123))
+ rw.Header().Set("Content-Length", strconv.Itoa(len(blob.Data)))
+ if r.Method == "GET" {
+ if _, err := rw.Write(blob.Data); err != nil {
+ log.Printf("write %+q: %s", blob.Data, err)
+ }
+ }
+ case r.Method == "DELETE" && hash != "":
+ // "Delete Blob" API
+ if !blobExists {
+ rw.WriteHeader(http.StatusNotFound)
+ return
+ }
+ delete(h.blobs, container+"|"+hash)
+ rw.WriteHeader(http.StatusAccepted)
+ case r.Method == "GET" && r.Form.Get("comp") == "list" && r.Form.Get("restype") == "container":
+ // "List Blobs" API
+ prefix := container + "|" + r.Form.Get("prefix")
+ marker := r.Form.Get("marker")
+
+ maxResults := 2
+ if n, err := strconv.Atoi(r.Form.Get("maxresults")); err == nil && n >= 1 && n <= 5000 {
+ maxResults = n
+ }
+
+ resp := storage.BlobListResponse{
+ Marker: marker,
+ NextMarker: "",
+ MaxResults: int64(maxResults),
+ }
+ var hashes sort.StringSlice
+ for k := range h.blobs {
+ if strings.HasPrefix(k, prefix) {
+ hashes = append(hashes, k[len(container)+1:])
+ }
+ }
+ hashes.Sort()
+ for _, hash := range hashes {
+ if len(resp.Blobs) == maxResults {
+ resp.NextMarker = hash
+ break
+ }
+ if len(resp.Blobs) > 0 || marker == "" || marker == hash {
+ blob := h.blobs[container+"|"+hash]
+ resp.Blobs = append(resp.Blobs, storage.Blob{
+ Name: hash,
+ Properties: storage.BlobProperties{
+ LastModified: blob.Mtime.Format(time.RFC1123),
+ ContentLength: int64(len(blob.Data)),
+ Etag: blob.Etag,
+ },
+ })
+ }
+ }
+ buf, err := xml.Marshal(resp)
+ if err != nil {
+ log.Print(err)
+			rw.WriteHeader(http.StatusInternalServerError)
+			return
+		}
+		rw.Write(buf)
+ default:
+ log.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body)
+ rw.WriteHeader(http.StatusNotImplemented)
+ }
+}
+
+// azStubDialer is a net.Dialer that notices when the Azure driver
+// tries to connect to "devstoreaccount1.blob.127.0.0.1:46067", and
+// in such cases transparently dials "127.0.0.1:46067" instead.
+type azStubDialer struct {
+ net.Dialer
+}
+
+var localHostPortRe = regexp.MustCompile(`(127\.0\.0\.1|localhost|\[::1\]):\d+`)
+
+func (d *azStubDialer) Dial(network, address string) (net.Conn, error) {
+ if hp := localHostPortRe.FindString(address); hp != "" {
+ log.Println("azStubDialer: dial", hp, "instead of", address)
+ address = hp
+ }
+ return d.Dialer.Dial(network, address)
+}
+
+type TestableAzureBlobVolume struct {
+ *AzureBlobVolume
+ azHandler *azStubHandler
+ azStub *httptest.Server
+ t *testing.T
+}
+
+func NewTestableAzureBlobVolume(t *testing.T, readonly bool, replication int) TestableVolume {
+ azHandler := newAzStubHandler()
+ azStub := httptest.NewServer(azHandler)
+
+ var azClient storage.Client
+
+ container := azureTestContainer
+ if container == "" {
+ // Connect to stub instead of real Azure storage service
+ stubURLBase := strings.Split(azStub.URL, "://")[1]
+ var err error
+ if azClient, err = storage.NewClient(emulatorAccountName, emulatorAccountKey, stubURLBase, storage.DefaultAPIVersion, false); err != nil {
+ t.Fatal(err)
+ }
+ container = "fakecontainername"
+ } else {
+ // Connect to real Azure storage service
+ accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
+ if err != nil {
+ t.Fatal(err)
+ }
+ azClient, err = storage.NewBasicClient(azureStorageAccountName, accountKey)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ v := NewAzureBlobVolume(azClient, container, readonly, replication)
+
+ return &TestableAzureBlobVolume{
+ AzureBlobVolume: v,
+ azHandler: azHandler,
+ azStub: azStub,
+ t: t,
+ }
+}
+
+func TestAzureBlobVolumeWithGeneric(t *testing.T) {
+ defer func(t http.RoundTripper) {
+ http.DefaultTransport = t
+ }(http.DefaultTransport)
+ http.DefaultTransport = &http.Transport{
+ Dial: (&azStubDialer{}).Dial,
+ }
+ DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
+ return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
+ })
+}
+
+func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
+ defer func(t http.RoundTripper) {
+ http.DefaultTransport = t
+ }(http.DefaultTransport)
+ http.DefaultTransport = &http.Transport{
+ Dial: (&azStubDialer{}).Dial,
+ }
+ DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
+ return NewTestableAzureBlobVolume(t, true, azureStorageReplication)
+ })
+}
+
+func TestAzureBlobVolumeReplication(t *testing.T) {
+ for r := 1; r <= 4; r++ {
+ v := NewTestableAzureBlobVolume(t, false, r)
+ defer v.Teardown()
+ if n := v.Replication(); n != r {
+ t.Errorf("Got replication %d, expected %d", n, r)
+ }
+ }
+}
+
+func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
+ v.azHandler.PutRaw(v.containerName, locator, data)
+}
+
+func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Time) {
+ v.azHandler.TouchWithDate(v.containerName, locator, lastPut)
+}
+
+func (v *TestableAzureBlobVolume) Teardown() {
+ v.azStub.Close()
+}
+
+func makeEtag() string {
+ return fmt.Sprintf("0x%x", rand.Int63())
+}
package main
import (
+ "bytes"
"crypto/md5"
"fmt"
"io"
}
return <-outcome
}
+
+func compareReaderWithBuf(rdr io.Reader, expect []byte, hash string) error {
+ bufLen := 1 << 20
+ if bufLen > len(expect) && len(expect) > 0 {
+ // No need for bufLen to be longer than
+ // expect, except that len(buf)==0 would
+ // prevent us from handling empty readers the
+ // same way as non-empty readers: reading 0
+ // bytes at a time never reaches EOF.
+ bufLen = len(expect)
+ }
+ buf := make([]byte, bufLen)
+ cmp := expect
+
+ // Loop invariants: all data read so far matched what
+ // we expected, and the first N bytes of cmp are
+ // expected to equal the next N bytes read from
+ // rdr.
+ for {
+ n, err := rdr.Read(buf)
+ if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
+ return collisionOrCorrupt(hash, expect[:len(expect)-len(cmp)], buf[:n], rdr)
+ }
+ cmp = cmp[n:]
+ if err == io.EOF {
+ if len(cmp) != 0 {
+ return collisionOrCorrupt(hash, expect[:len(expect)-len(cmp)], nil, nil)
+ }
+ return nil
+ } else if err != nil {
+ return err
+ }
+ }
+}
uri: "/index/" + TestHash[0:3],
apiToken: dataManagerToken,
}
+ superuserNoSuchPrefixReq := &RequestTester{
+ method: "GET",
+ uri: "/index/abcd",
+ apiToken: dataManagerToken,
+ }
+ superuserInvalidPrefixReq := &RequestTester{
+ method: "GET",
+ uri: "/index/xyz",
+ apiToken: dataManagerToken,
+ }
// -------------------------------------------------------------
// Only the superuser should be allowed to issue /index requests.
"permissions on, superuser /index/prefix request: expected %s, got:\n%s",
expected, response.Body.String())
}
+
+ // superuser /index/{no-such-prefix} request
+ // => OK
+ response = IssueRequest(superuserNoSuchPrefixReq)
+ ExpectStatusCode(t,
+ "permissions on, superuser request",
+ http.StatusOK,
+ response)
+
+ if "\n" != response.Body.String() {
+ t.Errorf("Expected empty response for %s. Found %s", superuserNoSuchPrefixReq.uri, response.Body.String())
+ }
+
+ // superuser /index/{invalid-prefix} request
+ // => StatusBadRequest
+ response = IssueRequest(superuserInvalidPrefixReq)
+ ExpectStatusCode(t,
+ "permissions on, superuser request",
+ http.StatusBadRequest,
+ response)
}
// TestDeleteHandler
case <-ok:
}
}
+
+func TestPutReplicationHeader(t *testing.T) {
+ defer teardown()
+
+ KeepVM = MakeTestVolumeManager(2)
+ defer KeepVM.Close()
+
+ resp := IssueRequest(&RequestTester{
+ method: "PUT",
+ uri: "/" + TestHash,
+ requestBody: TestBlock,
+ })
+ if r := resp.Header().Get("X-Keep-Replicas-Stored"); r != "1" {
+ t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
+ }
+}
return
}
- err = PutBlock(buf, hash)
+ replication, err := PutBlock(buf, hash)
bufs.Put(buf)
if err != nil {
expiry := time.Now().Add(blobSignatureTTL)
returnHash = SignLocator(returnHash, apiToken, expiry)
}
+ resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
resp.Write([]byte(returnHash + "\n"))
}
// all writes failed). The text of the error message should
// provide as much detail as possible.
//
-func PutBlock(block []byte, hash string) error {
+func PutBlock(block []byte, hash string) (int, error) {
// Check that BLOCK's checksum matches HASH.
blockhash := fmt.Sprintf("%x", md5.Sum(block))
if blockhash != hash {
log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
- return RequestHashError
+ return 0, RequestHashError
}
// If we already have this data, it's intact on disk, and we
// can update its timestamp, return success. If we have
// different data with the same hash, return failure.
- if err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
- return err
+ if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
+ return n, err
}
// Choose a Keep volume to write to.
// If this volume fails, try all of the volumes in order.
if vol := KeepVM.NextWritable(); vol != nil {
if err := vol.Put(hash, block); err == nil {
- return nil // success!
+ return vol.Replication(), nil // success!
}
}
writables := KeepVM.AllWritable()
if len(writables) == 0 {
log.Print("No writable volumes.")
- return FullError
+ return 0, FullError
}
allFull := true
for _, vol := range writables {
err := vol.Put(hash, block)
if err == nil {
- return nil // success!
+ return vol.Replication(), nil // success!
}
if err != FullError {
// The volume is not full but the
if allFull {
log.Print("All volumes are full.")
- return FullError
+ return 0, FullError
}
// Already logged the non-full errors.
- return GenericError
+ return 0, GenericError
}
-// CompareAndTouch returns nil if one of the volumes already has the
-// given content and it successfully updates the relevant block's
-// modification time in order to protect it from premature garbage
-// collection.
-func CompareAndTouch(hash string, buf []byte) error {
+// CompareAndTouch returns the current replication level if one of the
+// volumes already has the given content and it successfully updates
+// the relevant block's modification time in order to protect it from
+// premature garbage collection. Otherwise, it returns a non-nil
+// error.
+func CompareAndTouch(hash string, buf []byte) (int, error) {
var bestErr error = NotFoundError
for _, vol := range KeepVM.AllWritable() {
if err := vol.Compare(hash, buf); err == CollisionError {
// both, so there's no point writing it even
// on a different volume.)
log.Printf("%s: Compare(%s): %s", vol, hash, err)
- return err
+ return 0, err
} else if os.IsNotExist(err) {
// Block does not exist. This is the only
// "normal" error: we don't log anything.
continue
}
// Compare and Touch both worked --> done.
- return nil
+ return vol.Replication(), nil
}
- return bestErr
+ return 0, bestErr
}
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
setupHandlersWithGenericVolumeTest(t, factory)
// PutBlock
- if err := PutBlock(testBlock, testHash); err != nil {
+ if _, err := PutBlock(testBlock, testHash); err != nil {
t.Fatalf("Error during PutBlock: %s", err)
}
// Check that PutBlock succeeds again even after CompareAndTouch
- if err := PutBlock(testBlock, testHash); err != nil {
+ if _, err := PutBlock(testBlock, testHash); err != nil {
t.Fatalf("Error during PutBlock: %s", err)
}
testableVolumes[1].PutRaw(testHash, badData)
// Check that PutBlock with good data succeeds
- if err := PutBlock(testBlock, testHash); err != nil {
+ if _, err := PutBlock(testBlock, testHash); err != nil {
t.Fatalf("Error during PutBlock for %q: %s", testHash, err)
}
package main
import (
- "bufio"
"bytes"
- "errors"
"flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"os"
"os/signal"
"strings"
- "sync"
"syscall"
"time"
)
var pullq *WorkQueue
var trashq *WorkQueue
+type volumeSet []Volume
+
var (
flagSerializeIO bool
flagReadonly bool
+ volumes volumeSet
)
-type volumeSet []Volume
-
-func (vs *volumeSet) Set(value string) error {
- if dirs := strings.Split(value, ","); len(dirs) > 1 {
- log.Print("DEPRECATED: using comma-separated volume list.")
- for _, dir := range dirs {
- if err := vs.Set(dir); err != nil {
- return err
- }
- }
- return nil
- }
- if len(value) == 0 || value[0] != '/' {
- return errors.New("Invalid volume: must begin with '/'.")
- }
- if _, err := os.Stat(value); err != nil {
- return err
- }
- var locker sync.Locker
- if flagSerializeIO {
- locker = &sync.Mutex{}
- }
- *vs = append(*vs, &UnixVolume{
- root: value,
- locker: locker,
- readonly: flagReadonly,
- })
- return nil
-}
-
func (vs *volumeSet) String() string {
- s := "["
- for i, v := range *vs {
- if i > 0 {
- s = s + " "
- }
- s = s + v.String()
- }
- return s + "]"
-}
-
-// Discover adds a volume for every directory named "keep" that is
-// located at the top level of a device- or tmpfs-backed mount point
-// other than "/". It returns the number of volumes added.
-func (vs *volumeSet) Discover() int {
- added := 0
- f, err := os.Open(ProcMounts)
- if err != nil {
- log.Fatalf("opening %s: %s", ProcMounts, err)
- }
- scanner := bufio.NewScanner(f)
- for scanner.Scan() {
- args := strings.Fields(scanner.Text())
- if err := scanner.Err(); err != nil {
- log.Fatalf("reading %s: %s", ProcMounts, err)
- }
- dev, mount := args[0], args[1]
- if mount == "/" {
- continue
- }
- if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
- continue
- }
- keepdir := mount + "/keep"
- if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
- continue
- }
- // Set the -readonly flag (but only for this volume)
- // if the filesystem is mounted readonly.
- flagReadonlyWas := flagReadonly
- for _, fsopt := range strings.Split(args[3], ",") {
- if fsopt == "ro" {
- flagReadonly = true
- break
- }
- if fsopt == "rw" {
- break
- }
- }
- vs.Set(keepdir)
- flagReadonly = flagReadonlyWas
- added++
- }
- return added
+ return fmt.Sprintf("%+v", (*vs)[:])
}
// TODO(twp): continue moving as much code as possible out of main
listen string
blobSigningKeyFile string
permissionTTLSec int
- volumes volumeSet
pidfile string
)
flag.StringVar(
"readonly",
false,
"Do not write, delete, or touch anything on the following volumes.")
- flag.Var(
- &volumes,
- "volumes",
- "Deprecated synonym for -volume.")
- flag.Var(
- &volumes,
- "volume",
- "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
flag.StringVar(
&pidfile,
"pid",
}
if len(volumes) == 0 {
- if volumes.Discover() == 0 {
+ if (&unixVolumeAdder{&volumes}).Discover() == 0 {
log.Fatal("No volumes found.")
}
}
defer KeepVM.Close()
// Check that PutBlock stores the data as expected.
- if err := PutBlock(TestBlock, TestHash); err != nil {
- t.Fatalf("PutBlock: %v", err)
+ if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ t.Fatalf("PutBlock: n %d err %v", n, err)
}
vols := KeepVM.AllReadable()
vols[0].(*MockVolume).Bad = true
// Check that PutBlock stores the data as expected.
- if err := PutBlock(TestBlock, TestHash); err != nil {
- t.Fatalf("PutBlock: %v", err)
+ if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ t.Fatalf("PutBlock: n %d err %v", n, err)
}
result, err := GetBlock(TestHash)
// Check that PutBlock returns the expected error when the hash does
// not match the block.
- if err := PutBlock(BadBlock, TestHash); err != RequestHashError {
+ if _, err := PutBlock(BadBlock, TestHash); err != RequestHashError {
t.Error("Expected RequestHashError, got %v", err)
}
// Store a corrupted block under TestHash.
vols := KeepVM.AllWritable()
vols[0].Put(TestHash, BadBlock)
- if err := PutBlock(TestBlock, TestHash); err != nil {
- t.Errorf("PutBlock: %v", err)
+ if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ t.Errorf("PutBlock: n %d err %v", n, err)
}
// The block on disk should now match TestBlock.
// Store one block, then attempt to store the other. Confirm that
// PutBlock reported a CollisionError.
- if err := PutBlock(b1, locator); err != nil {
+ if _, err := PutBlock(b1, locator); err != nil {
t.Error(err)
}
- if err := PutBlock(b2, locator); err == nil {
+ if _, err := PutBlock(b2, locator); err == nil {
t.Error("PutBlock did not report a collision")
} else if err != CollisionError {
t.Errorf("PutBlock returned %v", err)
// vols[0].Touch will fail on the next call, so the volume
// manager will store a copy on vols[1] instead.
vols[0].(*MockVolume).Touchable = false
- if err := PutBlock(TestBlock, TestHash); err != nil {
- t.Fatalf("PutBlock: %v", err)
+ if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ t.Fatalf("PutBlock: n %d err %v", n, err)
}
vols[0].(*MockVolume).Touchable = true
f.Close()
ProcMounts = f.Name()
- var resultVols volumeSet
- added := resultVols.Discover()
+ resultVols := volumeSet{}
+ added := (&unixVolumeAdder{&resultVols}).Discover()
if added != len(resultVols) {
t.Errorf("Discover returned %d, but added %d volumes",
f.Close()
ProcMounts = f.Name()
- var resultVols volumeSet
- added := resultVols.Discover()
+ resultVols := volumeSet{}
+ added := (&unixVolumeAdder{&resultVols}).Discover()
if added != 0 || len(resultVols) != 0 {
t.Fatalf("got %d, %v; expected 0, []", added, resultVols)
}
// Put block
var PutContent = func(content []byte, locator string) (err error) {
- err = PutBlock(content, locator)
+ _, err = PutBlock(content, locator)
return
}
// will fail because it is full, but Mtime or Delete can
// succeed -- then Writable should return false.
Writable() bool
+
+ // Replication returns the storage redundancy of the
+ // underlying device. It will be passed on to clients in
+ // responses to PUT requests.
+ Replication() int
}
// A VolumeManager tells callers which volumes can read, which volumes
buf, err := v.Get(TestHash)
if err != nil {
- t.Error(err)
+ t.Fatal(err)
}
bufs.Put(buf)
// Put must not return a nil error unless it has
// overwritten the existing data.
if bytes.Compare(buf, testDataB) != 0 {
- t.Errorf("Put succeeded but Get returned %+v, expected %+v", buf, testDataB)
+ t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf, testDataB)
}
} else {
// It is permissible for Put to fail, but it must
// leave us with either the original data, the new
// data, or nothing at all.
if getErr == nil && bytes.Compare(buf, testDataA) != 0 && bytes.Compare(buf, testDataB) != 0 {
- t.Errorf("Put failed but Get returned %+v, which is neither %+v nor %+v", buf, testDataA, testDataB)
+ t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf, testDataA, testDataB)
}
}
if getErr == nil {
data, err := v.Get(TestHash)
if err != nil {
t.Error(err)
- } else if bytes.Compare(data, TestBlock) != 0 {
- t.Errorf("Block present, but content is incorrect: Expected: %v Found: %v", data, TestBlock)
+ } else {
+ if bytes.Compare(data, TestBlock) != 0 {
+ t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock)
+ }
+ bufs.Put(data)
}
- bufs.Put(data)
data, err = v.Get(TestHash2)
if err != nil {
t.Error(err)
- } else if bytes.Compare(data, TestBlock2) != 0 {
- t.Errorf("Block present, but content is incorrect: Expected: %v Found: %v", data, TestBlock2)
+ } else {
+ if bytes.Compare(data, TestBlock2) != 0 {
+ t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock2)
+ }
+ bufs.Put(data)
}
- bufs.Put(data)
data, err = v.Get(TestHash3)
if err != nil {
t.Error(err)
- } else if bytes.Compare(data, TestBlock3) != 0 {
- t.Errorf("Block present, but content is incorrect: Expected: %v Found: %v", data, TestBlock3)
+ } else {
+ if bytes.Compare(data, TestBlock3) != 0 {
+			t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock3)
+ }
+ bufs.Put(data)
}
- bufs.Put(data)
}
// testPutAndTouch
func testDeleteNewBlock(t *testing.T, factory TestableVolumeFactory) {
v := factory(t)
defer v.Teardown()
+ blobSignatureTTL = 300 * time.Second
if v.Writable() == false {
return
data, err := v.Get(TestHash)
if err != nil {
t.Error(err)
- } else if bytes.Compare(data, TestBlock) != 0 {
- t.Error("Block still present, but content is incorrect: %+v != %+v", data, TestBlock)
+ } else {
+ if bytes.Compare(data, TestBlock) != 0 {
+ t.Errorf("Got data %+q, expected %+q", data, TestBlock)
+ }
+ bufs.Put(data)
}
- bufs.Put(data)
}
// Calling Delete() for a block with a timestamp older than
func testDeleteOldBlock(t *testing.T, factory TestableVolumeFactory) {
v := factory(t)
defer v.Teardown()
+ blobSignatureTTL = 300 * time.Second
if v.Writable() == false {
return
}
v.Put(TestHash, TestBlock)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL*time.Second))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
if err := v.Delete(TestHash); err != nil {
t.Error(err)
}
if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) {
- t.Errorf("os.IsNotExist(%v) should have been true", err.Error())
+ t.Errorf("os.IsNotExist(%v) should have been true", err)
}
}
func (v *MockVolume) Writable() bool {
return !v.Readonly
}
+
+func (v *MockVolume) Replication() int {
+ return 1
+}
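The new `Replication()` method lets each volume report the storage redundancy of its underlying device, which the server can pass back to clients after a PUT. A minimal sketch of how a caller might aggregate it (the `replicasStored` helper and the response-header use case are illustrative assumptions, not actual keepstore code):

```go
package main

import "fmt"

// Volume is a pared-down stand-in for the interface extended above:
// Replication reports the redundancy of the underlying device.
type Volume interface {
	Replication() int
}

type mockVolume struct{ replication int }

func (v mockVolume) Replication() int { return v.replication }

// replicasStored sums the replication of every volume a block was
// written to, e.g. to fill in a replica-count response header.
func replicasStored(written []Volume) int {
	total := 0
	for _, v := range written {
		total += v.Replication()
	}
	return total
}

func main() {
	vols := []Volume{mockVolume{1}, mockVolume{2}}
	fmt.Println(replicasStored(vols)) // prints 3
}
```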
package main
import (
- "bytes"
+ "bufio"
+ "errors"
+ "flag"
"fmt"
"io"
"io/ioutil"
"time"
)
+type unixVolumeAdder struct {
+ *volumeSet
+}
+
+func (vs *unixVolumeAdder) Set(value string) error {
+ if dirs := strings.Split(value, ","); len(dirs) > 1 {
+ log.Print("DEPRECATED: using comma-separated volume list.")
+ for _, dir := range dirs {
+ if err := vs.Set(dir); err != nil {
+ return err
+ }
+ }
+ return nil
+ }
+ if len(value) == 0 || value[0] != '/' {
+ return errors.New("Invalid volume: must begin with '/'.")
+ }
+ if _, err := os.Stat(value); err != nil {
+ return err
+ }
+ var locker sync.Locker
+ if flagSerializeIO {
+ locker = &sync.Mutex{}
+ }
+ *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
+ root: value,
+ locker: locker,
+ readonly: flagReadonly,
+ })
+ return nil
+}
+
+func init() {
+ flag.Var(
+ &unixVolumeAdder{&volumes},
+ "volumes",
+ "Deprecated synonym for -volume.")
+ flag.Var(
+ &unixVolumeAdder{&volumes},
+ "volume",
+ "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
+}
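`unixVolumeAdder` works because any type with `Set` and `String` methods satisfies Go's `flag.Value` interface, so `flag.Var` can invoke it once per `-volume` argument. A self-contained illustration of the same repeated-flag pattern (the `stringList` type and `-item` flag are invented for this example):

```go
package main

import (
	"flag"
	"fmt"
	"strings"
)

// stringList accumulates values from a repeated flag; Set is called
// once per occurrence, the same flag.Value mechanism used by
// unixVolumeAdder above.
type stringList []string

func (l *stringList) String() string { return strings.Join(*l, ",") }

func (l *stringList) Set(value string) error {
	*l = append(*l, value)
	return nil
}

func main() {
	var items stringList
	fs := flag.NewFlagSet("demo", flag.ContinueOnError)
	fs.Var(&items, "item", "can be given more than once")
	fs.Parse([]string{"-item", "/a", "-item", "/b"})
	fmt.Println(items.String()) // prints /a,/b
}
```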
+
+// Discover adds a UnixVolume for every directory named "keep" that is
+// located at the top level of a device- or tmpfs-backed mount point
+// other than "/". It returns the number of volumes added.
+func (vs *unixVolumeAdder) Discover() int {
+ added := 0
+ f, err := os.Open(ProcMounts)
+ if err != nil {
+ log.Fatalf("opening %s: %s", ProcMounts, err)
+ }
+ scanner := bufio.NewScanner(f)
+ for scanner.Scan() {
+ args := strings.Fields(scanner.Text())
+ if err := scanner.Err(); err != nil {
+ log.Fatalf("reading %s: %s", ProcMounts, err)
+ }
+ dev, mount := args[0], args[1]
+ if mount == "/" {
+ continue
+ }
+ if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
+ continue
+ }
+ keepdir := mount + "/keep"
+ if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
+ continue
+ }
+ // Set the -readonly flag (but only for this volume)
+ // if the filesystem is mounted readonly.
+ flagReadonlyWas := flagReadonly
+ for _, fsopt := range strings.Split(args[3], ",") {
+ if fsopt == "ro" {
+ flagReadonly = true
+ break
+ }
+ if fsopt == "rw" {
+ break
+ }
+ }
+ if err := vs.Set(keepdir); err != nil {
+ log.Printf("adding %q: %s", keepdir, err)
+ } else {
+ added++
+ }
+ flagReadonly = flagReadonlyWas
+ }
+ return added
+}
+
// A UnixVolume stores and retrieves blocks in a local directory.
type UnixVolume struct {
// path to the volume's root directory
// bytes.Compare(), but uses less memory.
func (v *UnixVolume) Compare(loc string, expect []byte) error {
path := v.blockPath(loc)
- stat, err := v.stat(path)
- if err != nil {
+ if _, err := v.stat(path); err != nil {
return err
}
- bufLen := 1 << 20
- if int64(bufLen) > stat.Size() {
- bufLen = int(stat.Size())
- if bufLen < 1 {
- // len(buf)==0 would prevent us from handling
- // empty files the same way as non-empty
- // files, because reading 0 bytes at a time
- // never reaches EOF.
- bufLen = 1
- }
- }
- cmp := expect
- buf := make([]byte, bufLen)
return v.getFunc(path, func(rdr io.Reader) error {
- // Loop invariants: all data read so far matched what
- // we expected, and the first N bytes of cmp are
- // expected to equal the next N bytes read from
- // reader.
- for {
- n, err := rdr.Read(buf)
- if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
- return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], buf[:n], rdr)
- }
- cmp = cmp[n:]
- if err == io.EOF {
- if len(cmp) != 0 {
- return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], nil, nil)
- }
- return nil
- } else if err != nil {
- return err
- }
- }
+ return compareReaderWithBuf(rdr, expect, loc[:32])
})
}
return !v.readonly
}
+func (v *UnixVolume) Replication() int {
+ return 1
+}
+
// lockfile and unlockfile use flock(2) to manage kernel file locks.
func lockfile(f *os.File) error {
return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)