if not @results.nil?
@results.each(&block)
else
+ results = []
self.each_page do |items|
items.each do |i|
+ results << i
block.call i
end
end
+ # Cache results only if all were retrieved (block didn't raise
+ # an exception).
+ @results = results
end
self
end
<meta name="author" content="">
<% if current_user %>
<% content_for :js do %>
- window.defaultSession = <%=raw({baseURL: Rails.configuration.Services.Controller.ExternalURL.to_s, token: Thread.current[:arvados_api_token], user: current_user}.to_json)%>
+ window.defaultSession = <%=raw({baseURL: Rails.configuration.Services.Controller.ExternalURL.to_s.gsub(/\/?$/,'/'), token: Thread.current[:arvados_api_token], user: current_user}.to_json)%>
<% end %>
<% end %>
<% if current_user and $arvados_api_client.discovery[:websocketUrl] %>
assert_nil c.items_available
refute_empty c.results
end
+
+ test 'cache results across each(&block) calls' do
+ use_token :admin
+ c = Collection.where(owner_uuid: 'zzzzz-j7d0g-0201collections').with_count('none')
+ c.each do |x|
+ x.description = 'foo'
+ end
+ found = 0
+ c.each do |x|
+ found += 1
+ # We should get the same objects we modified in the loop above
+ # -- not new objects built from another set of API responses.
+ assert_equal 'foo', x.description
+ end
+ assert_equal 201, found
+ end
end
# needed for pkgdown, builds R SDK doc pages
which pandoc || fatal "No pandoc. Try: apt-get install pandoc"
fi
+ echo 'procs with /dev/fuse open:'
+ find /proc/*/fd -lname /dev/fuse 2>/dev/null | cut -d/ -f3 | xargs --no-run-if-empty ps -lywww
+ echo 'grep fuse /proc/self/mountinfo:'
+ grep fuse /proc/self/mountinfo
}
rotate_logfile() {
- User interface:
- install/install-sso.html.textile.liquid
- install/install-workbench-app.html.textile.liquid
+ - install/install-workbench2-app.html.textile.liquid
- install/install-composer.html.textile.liquid
- Additional services:
- install/install-ws.html.textile.liquid
---
layout: default
navsection: admin
-title: Using AWS Spot instances
+title: Using Preemptible instances
...
{% comment %}
SPDX-License-Identifier: CC-BY-SA-3.0
{% endcomment %}
-This page describes how to set up the system to take advantage of "Amazon's EC2 spot instances":https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-spot-instances.html.
+This page describes how to enable preemptible instances. Preemptible instances typically offer lower-cost computation, with the trade-off of lower service guarantees. If a compute node is preempted, Arvados will restart the computation on a new instance.
-h3. Nodemanager
+Currently Arvados supports preemptible instances using AWS spot instances.
-Nodemanager should have configured cloud sizes that include the @preemptible@ boolean parameter. For example, for every on-demand cloud node size, you could create a @.spot@ variant, like this:
+h2. Configuration
-<pre>
-[Size m4.large]
-cores = 2
-scratch = 32000
-
-[Size m4.large.spot]
-cores = 2
-instance_type = m4.large
-preemptible = true
-scratch = 32000
-</pre>
-
-h3. Slurm dispatcher
-
-The @crunch-dispatch-slurm@ service needs a matching instance type configuration on @/etc/arvados/config.yml@, following the previous example:
+To use preemptible instances, set @UsePreemptibleInstances: true@ in @config.yml@ and add @InstanceTypes@ entries with @Preemptible: true@. Typically you want to add both preemptible and non-preemptible entries for each cloud provider VM type. The @Price@ for a preemptible instance is the maximum bid price; the actual price paid is dynamic and may be lower. For example:
<pre>
Clusters:
uuid_prefix:
+ Containers:
+ UsePreemptibleInstances: true
InstanceTypes:
- - Name: m4.large
- VCPUs: 2
- RAM: 7782000000
- Scratch: 32000000000
- Price: 0.1
- - Name: m4.large.spot
- Preemptible: true
- VCPUs: 2
- RAM: 7782000000
- Scratch: 32000000000
- Price: 0.1
+ m4.large:
+ Preemptible: false
+ ProviderType: m4.large
+ VCPUs: 2
+ RAM: 8GiB
+ AddedScratch: 32GB
+ Price: 0.1
+ m4.large.spot:
+ Preemptible: true
+ ProviderType: m4.large
+ VCPUs: 2
+ RAM: 8GiB
+ AddedScratch: 32GB
+ Price: 0.1
</pre>
-@InstanceType@ names should match those defined on nodemanager's config file because it's @crunch-dispatch-slurm@'s job to select the instance type and communicate the decision to @nodemanager@ via Slurm.
+When @UsePreemptibleInstances@ is enabled, child containers (workflow steps) are automatically made preemptible. Because preempting the workflow runner would cancel the entire workflow, the workflow runner itself runs on a reserved (non-preemptible) instance.
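+
+For illustration only (a simplified fragment, not a complete container request), a child container request created while @UsePreemptibleInstances@ is enabled ends up with the @preemptible@ scheduling parameter set:
+
+<pre>
+{
+  "scheduling_parameters": {
+    "preemptible": true
+  }
+}
+</pre>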
-h3. API Server
+If you are using "crunch-dispatch-cloud":{{site.baseurl}}/install/install-dispatch-cloud.html no additional configuration is required.
-Container requests will need the @preemptible@ scheduling parameter included, to make the dispatcher request a spot instance. The API Server configuration file includes an option that when active, will auto assign the @preemptible@ parameter to any new child container request if it doesn't have it already. To activate this feature, the following should be added to the @application.yml@ file:
+If you are using the legacy Nodemanager, "see below":#nodemanager .
-<pre>
-preemptible_instances: true
-</pre>
+h2. Preemptible instances on AWS
-With this configuration active, child container requests should include the @preemptible = false@ parameter at creation time to avoid being scheduled for spot instance usage.
+For general information, see "using Amazon EC2 spot instances":https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-spot-instances.html .
-h3. AWS Permissions
+h3. Permissions
-When requesting spot instances, Amazon's API may return an authorization error depending on how users and permissions are set on the account. If this is the case check nodemanager's log for:
+When requesting spot instances, Amazon's API may return an authorization error depending on how users and permissions are set on the account. If this is the case, check the logs for this error:
<pre>
BaseHTTPError: AuthFailure.ServiceLinkedRoleCreationNotPermitted: The provided credentials do not have permission to create the service-linked role for EC2 Spot Instances.
Amazon's spot instance prices are declared at instance request time and are defined by the maximum price that the user is willing to pay per hour. By default, this price is the same as the on-demand price of each instance type, and this is the setting that nodemanager currently uses, as it doesn't include any pricing data in the spot instance request.
-The real price that a spot instance has at any point in time is discovered at the end of each usage hour, depending on instance demand. For this reason, AWS provides a data feed subscription to get hourly logs, as described on "Amazon's User Guide":https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/spot-data-feeds.html.
\ No newline at end of file
+The real price of a spot instance at any point in time is discovered at the end of each usage hour, depending on instance demand. For this reason, AWS provides a data feed subscription to get hourly logs, as described in "Amazon's User Guide":https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/spot-data-feeds.html.
+
+h2(#nodemanager). Nodemanager
+
+If you are using the legacy Nodemanager, its config file must also declare preemptible instance sizes, which must match the API server's @InstanceTypes@:
+
+<pre>
+[Size m4.large]
+cores = 2
+scratch = 32000
+
+[Size m4.large.spot]
+cores = 2
+instance_type = m4.large
+preemptible = true
+scratch = 32000
+</pre>
TODO: extract this information based on git commit messages and generate changelogs / release notes automatically.
{% endcomment %}
-table(table table-bordered table-condensed).
-|_. development|"master":#master|\3.|
-|_. latest stable|"v1.4.0":#v1_4_0|\3.|
-|_\5. past stable|
-|"v1.3.3":#v1_3_3|"v1.3.0":#v1_3_0|\3.|
-|"v1.2.1":#v1_2_1|"v1.2.0":#v1_2_0|\3.|
-|"v1.1.4":#v1_1_4|"v1.1.3":#v1_1_3|"v1.1.2":#v1_1_2|"v1.1.1":#v1_1_1|"v1.1.0":#v1_1_0|
-|\5. "older":#older|
-
h3(#master). development master (as of 2019-08-12)
h4. Delete "keep_services" records
|"Keep-balance":install-keep-balance.html |Storage cluster maintenance daemon responsible for moving blocks to their optimal server location, adjusting block replication levels, and trashing unreferenced blocks.|Required to free deleted data from underlying storage, and to ensure proper replication and block distribution (including support for storage classes).|
|\3=. *User interface*|
|"Single Sign On server":install-sso.html |Login server.|Required for web based login to Workbench.|
-|"Workbench":install-workbench-app.html |Primary graphical user interface for working with file collections and running containers.|Optional. Depends on API server, SSO server, keep-web, websockets server.|
+|"Workbench":install-workbench-app.html, "Workbench2":install-workbench2-app.html |Primary graphical user interface for working with file collections and running containers.|Optional. Depends on API server, SSO server, keep-web, websockets server.|
|"Workflow Composer":install-composer.html |Graphical user interface for editing Common Workflow Language workflows.|Optional. Depends on git server (arv-git-httpd).|
|\3=. *Additional services*|
|"Websockets server":install-ws.html |Event distribution server.|Required to view streaming container logs in Workbench.|
If you restrict access to your Arvados services based on network topology -- for example, your proxy server is not reachable from the public internet -- additional proxy configuration might be needed to thwart cross-site scripting attacks that would circumvent your restrictions. Read the "'Intranet mode' section of the Keep-web documentation":https://godoc.org/github.com/curoverse/arvados/services/keep-web#hdr-Intranet_mode now.
{% include 'notebox_end' %}
-h3. Configure DNS
+h3(#dns). Configure DNS
Configure your DNS servers so the following names resolve to your Nginx proxy's public IP address.
* @download.uuid_prefix.your.domain@
--- /dev/null
+---
+layout: default
+navsection: installguide
+title: Install Workbench2 (beta)
+...
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+Workbench2 is the web-based user interface for Arvados.
+
+{% include 'notebox_begin' %}
+Workbench2 is the replacement for Arvados Workbench. Workbench2 is currently in <i>beta</i> and is not yet feature complete.
+{% include 'notebox_end' %}
+
+h2(#install_workbench). Install Workbench2 and dependencies
+
+Workbench2 does not require its own database. It is a set of HTML, JavaScript, and CSS files that are served as static files by a web server such as Nginx or Apache.
+
+On a Debian-based system, install the following package:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo apt-get install arvados-workbench2</span>
+</code></pre>
+</notextile>
+
+On a Red Hat-based system, install the following package:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo yum install arvados-workbench2</span>
+</code></pre>
+</notextile>
+
+h2. Set up Web server
+
+For best performance, we recommend using Nginx as your Web server to serve Workbench2, which consists entirely of static files. To do that:
+
+<notextile>
+<ol>
+<li>Install Nginx</li>
+
+<li><p>Edit the http section of your Nginx configuration to serve Workbench2's files. You might add a block like the following, adding SSL and logging parameters to taste:</p>
+
+<pre><code>server {
+ listen <span class="userinput">[your public IP address]</span>:443 ssl;
+ server_name workbench2.<span class="userinput">uuid-prefix.your.domain</span>;
+
+ ssl on;
+ ssl_certificate <span class="userinput">/YOUR/PATH/TO/cert.pem</span>;
+ ssl_certificate_key <span class="userinput">/YOUR/PATH/TO/cert.key</span>;
+
+ index index.html;
+
+ # Workbench2 uses a call to /config.json to bootstrap itself and talk to the desired API server
+ location /config.json {
+ return 200 '{ "API_HOST": "<span class="userinput">uuid-prefix.your.domain</span>" }';
+ }
+
+ location / {
+ root /var/www/arvados-workbench2/workbench2;
+ index index.html;
+ try_files $uri $uri/ /index.html;
+ if (-f $document_root/maintenance.html) {
+ return 503;
+ }
+ }
+}
+</code></pre>
+</li>
+
+<li>Restart Nginx.</li>
+
+</ol>
+</notextile>
+
+h2. Trusted client setting
+
+Log in to Workbench2 once to ensure that the Arvados API server has a record of the Workbench2 client.
+
+In the <strong>API server</strong> project root, start the Rails console. {% include 'install_rails_command' %}
+
+At the console, enter the following commands to locate the ApiClient record for your Workbench2 installation (typically, while you're setting this up, the @last@ one in the database is the one you want), then set the @is_trusted@ flag for the appropriate client record:
+
+<notextile><pre><code>irb(main):001:0> <span class="userinput">wb = ApiClient.all.last; [wb.url_prefix, wb.created_at]</span>
+=> ["https://workbench2.<span class="userinput">uuid_prefix.your.domain</span>/", Sat, 20 Apr 2019 01:23:45 UTC +00:00]
+irb(main):002:0> <span class="userinput">include CurrentApiClient</span>
+=> true
+irb(main):003:0> <span class="userinput">act_as_system_user do wb.update_attributes!(is_trusted: true) end</span>
+=> true
+</code></pre>
+</notextile>
Login:
# These settings are provided by your OAuth2 provider (eg
# Google) used to perform upstream authentication.
- ProviderAppSecret: ""
ProviderAppID: ""
+ ProviderAppSecret: ""
+
+ # (Experimental) Authenticate with Google, bypassing the
+ # SSO-provider gateway service. Use the Google Cloud console to
+ # generate the Client ID and secret (APIs and Services >
+ # Credentials > Create credentials > OAuth client ID > Web
+ # application) and add your controller's /login URL (e.g.,
+ # "https://zzzzz.example.com/login") as an authorized redirect
+ # URL.
+ #
+ # Requires EnableBetaController14287. ProviderAppID must be
+ # blank.
+ GoogleClientID: ""
+ GoogleClientSecret: ""
# The cluster ID to delegate the user database. When set,
# logins on this cluster will be redirected to the login cluster
# Workbench welcome screen, this is HTML text that will be
# incorporated directly onto the page.
WelcomePageHTML: |
- <img src="arvados-logo-big.png" style="width: 20%; float: right; padding: 1em;" />
+ <img src="/arvados-logo-big.png" style="width: 20%; float: right; padding: 1em;" />
<h2>Please log in.</h2>
<p>The "Log in" button below will show you a sign-in
identification, and does not retrieve any other personal
information.</i>
+ InactivePageHTML: |
+ <img src="/arvados-logo-big.png" style="width: 20%; float: right; padding: 1em;" />
+ <h3>Hi! You're logged in, but...</h3>
+ <p>Your account is inactive.</p>
+ <p>An administrator must activate your account before you can get
+ any further.</p>
+
# Use experimental controller code (see https://dev.arvados.org/issues/14287)
EnableBetaController14287: false
"InstanceTypes.*": true,
"InstanceTypes.*.*": true,
"Login": true,
- "Login.ProviderAppSecret": false,
+ "Login.GoogleClientID": false,
+ "Login.GoogleClientSecret": false,
"Login.ProviderAppID": false,
+ "Login.ProviderAppSecret": false,
"Login.LoginCluster": true,
"Login.RemoteTokenRefresh": true,
"Mail": false,
"Workbench.UserProfileFormMessage": true,
"Workbench.VocabularyURL": true,
"Workbench.WelcomePageHTML": true,
+ "Workbench.InactivePageHTML": true,
}
func redactUnsafe(m map[string]interface{}, mPrefix, lookupPrefix string) error {
Login:
# These settings are provided by your OAuth2 provider (eg
# Google) used to perform upstream authentication.
- ProviderAppSecret: ""
ProviderAppID: ""
+ ProviderAppSecret: ""
+
+ # (Experimental) Authenticate with Google, bypassing the
+ # SSO-provider gateway service. Use the Google Cloud console to
+ # generate the Client ID and secret (APIs and Services >
+ # Credentials > Create credentials > OAuth client ID > Web
+ # application) and add your controller's /login URL (e.g.,
+ # "https://zzzzz.example.com/login") as an authorized redirect
+ # URL.
+ #
+ # Requires EnableBetaController14287. ProviderAppID must be
+ # blank.
+ GoogleClientID: ""
+ GoogleClientSecret: ""
# The cluster ID to delegate the user database. When set,
# logins on this cluster will be redirected to the login cluster
# Workbench welcome screen, this is HTML text that will be
# incorporated directly onto the page.
WelcomePageHTML: |
- <img src="arvados-logo-big.png" style="width: 20%; float: right; padding: 1em;" />
+ <img src="/arvados-logo-big.png" style="width: 20%; float: right; padding: 1em;" />
<h2>Please log in.</h2>
<p>The "Log in" button below will show you a sign-in
identification, and does not retrieve any other personal
information.</i>
+ InactivePageHTML: |
+ <img src="/arvados-logo-big.png" style="width: 20%; float: right; padding: 1em;" />
+ <h3>Hi! You're logged in, but...</h3>
+ <p>Your account is inactive.</p>
+ <p>An administrator must activate your account before you can get
+ any further.</p>
+
# Use experimental controller code (see https://dev.arvados.org/issues/14287)
EnableBetaController14287: false
`)
"strings"
"git.curoverse.com/arvados.git/lib/config"
- "git.curoverse.com/arvados.git/lib/controller/railsproxy"
+ "git.curoverse.com/arvados.git/lib/controller/localdb"
"git.curoverse.com/arvados.git/lib/controller/rpc"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/auth"
}
func New(cluster *arvados.Cluster) *Conn {
- local := railsproxy.NewConn(cluster)
+ local := localdb.NewConn(cluster)
remotes := map[string]backend{}
for id, remote := range cluster.RemoteClusters {
if !remote.Proxy {
return json.RawMessage(buf.Bytes()), err
}
+func (conn *Conn) Login(ctx context.Context, options arvados.LoginOptions) (arvados.LoginResponse, error) {
+ if id := conn.cluster.Login.LoginCluster; id != "" && id != conn.cluster.ClusterID {
+ // defer entire login procedure to designated cluster
+ remote, ok := conn.remotes[id]
+ if !ok {
+ return arvados.LoginResponse{}, fmt.Errorf("configuration problem: designated login cluster %q is not defined", id)
+ }
+ baseURL := remote.BaseURL()
+ target, err := baseURL.Parse(arvados.EndpointLogin.Path)
+ if err != nil {
+ return arvados.LoginResponse{}, fmt.Errorf("internal error getting redirect target: %s", err)
+ }
+ target.RawQuery = url.Values{
+ "return_to": []string{options.ReturnTo},
+ "remote": []string{options.Remote},
+ }.Encode()
+ return arvados.LoginResponse{
+ RedirectLocation: target.String(),
+ }, nil
+ } else {
+ return conn.local.Login(ctx, options)
+ }
+}
+
func (conn *Conn) CollectionGet(ctx context.Context, options arvados.GetOptions) (arvados.Collection, error) {
if len(options.UUID) == 27 {
// UUID is really a UUID
return conn.chooseBackend(options.UUID).APIClientAuthorizationCurrent(ctx, options)
}
-type backend interface{ arvados.API }
+type backend interface {
+ arvados.API
+ BaseURL() url.URL
+}
type notFoundError struct{}
}
}
+ if matchAllFilters == nil {
+ // Not filtering by UUID at all; just query the local
+ // cluster.
+ _, err := fn(ctx, conn.cluster.ClusterID, conn.local, opts)
+ return err
+ }
+
// Collate UUIDs in matchAllFilters by remote cluster ID --
// e.g., todoByRemote["aaaaa"]["aaaaa-4zz18-000000000000000"]
// will be true -- and count the total number of UUIDs we're
s.fed = New(s.cluster)
}
-func (s *FederationSuite) addDirectRemote(c *check.C, id string, backend arvados.API) {
+func (s *FederationSuite) addDirectRemote(c *check.C, id string, backend backend) {
s.cluster.RemoteClusters[id] = arvados.RemoteCluster{
Host: "in-process.local",
}
s.fed.remotes[id] = backend
}
-func (s *FederationSuite) addHTTPRemote(c *check.C, id string, backend arvados.API) {
+func (s *FederationSuite) addHTTPRemote(c *check.C, id string, backend backend) {
srv := httpserver.Server{Addr: ":"}
srv.Handler = router.New(backend)
c.Check(srv.Start(), check.IsNil)
if cl.MaxPageSize > 0 && len(resp.Items) >= cl.MaxPageSize {
break
}
+ if options.Limit >= 0 && len(resp.Items) >= options.Limit {
+ break
+ }
if cl.matchFilters(c, options.Filters) {
resp.Items = append(resp.Items, c)
}
expectStatus int
}
+func (s *CollectionListSuite) TestCollectionListNoUUIDFilters(c *check.C) {
+ s.test(c, listTrial{
+ count: "none",
+ limit: 1,
+ expectUUIDs: []string{s.uuids[0][0]},
+ expectCalls: []int{1, 0, 0},
+ })
+}
+
func (s *CollectionListSuite) TestCollectionListOneLocal(c *check.C) {
s.test(c, listTrial{
count: "none",
continue
}
opts := calls[0].Options.(arvados.ListOptions)
- c.Check(opts.Limit, check.Equals, -1)
+ c.Check(opts.Limit, check.Equals, trial.limit)
}
}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package federation
+
+import (
+ "context"
+ "net/url"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ check "gopkg.in/check.v1"
+)
+
+func (s *FederationSuite) TestDeferToLoginCluster(c *check.C) {
+ s.addHTTPRemote(c, "zhome", &arvadostest.APIStub{})
+ s.cluster.Login.LoginCluster = "zhome"
+
+ returnTo := "https://app.example.com/foo?bar"
+ for _, remote := range []string{"", "ccccc"} {
+ resp, err := s.fed.Login(context.Background(), arvados.LoginOptions{Remote: remote, ReturnTo: returnTo})
+ c.Check(err, check.IsNil)
+ c.Logf("remote %q -- RedirectLocation %q", remote, resp.RedirectLocation)
+ target, err := url.Parse(resp.RedirectLocation)
+ c.Check(err, check.IsNil)
+ c.Check(target.Host, check.Equals, s.cluster.RemoteClusters["zhome"].Host)
+ c.Check(target.Scheme, check.Equals, "http")
+ c.Check(target.Query().Get("remote"), check.Equals, remote)
+ c.Check(target.Query().Get("return_to"), check.Equals, returnTo)
+ }
+}
if h.Cluster.EnableBetaController14287 {
mux.Handle("/arvados/v1/collections", rtr)
mux.Handle("/arvados/v1/collections/", rtr)
+ mux.Handle("/login", rtr)
}
hs := http.NotFoundHandler()
}
func (s *HandlerSuite) TestProxyRedirect(c *check.C) {
+ s.cluster.Login.ProviderAppID = "test"
+ s.cluster.Login.ProviderAppSecret = "test"
req := httptest.NewRequest("GET", "https://0.0.0.0:1/login?return_to=foo", nil)
resp := httptest.NewRecorder()
s.handler.ServeHTTP(resp, req)
- c.Check(resp.Code, check.Equals, http.StatusFound)
- c.Check(resp.Header().Get("Location"), check.Matches, `https://0.0.0.0:1/auth/joshid\?return_to=%2Cfoo&?`)
+ if !c.Check(resp.Code, check.Equals, http.StatusFound) {
+ c.Log(resp.Body.String())
+ }
+ // Old "proxy entire request" code path returns an absolute
+ // URL. New lib/controller/federation code path returns a
+ // relative URL.
+ c.Check(resp.Header().Get("Location"), check.Matches, `(https://0.0.0.0:1)?/auth/joshid\?return_to=%2Cfoo&?`)
}
func (s *HandlerSuite) TestValidateV1APIToken(c *check.C) {
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package localdb
+
+import (
+ "context"
+ "errors"
+
+ "git.curoverse.com/arvados.git/lib/controller/railsproxy"
+ "git.curoverse.com/arvados.git/lib/controller/rpc"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type railsProxy = rpc.Conn
+
+type Conn struct {
+ cluster *arvados.Cluster
+ *railsProxy // handles API methods that aren't defined on Conn itself
+
+ googleLoginController
+}
+
+func NewConn(cluster *arvados.Cluster) *Conn {
+ return &Conn{
+ cluster: cluster,
+ railsProxy: railsproxy.NewConn(cluster),
+ }
+}
+
+func (conn *Conn) Login(ctx context.Context, opts arvados.LoginOptions) (arvados.LoginResponse, error) {
+ wantGoogle := conn.cluster.Login.GoogleClientID != ""
+ wantSSO := conn.cluster.Login.ProviderAppID != ""
+ if wantGoogle == wantSSO {
+ return arvados.LoginResponse{}, errors.New("configuration problem: exactly one of Login.GoogleClientID and Login.ProviderAppID must be configured")
+ } else if wantGoogle {
+ return conn.googleLoginController.Login(ctx, conn.cluster, conn.railsProxy, opts)
+ } else {
+ // Proxy to RailsAPI, which hands off to sso-provider.
+ return conn.railsProxy.Login(ctx, opts)
+ }
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package localdb
+
+import (
+ "bytes"
+ "context"
+ "crypto/hmac"
+ "crypto/sha256"
+ "encoding/base64"
+ "errors"
+ "fmt"
+ "net/url"
+ "strings"
+ "sync"
+ "text/template"
+ "time"
+
+ "git.curoverse.com/arvados.git/lib/controller/rpc"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/auth"
+ "github.com/coreos/go-oidc"
+ "golang.org/x/oauth2"
+)
+
+type googleLoginController struct {
+ issuer string // override OIDC issuer URL (normally https://accounts.google.com) for testing
+ provider *oidc.Provider
+ mu sync.Mutex
+}
+
+func (ctrl *googleLoginController) getProvider() (*oidc.Provider, error) {
+ ctrl.mu.Lock()
+ defer ctrl.mu.Unlock()
+ if ctrl.provider == nil {
+ issuer := ctrl.issuer
+ if issuer == "" {
+ issuer = "https://accounts.google.com"
+ }
+ provider, err := oidc.NewProvider(context.Background(), issuer)
+ if err != nil {
+ return nil, err
+ }
+ ctrl.provider = provider
+ }
+ return ctrl.provider, nil
+}
+
+func (ctrl *googleLoginController) Login(ctx context.Context, cluster *arvados.Cluster, railsproxy *railsProxy, opts arvados.LoginOptions) (arvados.LoginResponse, error) {
+ provider, err := ctrl.getProvider()
+ if err != nil {
+ return ctrl.loginError(fmt.Errorf("error setting up OpenID Connect provider: %s", err))
+ }
+ redirURL, err := (*url.URL)(&cluster.Services.Controller.ExternalURL).Parse("/login")
+ if err != nil {
+ return ctrl.loginError(fmt.Errorf("error making redirect URL: %s", err))
+ }
+ conf := &oauth2.Config{
+ ClientID: cluster.Login.GoogleClientID,
+ ClientSecret: cluster.Login.GoogleClientSecret,
+ Endpoint: provider.Endpoint(),
+ Scopes: []string{oidc.ScopeOpenID, "profile", "email"},
+ RedirectURL: redirURL.String(),
+ }
+ verifier := provider.Verifier(&oidc.Config{
+ ClientID: conf.ClientID,
+ })
+ if opts.State == "" {
+ // Initiate Google sign-in.
+ if opts.ReturnTo == "" {
+ return ctrl.loginError(errors.New("missing return_to parameter"))
+ }
+ me := url.URL(cluster.Services.Controller.ExternalURL)
+ callback, err := me.Parse("/" + arvados.EndpointLogin.Path)
+ if err != nil {
+ return ctrl.loginError(err)
+ }
+ conf.RedirectURL = callback.String()
+ state := ctrl.newOAuth2State([]byte(cluster.SystemRootToken), opts.Remote, opts.ReturnTo)
+ return arvados.LoginResponse{
+ RedirectLocation: conf.AuthCodeURL(state.String(),
+ // prompt=select_account tells Google
+ // to show the "choose which Google
+ // account" page, even if the client
+ // is currently logged in to exactly
+ // one Google account.
+ oauth2.SetAuthURLParam("prompt", "select_account")),
+ }, nil
+ } else {
+ // Callback after Google sign-in.
+ state := ctrl.parseOAuth2State(opts.State)
+ if !state.verify([]byte(cluster.SystemRootToken)) {
+ return ctrl.loginError(errors.New("invalid OAuth2 state"))
+ }
+ oauth2Token, err := conf.Exchange(ctx, opts.Code)
+ if err != nil {
+ return ctrl.loginError(fmt.Errorf("error in OAuth2 exchange: %s", err))
+ }
+ rawIDToken, ok := oauth2Token.Extra("id_token").(string)
+ if !ok {
+ return ctrl.loginError(errors.New("error in OAuth2 exchange: no ID token in OAuth2 token"))
+ }
+ idToken, err := verifier.Verify(ctx, rawIDToken)
+ if err != nil {
+ return ctrl.loginError(fmt.Errorf("error verifying ID token: %s", err))
+ }
+ var claims struct {
+ Name string `json:"name"`
+ Email string `json:"email"`
+ Verified bool `json:"email_verified"`
+ }
+ if err := idToken.Claims(&claims); err != nil {
+ return ctrl.loginError(fmt.Errorf("error extracting claims from ID token: %s", err))
+ }
+ if !claims.Verified {
+ return ctrl.loginError(errors.New("cannot authenticate using an unverified email address"))
+ }
+
+ firstname, lastname := strings.TrimSpace(claims.Name), ""
+ if names := strings.Fields(firstname); len(names) > 1 {
+ firstname = strings.Join(names[0:len(names)-1], " ")
+ lastname = names[len(names)-1]
+ }
+
+ ctxRoot := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{cluster.SystemRootToken}})
+ return railsproxy.UserSessionCreate(ctxRoot, rpc.UserSessionCreateOptions{
+ ReturnTo: state.Remote + "," + state.ReturnTo,
+ AuthInfo: map[string]interface{}{
+ "email": claims.Email,
+ "first_name": firstname,
+ "last_name": lastname,
+ },
+ })
+ }
+}
+
+func (ctrl *googleLoginController) loginError(sendError error) (resp arvados.LoginResponse, err error) {
+ tmpl, err := template.New("error").Parse(`<h2>Login error:</h2><p>{{.}}</p>`)
+ if err != nil {
+ return
+ }
+ err = tmpl.Execute(&resp.HTML, sendError.Error())
+ return
+}
+
+func (ctrl *googleLoginController) newOAuth2State(key []byte, remote, returnTo string) oauth2State {
+ s := oauth2State{
+ Time: time.Now().Unix(),
+ Remote: remote,
+ ReturnTo: returnTo,
+ }
+ s.HMAC = s.computeHMAC(key)
+ return s
+}
+
+type oauth2State struct {
+ HMAC []byte // hash of other fields; see computeHMAC()
+ Time int64 // creation time (unix timestamp)
+ Remote string // remote cluster if requesting a salted token, otherwise blank
+ ReturnTo string // redirect target
+}
+
+func (ctrl *googleLoginController) parseOAuth2State(encoded string) (s oauth2State) {
+ // Errors are not checked. If decoding/parsing fails, the
+ // token will be rejected by verify().
+ decoded, _ := base64.RawURLEncoding.DecodeString(encoded)
+ f := strings.Split(string(decoded), "\n")
+ if len(f) != 4 {
+ return
+ }
+ fmt.Sscanf(f[0], "%x", &s.HMAC)
+ fmt.Sscanf(f[1], "%x", &s.Time)
+ fmt.Sscanf(f[2], "%s", &s.Remote)
+ fmt.Sscanf(f[3], "%s", &s.ReturnTo)
+ return
+}
+
+func (s oauth2State) verify(key []byte) bool {
+ if delta := time.Now().Unix() - s.Time; delta < 0 || delta > 300 {
+ return false
+ }
+ return hmac.Equal(s.computeHMAC(key), s.HMAC)
+}
+
+func (s oauth2State) String() string {
+ var buf bytes.Buffer
+ enc := base64.NewEncoder(base64.RawURLEncoding, &buf)
+ fmt.Fprintf(enc, "%x\n%x\n%s\n%s", s.HMAC, s.Time, s.Remote, s.ReturnTo)
+ enc.Close()
+ return buf.String()
+}
+
+func (s oauth2State) computeHMAC(key []byte) []byte {
+ mac := hmac.New(sha256.New, key)
+ fmt.Fprintf(mac, "%x %s %s", s.Time, s.Remote, s.ReturnTo)
+ return mac.Sum(nil)
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package localdb
+
+import (
+ "bytes"
+ "context"
+ "crypto/rand"
+ "crypto/rsa"
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "net/url"
+ "strings"
+ "testing"
+ "time"
+
+ "git.curoverse.com/arvados.git/lib/config"
+ "git.curoverse.com/arvados.git/lib/controller/rpc"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/auth"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+ check "gopkg.in/check.v1"
+ jose "gopkg.in/square/go-jose.v2"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
+
+var _ = check.Suite(&LoginSuite{})
+
+type LoginSuite struct {
+ cluster *arvados.Cluster
+ ctx context.Context
+ localdb *Conn
+ railsSpy *arvadostest.Proxy
+ fakeIssuer *httptest.Server
+ issuerKey *rsa.PrivateKey
+
+ // expected token request
+ validCode string
+ // desired response from token endpoint
+ authEmail string
+ authEmailVerified bool
+ authName string
+}
+
+func (s *LoginSuite) SetUpTest(c *check.C) {
+ var err error
+ s.issuerKey, err = rsa.GenerateKey(rand.Reader, 2048)
+ c.Assert(err, check.IsNil)
+
+ s.authEmail = "active-user@arvados.local"
+ s.authEmailVerified = true
+ s.authName = "Fake User Name"
+ s.fakeIssuer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ req.ParseForm()
+ c.Logf("fakeIssuer: got req: %s %s %s", req.Method, req.URL, req.Form)
+ w.Header().Set("Content-Type", "application/json")
+ switch req.URL.Path {
+ case "/.well-known/openid-configuration":
+ json.NewEncoder(w).Encode(map[string]interface{}{
+ "issuer": s.fakeIssuer.URL,
+ "authorization_endpoint": s.fakeIssuer.URL + "/auth",
+ "token_endpoint": s.fakeIssuer.URL + "/token",
+ "jwks_uri": s.fakeIssuer.URL + "/jwks",
+ "userinfo_endpoint": s.fakeIssuer.URL + "/userinfo",
+ })
+ case "/token":
+ if req.Form.Get("code") != s.validCode || s.validCode == "" {
+ w.WriteHeader(http.StatusUnauthorized)
+ return
+ }
+ idToken, _ := json.Marshal(map[string]interface{}{
+ "iss": s.fakeIssuer.URL,
+ "aud": []string{"test%client$id"},
+ "sub": "fake-user-id",
+ "exp": time.Now().UTC().Add(time.Minute).UnixNano(),
+ "iat": time.Now().UTC().UnixNano(),
+ "nonce": "fake-nonce",
+ "email": s.authEmail,
+ "email_verified": s.authEmailVerified,
+ "name": s.authName,
+ })
+ json.NewEncoder(w).Encode(struct {
+ AccessToken string `json:"access_token"`
+ TokenType string `json:"token_type"`
+ RefreshToken string `json:"refresh_token"`
+ ExpiresIn int32 `json:"expires_in"`
+ IDToken string `json:"id_token"`
+ }{
+ AccessToken: s.fakeToken(c, []byte("fake access token")),
+ TokenType: "Bearer",
+ RefreshToken: "test-refresh-token",
+ ExpiresIn: 30,
+ IDToken: s.fakeToken(c, idToken),
+ })
+ case "/jwks":
+ json.NewEncoder(w).Encode(jose.JSONWebKeySet{
+ Keys: []jose.JSONWebKey{
+ {Key: s.issuerKey.Public(), Algorithm: string(jose.RS256), KeyID: ""},
+ },
+ })
+ case "/auth":
+ w.WriteHeader(http.StatusInternalServerError)
+ case "/userinfo":
+ w.WriteHeader(http.StatusInternalServerError)
+ default:
+ w.WriteHeader(http.StatusNotFound)
+ }
+ }))
+
+ cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+ s.cluster, err = cfg.GetCluster("")
+ s.cluster.Login.GoogleClientID = "test%client$id"
+ s.cluster.Login.GoogleClientSecret = "test#client/secret"
+ c.Assert(err, check.IsNil)
+
+ s.localdb = NewConn(s.cluster)
+ s.localdb.googleLoginController.issuer = s.fakeIssuer.URL
+
+ s.railsSpy = arvadostest.NewProxy(c, s.cluster.Services.RailsAPI)
+ s.localdb.railsProxy = rpc.NewConn(s.cluster.ClusterID, s.railsSpy.URL, true, rpc.PassthroughTokenProvider)
+}
+
+func (s *LoginSuite) TearDownTest(c *check.C) {
+ s.railsSpy.Close()
+}
+
+func (s *LoginSuite) TestGoogleLoginStart_Bogus(c *check.C) {
+ resp, err := s.localdb.Login(context.Background(), arvados.LoginOptions{})
+ c.Check(err, check.IsNil)
+ c.Check(resp.RedirectLocation, check.Equals, "")
+ c.Check(resp.HTML.String(), check.Matches, `.*missing return_to parameter.*`)
+}
+
+func (s *LoginSuite) TestGoogleLoginStart(c *check.C) {
+ for _, remote := range []string{"", "zzzzz"} {
+ resp, err := s.localdb.Login(context.Background(), arvados.LoginOptions{Remote: remote, ReturnTo: "https://app.example.com/foo?bar"})
+ c.Check(err, check.IsNil)
+ target, err := url.Parse(resp.RedirectLocation)
+ c.Check(err, check.IsNil)
+ issuerURL, _ := url.Parse(s.fakeIssuer.URL)
+ c.Check(target.Host, check.Equals, issuerURL.Host)
+ q := target.Query()
+ c.Check(q.Get("client_id"), check.Equals, "test%client$id")
+ state := s.localdb.googleLoginController.parseOAuth2State(q.Get("state"))
+ c.Check(state.verify([]byte(s.cluster.SystemRootToken)), check.Equals, true)
+ c.Check(state.Time, check.Not(check.Equals), 0)
+ c.Check(state.Remote, check.Equals, remote)
+ c.Check(state.ReturnTo, check.Equals, "https://app.example.com/foo?bar")
+ }
+}
+
+func (s *LoginSuite) TestGoogleLoginSuccess(c *check.C) {
+ // Initiate login, but instead of following the redirect to
+ // the provider, just grab state from the redirect URL.
+ resp, err := s.localdb.Login(context.Background(), arvados.LoginOptions{ReturnTo: "https://app.example.com/foo?bar"})
+ c.Check(err, check.IsNil)
+ target, err := url.Parse(resp.RedirectLocation)
+ c.Check(err, check.IsNil)
+ state := target.Query().Get("state")
+ c.Check(state, check.Not(check.Equals), "")
+
+ // Prime the fake issuer with a valid code.
+ s.validCode = fmt.Sprintf("abcdefgh-%d", time.Now().Unix())
+
+ // Callback with invalid code.
+ resp, err = s.localdb.Login(context.Background(), arvados.LoginOptions{
+ Code: "first-try-a-bogus-code",
+ State: state,
+ })
+ c.Check(err, check.IsNil)
+ c.Check(resp.RedirectLocation, check.Equals, "")
+ c.Check(resp.HTML.String(), check.Matches, `(?ms).*error in OAuth2 exchange.*cannot fetch token.*`)
+
+ // Callback with invalid state.
+ resp, err = s.localdb.Login(context.Background(), arvados.LoginOptions{
+ Code: s.validCode,
+ State: "bogus-state",
+ })
+ c.Check(err, check.IsNil)
+ c.Check(resp.RedirectLocation, check.Equals, "")
+ c.Check(resp.HTML.String(), check.Matches, `(?ms).*invalid OAuth2 state.*`)
+
+ // Callback with valid code and state.
+ resp, err = s.localdb.Login(context.Background(), arvados.LoginOptions{
+ Code: s.validCode,
+ State: state,
+ })
+ c.Check(err, check.IsNil)
+ c.Check(resp.HTML.String(), check.Equals, "")
+ c.Check(resp.RedirectLocation, check.Not(check.Equals), "")
+ target, err = url.Parse(resp.RedirectLocation)
+ c.Check(err, check.IsNil)
+ c.Check(target.Host, check.Equals, "app.example.com")
+ c.Check(target.Path, check.Equals, "/foo")
+ token := target.Query().Get("api_token")
+ c.Check(token, check.Matches, `v2/zzzzz-gj3su-.{15}/.{32,50}`)
+
+ foundCallback := false
+ for _, dump := range s.railsSpy.RequestDumps {
+ c.Logf("spied request: %q", dump)
+ split := bytes.Split(dump, []byte("\r\n\r\n"))
+ c.Assert(split, check.HasLen, 2)
+ hdr, body := string(split[0]), string(split[1])
+ if strings.Contains(hdr, "POST /auth/controller/callback") {
+ vs, err := url.ParseQuery(body)
+ var authinfo map[string]interface{}
+ c.Check(json.Unmarshal([]byte(vs.Get("auth_info")), &authinfo), check.IsNil)
+ c.Check(err, check.IsNil)
+ c.Check(authinfo["first_name"], check.Equals, "Fake User")
+ c.Check(authinfo["last_name"], check.Equals, "Name")
+ c.Check(authinfo["email"], check.Equals, "active-user@arvados.local")
+ foundCallback = true
+ }
+ }
+ c.Check(foundCallback, check.Equals, true)
+
+ // Try using the returned Arvados token.
+ c.Logf("trying an API call with new token %q", token)
+ ctx := auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{token}})
+ cl, err := s.localdb.CollectionList(ctx, arvados.ListOptions{Limit: -1})
+ c.Check(cl.ItemsAvailable, check.Not(check.Equals), 0)
+ c.Check(cl.Items, check.Not(check.HasLen), 0)
+ c.Check(err, check.IsNil)
+
+ // Might as well check that bogus tokens aren't accepted.
+ badtoken := token + "plussomeboguschars"
+ c.Logf("trying an API call with mangled token %q", badtoken)
+ ctx = auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{badtoken}})
+ cl, err = s.localdb.CollectionList(ctx, arvados.ListOptions{Limit: -1})
+ c.Check(cl.Items, check.HasLen, 0)
+ c.Check(err, check.NotNil)
+ c.Check(err, check.ErrorMatches, `.*401 Unauthorized: Not logged in.*`)
+}
+
+func (s *LoginSuite) fakeToken(c *check.C, payload []byte) string {
+ signer, err := jose.NewSigner(jose.SigningKey{Algorithm: jose.RS256, Key: s.issuerKey}, nil)
+ if err != nil {
+ c.Error(err)
+ }
+ object, err := signer.Sign(payload)
+ if err != nil {
+ c.Error(err)
+ }
+ t, err := object.CompactSerialize()
+ if err != nil {
+ c.Error(err)
+ }
+ c.Logf("fakeToken(%q) == %q", payload, t)
+ return t
+}
package railsproxy
import (
- "context"
- "errors"
"fmt"
+ "net/http"
"net/url"
"strings"
"git.curoverse.com/arvados.git/lib/controller/rpc"
"git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/auth"
)
// For now, FindRailsAPI always uses the rails API running on this
if err != nil {
panic(err)
}
- return rpc.NewConn(cluster.ClusterID, url, insecure, provideIncomingToken)
-}
-
-func provideIncomingToken(ctx context.Context) ([]string, error) {
- incoming, ok := auth.FromContext(ctx)
- if !ok {
- return nil, errors.New("no token provided")
- }
- return incoming.Tokens, nil
+ conn := rpc.NewConn(cluster.ClusterID, url, insecure, rpc.PassthroughTokenProvider)
+ // If Rails is running with force_ssl=true, this
+ // "X-Forwarded-Proto: https" header prevents it from
+ // redirecting our internal request to an invalid https URL.
+ conn.SendHeader = http.Header{"X-Forwarded-Proto": []string{"https"}}
+ return conn
}
return selected
}
-func (rtr *router) sendResponse(w http.ResponseWriter, resp interface{}, opts responseOptions) {
+func (rtr *router) sendResponse(w http.ResponseWriter, req *http.Request, resp interface{}, opts responseOptions) {
var tmp map[string]interface{}
+ if resp, ok := resp.(http.Handler); ok {
+ // resp knows how to write its own http response
+ // header and body.
+ resp.ServeHTTP(w, req)
+ return
+ }
+
err := rtr.transcode(resp, &tmp)
if err != nil {
rtr.sendError(w, err)
}
}
w.Header().Set("Content-Type", "application/json")
- json.NewEncoder(w).Encode(tmp)
+ enc := json.NewEncoder(w)
+ enc.SetEscapeHTML(false)
+ enc.Encode(tmp)
}
func (rtr *router) sendError(w http.ResponseWriter, err error) {
return rtr.fed.ConfigGet(ctx)
},
},
+ {
+ arvados.EndpointLogin,
+ func() interface{} { return &arvados.LoginOptions{} },
+ func(ctx context.Context, opts interface{}) (interface{}, error) {
+ return rtr.fed.Login(ctx, *opts.(*arvados.LoginOptions))
+ },
+ },
{
arvados.EndpointCollectionCreate,
func() interface{} { return &arvados.CreateOptions{} },
rtr.sendError(w, err)
return
}
- rtr.sendResponse(w, resp, respOpts)
+ rtr.sendResponse(w, req, resp, respOpts)
})
}
}
type Conn struct {
+ SendHeader http.Header
clusterID string
httpClient http.Client
baseURL url.URL
}
}
return &Conn{
- clusterID: clusterID,
- httpClient: http.Client{Transport: transport},
+ clusterID: clusterID,
+ httpClient: http.Client{
+ CheckRedirect: func(req *http.Request, via []*http.Request) error { return http.ErrUseLastResponse },
+ Transport: transport,
+ },
baseURL: *url,
tokenProvider: tp,
}
func (conn *Conn) requestAndDecode(ctx context.Context, dst interface{}, ep arvados.APIEndpoint, body io.Reader, opts interface{}) error {
aClient := arvados.Client{
- Client: &conn.httpClient,
- Scheme: conn.baseURL.Scheme,
- APIHost: conn.baseURL.Host,
+ Client: &conn.httpClient,
+ Scheme: conn.baseURL.Scheme,
+ APIHost: conn.baseURL.Host,
+ SendHeader: conn.SendHeader,
}
tokens, err := conn.tokenProvider(ctx)
if err != nil {
return aClient.RequestAndDecodeContext(ctx, dst, ep.Method, path, body, params)
}
+func (conn *Conn) BaseURL() url.URL {
+ return conn.baseURL
+}
+
func (conn *Conn) ConfigGet(ctx context.Context) (json.RawMessage, error) {
ep := arvados.EndpointConfigGet
var resp json.RawMessage
return resp, err
}
+func (conn *Conn) Login(ctx context.Context, options arvados.LoginOptions) (arvados.LoginResponse, error) {
+ ep := arvados.EndpointLogin
+ var resp arvados.LoginResponse
+ err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
+ resp.RedirectLocation = conn.relativeToBaseURL(resp.RedirectLocation)
+ return resp, err
+}
+
+// If the given location is a valid URL and its origin is the same as
+// conn.baseURL, return it as a relative URL. Otherwise, return it
+// unmodified.
+func (conn *Conn) relativeToBaseURL(location string) string {
+ u, err := url.Parse(location)
+ if err == nil && u.Scheme == conn.baseURL.Scheme && strings.ToLower(u.Host) == strings.ToLower(conn.baseURL.Host) {
+ u.Opaque = ""
+ u.Scheme = ""
+ u.User = nil
+ u.Host = ""
+ return u.String()
+ } else {
+ return location
+ }
+}
+
func (conn *Conn) CollectionCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Collection, error) {
ep := arvados.EndpointCollectionCreate
var resp arvados.Collection
err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
return resp, err
}
+
+type UserSessionCreateOptions struct {
+ AuthInfo map[string]interface{} `json:"auth_info"`
+ ReturnTo string `json:"return_to"`
+}
+
+func (conn *Conn) UserSessionCreate(ctx context.Context, options UserSessionCreateOptions) (arvados.LoginResponse, error) {
+ ep := arvados.APIEndpoint{Method: "POST", Path: "auth/controller/callback"}
+ var resp arvados.LoginResponse
+ err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
+ return resp, err
+}
"syscall"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/sirupsen/logrus"
)
type remoteRunner struct {
uuid string
executor Executor
- arvClient *arvados.Client
+ envJSON json.RawMessage
remoteUser string
timeoutTERM time.Duration
timeoutSignal time.Duration
// newRemoteRunner returns a new remoteRunner. Caller should ensure
// Close() is called to release resources.
func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
+ // Send the instance type record as a JSON doc so crunch-run
+ // can log it.
+ var instJSON bytes.Buffer
+ enc := json.NewEncoder(&instJSON)
+ enc.SetIndent("", " ")
+ if err := enc.Encode(wkr.instType); err != nil {
+ panic(err)
+ }
+ env := map[string]string{
+ "ARVADOS_API_HOST": wkr.wp.arvClient.APIHost,
+ "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
+ "InstanceType": instJSON.String(),
+ }
+ if wkr.wp.arvClient.Insecure {
+ env["ARVADOS_API_HOST_INSECURE"] = "1"
+ }
+ envJSON, err := json.Marshal(env)
+ if err != nil {
+ panic(err)
+ }
rr := &remoteRunner{
uuid: uuid,
executor: wkr.executor,
- arvClient: wkr.wp.arvClient,
+ envJSON: envJSON,
remoteUser: wkr.instance.RemoteUser(),
timeoutTERM: wkr.wp.timeoutTERM,
timeoutSignal: wkr.wp.timeoutSignal,
// assume the remote process _might_ have started, at least until it
// probes the worker and finds otherwise.
func (rr *remoteRunner) Start() {
- env := map[string]string{
- "ARVADOS_API_HOST": rr.arvClient.APIHost,
- "ARVADOS_API_TOKEN": rr.arvClient.AuthToken,
- }
- if rr.arvClient.Insecure {
- env["ARVADOS_API_HOST_INSECURE"] = "1"
- }
- envJSON, err := json.Marshal(env)
- if err != nil {
- panic(err)
- }
- stdin := bytes.NewBuffer(envJSON)
cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
if rr.remoteUser != "root" {
cmd = "sudo " + cmd
}
+ stdin := bytes.NewBuffer(rr.envJSON)
stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
if err != nil {
rr.logger.WithField("stdout", string(stdout)).
bootTimeout := time.Minute
probeTimeout := time.Second
+ ac := arvados.NewClientFromEnv()
is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, logger)
c.Assert(err, check.IsNil)
inst, err := is.Create(arvados.InstanceType{}, "", nil, "echo InitCommand", nil)
"crunch-run --list": trial.respRun,
}
wp := &Pool{
+ arvClient: ac,
newExecutor: func(cloud.Instance) Executor { return exr },
bootProbeCommand: "bootprobe",
timeoutBooting: bootTimeout,
from ._version import __version__
from .executor import ArvCwlExecutor
-# These arn't used directly in this file but
+# These aren't used directly in this file but
# other code expects to import them from here
from .arvcontainer import ArvadosContainer
from .arvtool import ArvadosCommandTool
"ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
"API": True
},
- "use_existing": self.enable_reuse,
+ "use_existing": False, # Never reuse the runner container - see #15497.
"properties": {}
}
'vcpus': 1,
'ram': (1024+256)*1024*1024
},
- 'use_existing': True,
+ 'use_existing': False,
'properties': {},
'secret_mounts': {}
}
'vcpus': 1,
'ram': 1342177280
},
- 'use_existing': True,
+ 'use_existing': False,
'properties': {},
'secret_mounts': {}
}
'vcpus': 1,
'ram': 1342177280
},
- 'use_existing': True,
+ 'use_existing': False,
'properties': {
"template_uuid": "962eh-7fd4e-gkbzl62qqtfig37"
},
}
},
"state": "Committed",
- "use_existing": True
+ "use_existing": False
}
stubs.api.container_requests().create.assert_called_with(
var (
EndpointConfigGet = APIEndpoint{"GET", "arvados/v1/config", ""}
+ EndpointLogin = APIEndpoint{"GET", "login", ""}
EndpointCollectionCreate = APIEndpoint{"POST", "arvados/v1/collections", "collection"}
EndpointCollectionUpdate = APIEndpoint{"PATCH", "arvados/v1/collections/:uuid", "collection"}
EndpointCollectionGet = APIEndpoint{"GET", "arvados/v1/collections/:uuid", ""}
UUID string `json:"uuid"`
}
+type LoginOptions struct {
+ ReturnTo string `json:"return_to"` // On success, redirect to this target with api_token=xxx query param
+ Remote string `json:"remote,omitempty"` // Salt token for remote Cluster ID
+ Code string `json:"code,omitempty"` // OAuth2 callback code
+ State string `json:"state,omitempty"` // OAuth2 callback state
+}
+
type API interface {
ConfigGet(ctx context.Context) (json.RawMessage, error)
+ Login(ctx context.Context, options LoginOptions) (LoginResponse, error)
CollectionCreate(ctx context.Context, options CreateOptions) (Collection, error)
CollectionUpdate(ctx context.Context, options UpdateOptions) (Collection, error)
CollectionGet(ctx context.Context, options GetOptions) (Collection, error)
// arvadosclient.ArvadosClient.)
KeepServiceURIs []string `json:",omitempty"`
+ // HTTP headers to add/override in outgoing requests.
+ SendHeader http.Header
+
dd *DiscoveryDocument
ctx context.Context
return c.httpClient().Do(req)
}
+func isRedirectStatus(code int) bool {
+ switch code {
+ case http.StatusMovedPermanently, http.StatusFound, http.StatusSeeOther, http.StatusTemporaryRedirect, http.StatusPermanentRedirect:
+ return true
+ default:
+ return false
+ }
+}
+
// DoAndDecode performs req and unmarshals the response (which must be
// JSON) into dst. Use this instead of RequestAndDecode if you need
// more control of the http.Request object.
+//
+// If the response status indicates an HTTP redirect, the Location
+// header value is unmarshalled to dst as a RedirectLocation
+// key/field.
func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error {
resp, err := c.Do(req)
if err != nil {
if err != nil {
return err
}
- if resp.StatusCode != 200 {
- return newTransactionError(req, resp, buf)
- }
- if dst == nil {
+ switch {
+ case resp.StatusCode == http.StatusOK && dst == nil:
return nil
+ case resp.StatusCode == http.StatusOK:
+ return json.Unmarshal(buf, dst)
+
+ // If the caller uses a client with a custom CheckRedirect
+ // func, Do() might return the 3xx response instead of
+ // following it.
+ case isRedirectStatus(resp.StatusCode) && dst == nil:
+ return nil
+ case isRedirectStatus(resp.StatusCode):
+ // Copy the redirect target URL to dst.RedirectLocation.
+ buf, err := json.Marshal(map[string]string{"RedirectLocation": resp.Header.Get("Location")})
+ if err != nil {
+ return err
+ }
+ return json.Unmarshal(buf, dst)
+
+ default:
+ return newTransactionError(req, resp, buf)
}
- return json.Unmarshal(buf, dst)
}
// Convert an arbitrary struct to url.Values. For example,
}
req = req.WithContext(ctx)
req.Header.Set("Content-type", "application/x-www-form-urlencoded")
+ for k, v := range c.SendHeader {
+ req.Header[k] = v
+ }
return c.DoAndDecode(dst, req)
}
Repositories string
}
Login struct {
- ProviderAppSecret string
+ GoogleClientID string
+ GoogleClientSecret string
ProviderAppID string
+ ProviderAppSecret string
LoginCluster string
RemoteTokenRefresh Duration
}
UserProfileFormMessage string
VocabularyURL string
WelcomePageHTML string
+ InactivePageHTML string
}
EnableBetaController14287 bool
// while locking multiple inodes.
locker() sync.Locker
+ // throttle for limiting concurrent background writers
+ throttle() *throttle
+
// create a new node with nil parent.
newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error)
Remove(name string) error
RemoveAll(name string) error
Rename(oldname, newname string) error
+
+ // Write buffered data from memory to storage, returning when
+ // all updates have been saved to persistent storage.
Sync() error
+
+ // Write buffered data from memory to storage, but don't wait
+ // for all writes to finish before returning. If shortBlocks
+ // is true, flush everything; otherwise, if there's less than
+ // a full block of buffered data at the end of a stream, leave
+ // it buffered in memory in case more data can be appended. If
+ // path is "", flush all dirs/streams; otherwise, flush only
+ // the specified dir/stream.
+ Flush(path string, shortBlocks bool) error
}
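+
+// Illustrative sketch (not part of this change): a caller that streams
+// data into a filesystem can flush completed directories early to bound
+// memory use, then wait for everything to reach persistent storage.
+// The paths used here are hypothetical.
+//
+//	fs.Flush("output/logs", false) // write full blocks under output/logs, keep short tails buffered
+//	fs.Flush("", true)             // write out everything, including short blocks
+//	fs.Sync()                      // block until all buffered data has been saved
+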
type inode interface {
root inode
fsBackend
mutex sync.Mutex
+ thr *throttle
}
func (fs *fileSystem) rootnode() inode {
return fs.root
}
+func (fs *fileSystem) throttle() *throttle {
+ return fs.thr
+}
+
func (fs *fileSystem) locker() sync.Locker {
return &fs.mutex
}
return ErrInvalidOperation
}
+func (fs *fileSystem) Flush(string, bool) error {
+ log.Printf("TODO: flush fileSystem")
+ return ErrInvalidOperation
+}
+
// rlookup (recursive lookup) returns the inode for the file/directory
// with the given name (which may contain "/" separators). If no such
// file/directory exists, the returned node is nil.
var (
maxBlockSize = 1 << 26
- concurrentWriters = 4 // max goroutines writing to Keep during sync()
- writeAheadBlocks = 1 // max background jobs flushing to Keep before blocking writes
+ concurrentWriters = 4 // max goroutines writing to Keep in background and during flush()
)
// A CollectionFileSystem is a FileSystem that can be serialized as a
// Total data bytes in all files.
Size() int64
+
+ // Memory consumed by buffered file data.
+ memorySize() int64
}
type collectionFileSystem struct {
uuid: c.UUID,
fileSystem: fileSystem{
fsBackend: keepBackend{apiClient: client, keepClient: kc},
+ thr: newThrottle(concurrentWriters),
},
}
root := &dirnode{
return nil
}
+func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error {
+ node, err := rlookup(fs.fileSystem.root, path)
+ if err != nil {
+ return err
+ }
+ dn, ok := node.(*dirnode)
+ if !ok {
+ return ErrNotADirectory
+ }
+ dn.Lock()
+ defer dn.Unlock()
+ names := dn.sortedNames()
+ if path != "" {
+ // Caller only wants to flush the specified dir,
+ // non-recursively. Drop subdirs from the list of
+ // names.
+ var filenames []string
+ for _, name := range names {
+ if _, ok := dn.inodes[name].(*filenode); ok {
+ filenames = append(filenames, name)
+ }
+ }
+ names = filenames
+ }
+ for _, name := range names {
+ child := dn.inodes[name]
+ child.Lock()
+ defer child.Unlock()
+ }
+ return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks})
+}
+
+func (fs *collectionFileSystem) memorySize() int64 {
+ fs.fileSystem.root.Lock()
+ defer fs.fileSystem.root.Unlock()
+ return fs.fileSystem.root.(*dirnode).memorySize()
+}
+
func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
fs.fileSystem.root.Lock()
defer fs.fileSystem.root.Unlock()
- return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix, newThrottle(concurrentWriters))
+ return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix)
}
func (fs *collectionFileSystem) Size() int64 {
memsize int64 // bytes in memSegments
sync.RWMutex
nullnode
- throttle *throttle
}
// caller must have lock
// Write some data out to disk to reduce memory use. Caller must have
// write lock.
func (fn *filenode) pruneMemSegments() {
- // TODO: share code with (*dirnode)sync()
+ // TODO: share code with (*dirnode)flush()
// TODO: pack/flush small blocks too, when fragmented
- if fn.throttle == nil {
- // TODO: share a throttle with filesystem
- fn.throttle = newThrottle(writeAheadBlocks)
- }
for idx, seg := range fn.segments {
seg, ok := seg.(*memSegment)
if !ok || seg.Len() < maxBlockSize || seg.flushing != nil {
// progress, block here until one finishes, rather
// than pile up an unlimited number of buffered writes
// and network flush operations.
- fn.throttle.Acquire()
+ fn.fs.throttle().Acquire()
go func() {
defer close(done)
locator, _, err := fn.FS().PutB(buf)
- fn.throttle.Release()
+ fn.fs.throttle().Release()
fn.Lock()
defer fn.Unlock()
- if curbuf := seg.buf[:1]; &curbuf[0] != &buf[0] {
+ if seg.flushing != done {
// A new seg.buf has been allocated.
return
}
}
}
-// Block until all pending pruneMemSegments work is finished. Caller
-// must NOT have lock.
+// Block until all pending pruneMemSegments/flush work is
+// finished. Caller must NOT have lock.
func (fn *filenode) waitPrune() {
var pending []<-chan struct{}
fn.Lock()
// storedSegments that reference the relevant portions of the new
// block.
//
+// bufsize is the total data size in refs. It is used to preallocate
+// the correct amount of memory when len(refs)>1.
+//
+// If sync is false, commitBlock returns right away, after starting a
+// goroutine to do the writes, reacquire the filenodes' locks, and
+// swap out the *memSegments. Some filenodes' segments might get
+// modified/rearranged in the meantime, in which case commitBlock
+// won't replace them.
+//
// Caller must have write lock.
-func (dn *dirnode) commitBlock(ctx context.Context, throttle *throttle, refs []fnSegmentRef) error {
+func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize int, sync bool) error {
if len(refs) == 0 {
return nil
}
- throttle.Acquire()
- defer throttle.Release()
if err := ctx.Err(); err != nil {
return err
}
- block := make([]byte, 0, maxBlockSize)
+ done := make(chan struct{})
+ var block []byte
+ segs := make([]*memSegment, 0, len(refs))
+ offsets := make([]int, 0, len(refs)) // location of segment's data within block
for _, ref := range refs {
- block = append(block, ref.fn.segments[ref.idx].(*memSegment).buf...)
- }
- locator, _, err := dn.fs.PutB(block)
- if err != nil {
- return err
+ seg := ref.fn.segments[ref.idx].(*memSegment)
+ if seg.flushing != nil && !sync {
+ // Let the other flushing goroutine finish. If
+ // it fails, we'll try again next time.
+ return nil
+ } else {
+ // In sync mode, we proceed regardless of
+ // whether another flush is in progress: It
+ // can't finish before we do, because we hold
+ // fn's lock until we finish our own writes.
+ }
+ seg.flushing = done
+ offsets = append(offsets, len(block))
+ if len(refs) == 1 {
+ block = seg.buf
+ } else if block == nil {
+ block = append(make([]byte, 0, bufsize), seg.buf...)
+ } else {
+ block = append(block, seg.buf...)
+ }
+ segs = append(segs, seg)
}
- off := 0
- for _, ref := range refs {
- data := ref.fn.segments[ref.idx].(*memSegment).buf
- ref.fn.segments[ref.idx] = storedSegment{
- kc: dn.fs,
- locator: locator,
- size: len(block),
- offset: off,
- length: len(data),
+ dn.fs.throttle().Acquire()
+ errs := make(chan error, 1)
+ go func() {
+ defer close(done)
+ defer close(errs)
+ locked := map[*filenode]bool{}
+ locator, _, err := dn.fs.PutB(block)
+ dn.fs.throttle().Release()
+ {
+ if !sync {
+ for _, name := range dn.sortedNames() {
+ if fn, ok := dn.inodes[name].(*filenode); ok {
+ fn.Lock()
+ defer fn.Unlock()
+ locked[fn] = true
+ }
+ }
+ }
+ defer func() {
+ for _, seg := range segs {
+ if seg.flushing == done {
+ seg.flushing = nil
+ }
+ }
+ }()
+ }
+ if err != nil {
+ errs <- err
+ return
}
- off += len(data)
- ref.fn.memsize -= int64(len(data))
+ for idx, ref := range refs {
+ if !sync {
+ // In async mode, fn's lock was
+ // released while we were waiting for
+ // PutB(); lots of things might have
+ // changed.
+ if len(ref.fn.segments) <= ref.idx {
+ // file segments have
+ // rearranged or changed in
+ // some way
+ continue
+ } else if seg, ok := ref.fn.segments[ref.idx].(*memSegment); !ok || seg != segs[idx] {
+ // segment has been replaced
+ continue
+ } else if seg.flushing != done {
+ // seg.buf has been replaced
+ continue
+ } else if !locked[ref.fn] {
+ // file was renamed, moved, or
+ // deleted since we called
+ // PutB
+ continue
+ }
+ }
+ data := ref.fn.segments[ref.idx].(*memSegment).buf
+ ref.fn.segments[ref.idx] = storedSegment{
+ kc: dn.fs,
+ locator: locator,
+ size: len(block),
+ offset: offsets[idx],
+ length: len(data),
+ }
+ ref.fn.memsize -= int64(len(data))
+ }
+ }()
+ if sync {
+ return <-errs
+ } else {
+ return nil
}
- return nil
}
-// sync flushes in-memory data and remote block references (for the
+type flushOpts struct {
+ sync bool
+ shortBlocks bool
+}
+
+// flush in-memory data and remote-cluster block references (for the
// children with the given names, which must be children of dn) to
-// local persistent storage. Caller must have write lock on dn and the
-// named children.
-func (dn *dirnode) sync(ctx context.Context, throttle *throttle, names []string) error {
+// local-cluster persistent storage.
+//
+// Caller must have write lock on dn and the named children.
+//
+// If any children are dirs, they will be flushed recursively.
+func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) error {
cg := newContextGroup(ctx)
defer cg.Cancel()
- goCommit := func(refs []fnSegmentRef) {
+ goCommit := func(refs []fnSegmentRef, bufsize int) {
cg.Go(func() error {
- return dn.commitBlock(cg.Context(), throttle, refs)
+ return dn.commitBlock(cg.Context(), refs, bufsize, opts.sync)
})
}
var pendingLen int = 0
localLocator := map[string]string{}
for _, name := range names {
- fn, ok := dn.inodes[name].(*filenode)
- if !ok {
- continue
- }
- for idx, seg := range fn.segments {
- switch seg := seg.(type) {
- case storedSegment:
- loc, ok := localLocator[seg.locator]
- if !ok {
- var err error
- loc, err = dn.fs.LocalLocator(seg.locator)
- if err != nil {
- return err
+ switch node := dn.inodes[name].(type) {
+ case *dirnode:
+ grandchildNames := node.sortedNames()
+ for _, grandchildName := range grandchildNames {
+ grandchild := node.inodes[grandchildName]
+ grandchild.Lock()
+ defer grandchild.Unlock()
+ }
+ cg.Go(func() error { return node.flush(cg.Context(), grandchildNames, opts) })
+ case *filenode:
+ for idx, seg := range node.segments {
+ switch seg := seg.(type) {
+ case storedSegment:
+ loc, ok := localLocator[seg.locator]
+ if !ok {
+ var err error
+ loc, err = dn.fs.LocalLocator(seg.locator)
+ if err != nil {
+ return err
+ }
+ localLocator[seg.locator] = loc
}
- localLocator[seg.locator] = loc
- }
- seg.locator = loc
- fn.segments[idx] = seg
- case *memSegment:
- if seg.Len() > maxBlockSize/2 {
- goCommit([]fnSegmentRef{{fn, idx}})
- continue
- }
- if pendingLen+seg.Len() > maxBlockSize {
- goCommit(pending)
- pending = nil
- pendingLen = 0
+ seg.locator = loc
+ node.segments[idx] = seg
+ case *memSegment:
+ if seg.Len() > maxBlockSize/2 {
+ goCommit([]fnSegmentRef{{node, idx}}, seg.Len())
+ continue
+ }
+ if pendingLen+seg.Len() > maxBlockSize {
+ goCommit(pending, pendingLen)
+ pending = nil
+ pendingLen = 0
+ }
+ pending = append(pending, fnSegmentRef{node, idx})
+ pendingLen += seg.Len()
+ default:
+ panic(fmt.Sprintf("can't sync segment type %T", seg))
}
- pending = append(pending, fnSegmentRef{fn, idx})
- pendingLen += seg.Len()
- default:
- panic(fmt.Sprintf("can't sync segment type %T", seg))
}
}
}
- goCommit(pending)
+ if opts.shortBlocks {
+ goCommit(pending, pendingLen)
+ }
return cg.Wait()
}
// caller must have write lock.
-func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle *throttle) (string, error) {
+func (dn *dirnode) memorySize() (size int64) {
+ for _, name := range dn.sortedNames() {
+ node := dn.inodes[name]
+ node.Lock()
+ defer node.Unlock()
+ switch node := node.(type) {
+ case *dirnode:
+ size += node.memorySize()
+ case *filenode:
+ for _, seg := range node.segments {
+ switch seg := seg.(type) {
+ case *memSegment:
+ size += int64(seg.Len())
+ }
+ }
+ }
+ }
+ return
+}
+
+// caller must have write lock.
+func (dn *dirnode) sortedNames() []string {
+ names := make([]string, 0, len(dn.inodes))
+ for name := range dn.inodes {
+ names = append(names, name)
+ }
+ sort.Strings(names)
+ return names
+}
+
+// caller must have write lock.
+func (dn *dirnode) marshalManifest(ctx context.Context, prefix string) (string, error) {
cg := newContextGroup(ctx)
defer cg.Cancel()
return manifestEscape(prefix) + " d41d8cd98f00b204e9800998ecf8427e+0 0:0:\\056\n", nil
}
- names := make([]string, 0, len(dn.inodes))
- for name := range dn.inodes {
- names = append(names, name)
- }
- sort.Strings(names)
+ names := dn.sortedNames()
// Wait for children to finish any pending write operations
// before locking them.
for i, name := range dirnames {
i, name := i, name
cg.Go(func() error {
- txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name, throttle)
+ txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name)
subdirs[i] = txt
return err
})
var fileparts []filepart
var blocks []string
- if err := dn.sync(cg.Context(), throttle, names); err != nil {
+ if err := dn.flush(cg.Context(), filenames, flushOpts{sync: true, shortBlocks: true}); err != nil {
return err
}
for _, name := range filenames {
default:
// This can't happen: we
// haven't unlocked since
- // calling sync().
+ // calling flush(sync=true).
panic(fmt.Sprintf("can't marshal segment type %T", seg))
}
}
}
maxBlockSize = 8
- defer func() { maxBlockSize = 2 << 26 }()
+ defer func() { maxBlockSize = 1 << 26 }()
var wg sync.WaitGroup
for n := 0; n < 128; n++ {
c.Check(err, check.ErrorMatches, `invalid flag.*`)
}
-func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
- defer func(wab, mbs int) {
- writeAheadBlocks = wab
+func (s *CollectionFSSuite) TestFlushFullBlocksWritingLongFile(c *check.C) {
+ defer func(cw, mbs int) {
+ concurrentWriters = cw
maxBlockSize = mbs
- }(writeAheadBlocks, maxBlockSize)
- writeAheadBlocks = 2
+ }(concurrentWriters, maxBlockSize)
+ concurrentWriters = 2
maxBlockSize = 1024
proceed := make(chan struct{})
default:
time.Sleep(time.Millisecond)
}
- c.Check(atomic.AddInt32(&concurrent, -1) < int32(writeAheadBlocks), check.Equals, true)
+ c.Check(atomic.AddInt32(&concurrent, -1) < int32(concurrentWriters), check.Equals, true)
}
fs, err := (&Collection{}).FileSystem(s.client, s.kc)
c.Check(currentMemExtents(), check.HasLen, 0)
}
+// Ensure blocks get flushed to disk if a lot of data is written to
+// small files/directories without calling sync().
+//
+// Write four 512KiB files into each of 256 top-level dirs (total
+// 512MiB), calling Flush() every 8 dirs. Ensure memory usage never
+// exceeds 24MiB (4 concurrentWriters * 2MiB + 8 unflushed dirs *
+// 2MiB).
+func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
+ fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+
+ s.kc.onPut = func([]byte) {
+ // discard flushed data -- otherwise the stub will use
+ // unlimited memory
+ time.Sleep(time.Millisecond)
+ s.kc.Lock()
+ defer s.kc.Unlock()
+ s.kc.blocks = map[string][]byte{}
+ }
+ for i := 0; i < 256; i++ {
+ buf := bytes.NewBuffer(make([]byte, 524288))
+ fmt.Fprintf(buf, "test file in dir%d", i)
+
+ dir := fmt.Sprintf("dir%d", i)
+ fs.Mkdir(dir, 0755)
+ for j := 0; j < 2; j++ {
+ f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ _, err = io.Copy(f, buf)
+ c.Assert(err, check.IsNil)
+ }
+
+ if i%8 == 0 {
+ fs.Flush("", true)
+ }
+
+ size := fs.memorySize()
+ if !c.Check(size <= 1<<24, check.Equals, true) {
+ c.Logf("at dir%d fs.memorySize()=%d", i, size)
+ return
+ }
+ }
+}
+
+// Ensure short blocks at the end of a stream don't get flushed by
+// Flush(false).
+//
+// Write 67x 1MiB files to each of 8 dirs, and check that 8 full 64MiB
+// blocks have been flushed while 8x 3MiB is still buffered in memory.
+func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
+ fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+
+ var flushed int64
+ s.kc.onPut = func(p []byte) {
+ atomic.AddInt64(&flushed, int64(len(p)))
+ }
+
+ nDirs := int64(8)
+ megabyte := make([]byte, 1<<20)
+ for i := int64(0); i < nDirs; i++ {
+ dir := fmt.Sprintf("dir%d", i)
+ fs.Mkdir(dir, 0755)
+ for j := 0; j < 67; j++ {
+ f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ _, err = f.Write(megabyte)
+ c.Assert(err, check.IsNil)
+ }
+ }
+ c.Check(fs.memorySize(), check.Equals, int64(nDirs*67<<20))
+ c.Check(flushed, check.Equals, int64(0))
+
+ waitForFlush := func(expectUnflushed, expectFlushed int64) {
+ for deadline := time.Now().Add(5 * time.Second); fs.memorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
+ }
+ c.Check(fs.memorySize(), check.Equals, expectUnflushed)
+ c.Check(flushed, check.Equals, expectFlushed)
+ }
+
+ // Nothing flushed yet
+ waitForFlush((nDirs*67)<<20, 0)
+
+ // Flushing a non-empty dir "/" is non-recursive and there are
+ // no top-level files, so this has no effect
+ fs.Flush("/", false)
+ waitForFlush((nDirs*67)<<20, 0)
+
+ // Flush the full block in dir0
+ fs.Flush("dir0", false)
+ waitForFlush((nDirs*67-64)<<20, 64<<20)
+
+ err = fs.Flush("dir-does-not-exist", false)
+ c.Check(err, check.NotNil)
+
+ // Flush full blocks in all dirs
+ fs.Flush("", false)
+ waitForFlush(nDirs*3<<20, nDirs*64<<20)
+
+ // Flush non-full blocks, too
+ fs.Flush("", true)
+ waitForFlush(0, nDirs*67<<20)
+}
+
+// Even when writing lots of files/dirs from different goroutines, as
+// long as Flush(dir,false) is called after writing each file,
+// unflushed data should be limited to one full block per
+// concurrentWriter, plus one nearly-full block at the end of each
+// dir/stream.
+func (s *CollectionFSSuite) TestMaxUnflushed(c *check.C) {
+ nDirs := int64(8)
+ maxUnflushed := (int64(concurrentWriters) + nDirs) << 26
+
+ fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+
+ release := make(chan struct{})
+ timeout := make(chan struct{})
+ time.AfterFunc(10*time.Second, func() { close(timeout) })
+ var putCount, concurrency int64
+ var unflushed int64
+ s.kc.onPut = func(p []byte) {
+ defer atomic.AddInt64(&unflushed, -int64(len(p)))
+ cur := atomic.AddInt64(&concurrency, 1)
+ defer atomic.AddInt64(&concurrency, -1)
+ pc := atomic.AddInt64(&putCount, 1)
+ if pc < int64(concurrentWriters) {
+ // Block until we reach concurrentWriters, to
+ // make sure we're really accepting concurrent
+ // writes.
+ select {
+ case <-release:
+ case <-timeout:
+ c.Error("timeout")
+ }
+ } else if pc == int64(concurrentWriters) {
+ // Unblock the first N-1 PUT reqs.
+ close(release)
+ }
+ c.Assert(cur <= int64(concurrentWriters), check.Equals, true)
+ c.Assert(atomic.LoadInt64(&unflushed) <= maxUnflushed, check.Equals, true)
+ }
+
+ var owg sync.WaitGroup
+ megabyte := make([]byte, 1<<20)
+ for i := int64(0); i < nDirs; i++ {
+ dir := fmt.Sprintf("dir%d", i)
+ fs.Mkdir(dir, 0755)
+ owg.Add(1)
+ go func() {
+ defer owg.Done()
+ defer fs.Flush(dir, true)
+ var iwg sync.WaitGroup
+ defer iwg.Wait()
+ for j := 0; j < 67; j++ {
+ iwg.Add(1)
+ go func(j int) {
+ defer iwg.Done()
+ f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ n, err := f.Write(megabyte)
+ c.Assert(err, check.IsNil)
+ atomic.AddInt64(&unflushed, int64(n))
+ fs.Flush(dir, false)
+ }(j)
+ }
+ }()
+ }
+ owg.Wait()
+ fs.Flush("", true)
+}
+
func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
for _, txt := range []string{
"\n",
type customFileSystem struct {
fileSystem
root *vdirnode
+ thr *throttle
staleThreshold time.Time
staleLock sync.Mutex
fileSystem: fileSystem{
fsBackend: keepBackend{apiClient: c, keepClient: kc},
root: root,
+ thr: newThrottle(concurrentWriters),
},
}
root.inode = &treenode{
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+ "bytes"
+ "net/http"
+)
+
+type LoginResponse struct {
+ RedirectLocation string
+ HTML bytes.Buffer
+}
+
+func (resp LoginResponse) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+ if resp.RedirectLocation != "" {
+ w.Header().Set("Location", resp.RedirectLocation)
+ w.WriteHeader(http.StatusFound)
+ } else {
+ w.Write(resp.HTML.Bytes())
+ }
+}
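A hedged usage sketch of the new LoginResponse type: when RedirectLocation is set, ServeHTTP sends a 302 with a Location header; otherwise it writes the buffered HTML. The URL below is a placeholder, not a value produced by this patch.

package example

import (
	"net/http/httptest"

	"git.curoverse.com/arvados.git/sdk/go/arvados"
)

func serveLoginRedirect() int {
	resp := arvados.LoginResponse{
		RedirectLocation: "https://workbench.example.com/?api_token=placeholder",
	}
	rec := httptest.NewRecorder()
	req := httptest.NewRequest("GET", "/login", nil)
	resp.ServeHTTP(rec, req)
	return rec.Code // 302 when RedirectLocation is set; 200 with HTML body otherwise
}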
"context"
"encoding/json"
"errors"
+ "net/url"
"reflect"
"runtime"
"sync"
mtx sync.Mutex
}
+// BaseURL implements federation.backend
+func (as *APIStub) BaseURL() url.URL {
+ return url.URL{Scheme: "https", Host: "apistub.example.com"}
+}
func (as *APIStub) ConfigGet(ctx context.Context) (json.RawMessage, error) {
as.appendCall(as.ConfigGet, ctx, nil)
return nil, as.Error
}
+func (as *APIStub) Login(ctx context.Context, options arvados.LoginOptions) (arvados.LoginResponse, error) {
+ as.appendCall(as.Login, ctx, options)
+ return arvados.LoginResponse{}, as.Error
+}
func (as *APIStub) CollectionCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Collection, error) {
as.appendCall(as.CollectionCreate, ctx, options)
return arvados.Collection{}, as.Error
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvadostest
+
+import (
+ "crypto/tls"
+ "net"
+ "net/http"
+ "net/http/httptest"
+ "net/http/httputil"
+ "net/url"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "gopkg.in/check.v1"
+)
+
+type Proxy struct {
+ *httptest.Server
+
+ // URL where the proxy is listening. Same as Server.URL, but
+ // with parsing already done for you.
+ URL *url.URL
+
+ // A dump of each request that has been proxied.
+ RequestDumps [][]byte
+}
+
+// NewProxy returns a new Proxy that saves a dump of each request
+// before forwarding to the indicated service.
+func NewProxy(c *check.C, svc arvados.Service) *Proxy {
+ var target url.URL
+ c.Assert(svc.InternalURLs, check.HasLen, 1)
+ for u := range svc.InternalURLs {
+ target = url.URL(u)
+ break
+ }
+ rp := httputil.NewSingleHostReverseProxy(&target)
+ rp.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
+ dump, _ := httputil.DumpRequest(r, false)
+ c.Logf("arvadostest.Proxy ErrorHandler(%s): %s\n%s", r.URL, err, dump)
+ http.Error(w, err.Error(), http.StatusBadGateway)
+ }
+ rp.Transport = &http.Transport{
+ DialContext: (&net.Dialer{
+ Timeout: 30 * time.Second,
+ KeepAlive: 30 * time.Second,
+ DualStack: true,
+ }).DialContext,
+ MaxIdleConns: 100,
+ IdleConnTimeout: 90 * time.Second,
+ TLSHandshakeTimeout: 10 * time.Second,
+ ExpectContinueTimeout: 1 * time.Second,
+ TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+ }
+ srv := httptest.NewServer(rp)
+ u, err := url.Parse(srv.URL)
+ c.Assert(err, check.IsNil)
+ proxy := &Proxy{
+ Server: srv,
+ URL: u,
+ }
+ rp.Director = func(r *http.Request) {
+ dump, _ := httputil.DumpRequest(r, true)
+ proxy.RequestDumps = append(proxy.RequestDumps, dump)
+ r.URL.Scheme = target.Scheme
+ r.URL.Host = target.Host
+ }
+ return proxy
+}
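A sketch of how a test might use arvadostest.NewProxy: point a plain HTTP client at proxy.URL, then inspect RequestDumps. The Service literal and target URL below are assumptions for illustration; only the Proxy fields shown above come from this patch.

package example

import (
	"net/http"

	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
	check "gopkg.in/check.v1"
)

type ExampleSuite struct{}

var _ = check.Suite(&ExampleSuite{})

func (s *ExampleSuite) TestProxiedRequest(c *check.C) {
	// Hypothetical service description with a single InternalURL.
	svc := arvados.Service{InternalURLs: map[arvados.URL]arvados.ServiceInstance{
		{Scheme: "https", Host: "localhost:8000"}: {},
	}}
	proxy := arvadostest.NewProxy(c, svc)
	defer proxy.Close()

	resp, err := http.Get(proxy.URL.String() + "/_health/ping")
	c.Assert(err, check.IsNil)
	resp.Body.Close()

	// The proxy dumps each request before forwarding it.
	c.Assert(len(proxy.RequestDumps) > 0, check.Equals, true)
	c.Check(string(proxy.RequestDumps[0]), check.Matches, `(?s)GET /_health/ping.*`)
}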
if respCode == 0 {
respCode = http.StatusOK
}
- lgr.WithFields(logrus.Fields{
+ fields := logrus.Fields{
"respStatusCode": respCode,
"respStatus": http.StatusText(respCode),
"respBytes": w.WroteBodyBytes(),
- }).Info("response")
+ }
+ if respCode >= 400 {
+ fields["respBody"] = string(w.Sniffed())
+ }
+ lgr.WithFields(fields).Info("response")
}
type responseTimer struct {
"bytes"
"context"
"encoding/json"
+ "fmt"
"net/http"
"net/http/httptest"
"testing"
var _ = check.Suite(&Suite{})
-type Suite struct{}
+type Suite struct {
+ ctx context.Context
+ log *logrus.Logger
+ logdata *bytes.Buffer
+}
-func (s *Suite) TestLogRequests(c *check.C) {
- captured := &bytes.Buffer{}
- log := logrus.New()
- log.Out = captured
- log.Formatter = &logrus.JSONFormatter{
+func (s *Suite) SetUpTest(c *check.C) {
+ s.logdata = bytes.NewBuffer(nil)
+ s.log = logrus.New()
+ s.log.Out = s.logdata
+ s.log.Formatter = &logrus.JSONFormatter{
TimestampFormat: time.RFC3339Nano,
}
- ctx := ctxlog.Context(context.Background(), log)
+ s.ctx = ctxlog.Context(context.Background(), s.log)
+}
+func (s *Suite) TestLogRequests(c *check.C) {
h := AddRequestIDs(LogRequests(
http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("hello world"))
c.Assert(err, check.IsNil)
resp := httptest.NewRecorder()
- HandlerWithContext(ctx, h).ServeHTTP(resp, req)
+ HandlerWithContext(s.ctx, h).ServeHTTP(resp, req)
- dec := json.NewDecoder(captured)
+ dec := json.NewDecoder(s.logdata)
gotReq := make(map[string]interface{})
err = dec.Decode(&gotReq)
c.Check(gotResp[key].(float64), check.Not(check.Equals), float64(0))
}
}
+
+func (s *Suite) TestLogErrorBody(c *check.C) {
+ dec := json.NewDecoder(s.logdata)
+
+ for _, trial := range []struct {
+ label string
+ statusCode int
+ sentBody string
+ expectLog bool
+ expectBody string
+ }{
+ {"ok", 200, "hello world", false, ""},
+ {"redir", 302, "<a href='http://foo.example/baz'>redir</a>", false, ""},
+ {"4xx short body", 400, "oops", true, "oops"},
+ {"4xx long body", 400, fmt.Sprintf("%0*d", sniffBytes*2, 1), true, fmt.Sprintf("%0*d", sniffBytes, 0)},
+ {"5xx empty body", 500, "", true, ""},
+ } {
+ comment := check.Commentf("in trial: %q", trial.label)
+
+ req, err := http.NewRequest("GET", "https://foo.example/bar", nil)
+ c.Assert(err, check.IsNil)
+ resp := httptest.NewRecorder()
+
+ HandlerWithContext(s.ctx, LogRequests(
+ http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ w.WriteHeader(trial.statusCode)
+ w.Write([]byte(trial.sentBody))
+ }),
+ )).ServeHTTP(resp, req)
+
+ gotReq := make(map[string]interface{})
+ err = dec.Decode(&gotReq)
+ c.Logf("%#v", gotReq)
+ gotResp := make(map[string]interface{})
+ err = dec.Decode(&gotResp)
+ c.Logf("%#v", gotResp)
+ if trial.expectLog {
+ c.Check(gotResp["respBody"], check.Equals, trial.expectBody, comment)
+ } else {
+ c.Check(gotResp["respBody"], check.IsNil, comment)
+ }
+ }
+}
"net/http"
)
+const sniffBytes = 1024
+
type ResponseWriter interface {
http.ResponseWriter
WroteStatus() int
WroteBodyBytes() int
+ Sniffed() []byte
}
// responseWriter wraps http.ResponseWriter and exposes the status
// error.
type responseWriter struct {
http.ResponseWriter
- wroteStatus int // Last status given to WriteHeader()
+ wroteStatus int // First status given to WriteHeader()
wroteBodyBytes int // Bytes successfully written
err error // Last error returned from Write()
+ sniffed []byte
}
func WrapResponseWriter(orig http.ResponseWriter) ResponseWriter {
}
func (w *responseWriter) WriteHeader(s int) {
- w.wroteStatus = s
+ if w.wroteStatus == 0 {
+ w.wroteStatus = s
+ }
+ // ...else it's too late to change the status seen by the
+ // client -- but we call the wrapped WriteHeader() anyway so
+ // it can log a warning.
w.ResponseWriter.WriteHeader(s)
}
func (w *responseWriter) Write(data []byte) (n int, err error) {
if w.wroteStatus == 0 {
w.WriteHeader(http.StatusOK)
+ } else if w.wroteStatus >= 400 {
+ w.sniff(data)
}
n, err = w.ResponseWriter.Write(data)
w.wroteBodyBytes += n
func (w *responseWriter) Err() error {
return w.err
}
+
+func (w *responseWriter) sniff(data []byte) {
+ max := sniffBytes - len(w.sniffed)
+ if max <= 0 {
+ return
+ } else if max < len(data) {
+ data = data[:max]
+ }
+ w.sniffed = append(w.sniffed, data...)
+}
+
+func (w *responseWriter) Sniffed() []byte {
+ return w.sniffed
+}
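A minimal sketch of the sniffing pattern, assuming it lives in this httpserver package (so WrapResponseWriter and sniffBytes are in scope) and that net/http and logrus are imported at the file level. The wrapper name and log field are illustrative; LogRequests above is the real consumer of Sniffed().

// Sketch only: wrap a handler so the first sniffBytes of any 4xx/5xx
// response body can be attached to a log entry.
func logErrorBodies(h http.Handler, logger logrus.FieldLogger) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ww := WrapResponseWriter(w)
		h.ServeHTTP(ww, r)
		if ww.WroteStatus() >= 400 {
			// Sniffed() returns at most sniffBytes of the body,
			// captured by responseWriter.Write() above.
			logger.WithField("respBody", string(ww.Sniffed())).Info("error response")
		}
	})
}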
public class FileToken {
private int filePosition;
- private int fileSize;
+ private long fileSize;
private String fileName;
private String path;
private void splitFileTokenInfo(String fileTokenInfo) {
String[] tokenPieces = fileTokenInfo.split(":");
this.filePosition = Integer.parseInt(tokenPieces[0]);
- this.fileSize = Integer.parseInt(tokenPieces[1]);
+ this.fileSize = Long.parseLong(tokenPieces[1]);
this.fileName = tokenPieces[2].replace(Characters.SPACE, " ");
}
return this.filePosition;
}
- public int getFileSize() {
+ public long getFileSize() {
return this.fileSize;
}
// values for tracking file output streams and matching data chunks with initial files
int currentDataChunkNumber;
int bytesDownloadedFromChunk;
- int bytesToDownload;
+ long bytesToDownload;
byte[] currentDataChunk;
boolean remainingDataInChunk;
final List<KeepLocator> keepLocators;
this.keepLocators = keepLocators;
}
- private int getBytesToDownload() {
+ private long getBytesToDownload() {
return bytesToDownload;
}
- private void setBytesToDownload(int bytesToDownload) {
+ private void setBytesToDownload(long bytesToDownload) {
this.bytesToDownload = bytesToDownload;
}
private void writeDownDataChunkPartially(FileOutputStream fos) throws IOException {
//write all remaining bytes for this file from current chunk
- fos.write(currentDataChunk, bytesDownloadedFromChunk, bytesToDownload);
+ fos.write(currentDataChunk, bytesDownloadedFromChunk, (int) bytesToDownload);
// update number of bytes downloaded from this chunk
bytesDownloadedFromChunk += bytesToDownload;
// set remaining data in chunk to true
public static final String FILE_TOKEN_INFO = "0:1024:test-file1";
public static final int FILE_POSITION = 0;
- public static final int FILE_LENGTH = 1024;
+ public static final long FILE_LENGTH = 1024L;
public static final String FILE_NAME = "test-file1";
public static final String FILE_PATH = "c" + Characters.SLASH;
#
def copy_collection(obj_uuid, src, dst, args):
if arvados.util.keep_locator_pattern.match(obj_uuid):
- # If the obj_uuid is a portable data hash, it might not be uniquely
- # identified with a particular collection. As a result, it is
- # ambigious as to what name to use for the copy. Apply some heuristics
- # to pick which collection to get the name from.
+ # If the obj_uuid is a portable data hash, it might not be
+ # uniquely identified with a particular collection. As a
+ # result, it is ambiguous as to what name to use for the copy.
+ # Apply some heuristics to pick which collection to get the
+ # name from.
srccol = src.collections().list(
filters=[['portable_data_hash', '=', obj_uuid]],
order="created_at asc"
raise "Local login disabled when LoginCluster is set"
end
- omniauth = request.env['omniauth.auth']
+ if params[:provider] == 'controller'
+ if request.headers['Authorization'] != 'Bearer ' + Rails.configuration.SystemRootToken
+ return send_error('Invalid authorization header', status: 401)
+ end
+ # arvados-controller verified the user and is passing auth_info
+ # in request params.
+ authinfo = SafeJSON.load(params[:auth_info])
+ else
+ # omniauth middleware verified the user and is passing auth_info
+ # in request.env.
+ authinfo = request.env['omniauth.auth']['info'].with_indifferent_access
+ end
begin
- user = User.register omniauth['info']
+ user = User.register(authinfo)
rescue => e
Rails.logger.warn e
return redirect_to login_failure_url
user.save or raise Exception.new(user.errors.messages)
- omniauth.delete('extra')
-
# Give the authenticated user a cookie for direct API access
session[:user_id] = user.id
session[:api_client_uuid] = nil
# alternate_emails
# identity_url
- info = info.with_indifferent_access
-
primary_user = nil
# local database
if !primary_user
primary_user = user.redirects_to
elsif primary_user.uuid != user.redirects_to.uuid
- raise "Ambigious email address, directs to both #{primary_user.uuid} and #{user.redirects_to.uuid}"
+ raise "Ambiguous email address, directs to both #{primary_user.uuid} and #{user.redirects_to.uuid}"
end
end
end
arvcfg = ConfigLoader.new
arvcfg.declare_config "ClusterID", NonemptyString, :uuid_prefix
arvcfg.declare_config "ManagementToken", String, :ManagementToken
+arvcfg.declare_config "SystemRootToken", String
arvcfg.declare_config "Git.Repositories", String, :git_repositories_dir
arvcfg.declare_config "API.DisabledAPIs", Hash, :disable_api_methods, ->(cfg, k, v) { arrayToHash cfg, "API.DisabledAPIs", v }
arvcfg.declare_config "API.MaxRequestSize", Integer, :max_request_size
arvcfg.declare_config "Users.UserNotifierEmailFrom", String, :user_notifier_email_from
arvcfg.declare_config "Users.NewUserNotificationRecipients", Hash, :new_user_notification_recipients, ->(cfg, k, v) { arrayToHash cfg, "Users.NewUserNotificationRecipients", v }
arvcfg.declare_config "Users.NewInactiveUserNotificationRecipients", Hash, :new_inactive_user_notification_recipients, method(:arrayToHash)
-arvcfg.declare_config "Login.ProviderAppSecret", NonemptyString, :sso_app_secret
-arvcfg.declare_config "Login.ProviderAppID", NonemptyString, :sso_app_id
+arvcfg.declare_config "Login.ProviderAppSecret", String, :sso_app_secret
+arvcfg.declare_config "Login.ProviderAppID", String, :sso_app_id
arvcfg.declare_config "Login.LoginCluster", String
arvcfg.declare_config "Login.RemoteTokenRefresh", ActiveSupport::Duration
arvcfg.declare_config "TLS.Insecure", Boolean, :sso_insecure
assert_nil assigns(:api_client)
end
+ test "controller cannot create session without SystemRootToken" do
+ get :create, params: {provider: 'controller', auth_info: {email: "foo@bar.com"}, return_to: ',https://app.example'}
+ assert_response 401
+ end
+
+ test "controller cannot create session with wrong SystemRootToken" do
+ @request.headers['Authorization'] = 'Bearer blah'
+ get :create, params: {provider: 'controller', auth_info: {email: "foo@bar.com"}, return_to: ',https://app.example'}
+ assert_response 401
+ end
+
+ test "controller can create session using SystemRootToken" do
+ @request.headers['Authorization'] = 'Bearer '+Rails.configuration.SystemRootToken
+ get :create, params: {provider: 'controller', auth_info: {email: "foo@bar.com"}, return_to: ',https://app.example'}
+ assert_response :redirect
+ api_client_auth = assigns(:api_client_auth)
+ assert_not_nil api_client_auth
+ assert_includes(@response.redirect_url, 'api_token='+api_client_auth.token)
+ end
end
assert_equal "Baratheon", nbs.last_name
end
- test "fail when email address is ambigious" do
+ test "fail when email address is ambiguous" do
User.register({"email" => "active-user@arvados.local"})
u = User.register({"email" => "never-before-seen-user@arvados.local"})
u.email = "active-user@arvados.local"
Description=Arvados Crunch Dispatcher for SLURM
Documentation=https://doc.arvados.org/
After=network.target
-AssertPathExists=/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml
# systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
StartLimitInterval=0
"strings"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
)
return "", fmt.Errorf("error making directory %q in output collection: %v", d, err)
}
}
+ var unflushed int64
+ var lastparentdir string
for _, f := range cp.files {
- err = cp.copyFile(fs, f)
+ // If a dir has just had its last file added, do a
+ // full Flush. Otherwise, do a partial Flush (write
+ // full-size blocks, but leave the last short block
+ // open so f's data can be packed with it).
+ dir, _ := filepath.Split(f.dst)
+ if dir != lastparentdir || unflushed > keepclient.BLOCKSIZE {
+ if err := fs.Flush("/"+lastparentdir, dir != lastparentdir); err != nil {
+ return "", fmt.Errorf("error flushing output collection file data: %v", err)
+ }
+ unflushed = 0
+ }
+ lastparentdir = dir
+
+ n, err := cp.copyFile(fs, f)
if err != nil {
return "", fmt.Errorf("error copying file %q into output collection: %v", f, err)
}
+ unflushed += n
}
return fs.MarshalManifest(".")
}
-func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) error {
+func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) (int64, error) {
cp.logger.Printf("copying %q (%d bytes)", f.dst, f.size)
dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
- return err
+ return 0, err
}
src, err := os.Open(f.src)
if err != nil {
dst.Close()
- return err
+ return 0, err
}
defer src.Close()
- _, err = io.Copy(dst, src)
+ n, err := io.Copy(dst, src)
if err != nil {
dst.Close()
- return err
+ return n, err
}
- return dst.Close()
+ return n, dst.Close()
}
// Append to cp.manifest, cp.files, and cp.dirs so as to copy src (an
Docker ThinDockerClient
// Dispatcher client is initialized with the Dispatcher token.
- // This is a priviledged token used to manage container status
+ // This is a privileged token used to manage container status
// and logs.
//
// We have both dispatcherClient and DispatcherArvClient
return err
}
-// LogNodeRecord logs arvados#node record corresponding to the current host.
+// LogNodeRecord logs the current host's InstanceType config entry (or
+// the arvados#node record, if running via crunch-dispatch-slurm).
func (runner *ContainerRunner) LogNodeRecord() error {
- hostname := os.Getenv("SLURMD_NODENAME")
- if hostname == "" {
- hostname, _ = os.Hostname()
- }
- _, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) {
- // The "info" field has admin-only info when obtained
- // with a privileged token, and should not be logged.
- node, ok := resp.(map[string]interface{})
- if ok {
- delete(node, "info")
- }
- })
- return err
+ if it := os.Getenv("InstanceType"); it != "" {
+ // Dispatched via arvados-dispatch-cloud. Save
+ // InstanceType config fragment received from
+ // dispatcher on stdin.
+ w, err := runner.LogCollection.OpenFile("node.json", os.O_CREATE|os.O_WRONLY, 0666)
+ if err != nil {
+ return err
+ }
+ defer w.Close()
+ _, err = io.WriteString(w, it)
+ if err != nil {
+ return err
+ }
+ return w.Close()
+ } else {
+ // Dispatched via crunch-dispatch-slurm. Look up
+ // apiserver's node record corresponding to
+ // $SLURMD_NODENAME.
+ hostname := os.Getenv("SLURMD_NODENAME")
+ if hostname == "" {
+ hostname, _ = os.Hostname()
+ }
+ _, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) {
+ // The "info" field has admin-only info when
+ // obtained with a privileged token, and
+ // should not be logged.
+ node, ok := resp.(map[string]interface{})
+ if ok {
+ delete(node, "info")
+ }
+ })
+ return err
+ }
}
func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}, munge func(interface{})) (logged bool, err error) {
import (
"encoding/json"
- "fmt"
"html"
"html/template"
"io"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/auth"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"git.curoverse.com/arvados.git/sdk/go/health"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
- log "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus"
"golang.org/x/net/webdav"
)
// are ignored (all response writes return the update error).
type updateOnSuccess struct {
httpserver.ResponseWriter
+ logger logrus.FieldLogger
update func() error
sentHeader bool
err error
if err, ok := uos.err.(*arvados.TransactionError); ok {
code = err.StatusCode
}
- log.Printf("update() changes response to HTTP %d: %T %q", code, uos.err, uos.err)
+ uos.logger.WithError(uos.err).Errorf("update() returned error type %T, changing response to HTTP %d", uos.err, code)
http.Error(uos.ResponseWriter, uos.err.Error(), code)
return
}
func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
h.setupOnce.Do(h.setup)
- var statusCode = 0
- var statusText string
-
remoteAddr := r.RemoteAddr
if xff := r.Header.Get("X-Forwarded-For"); xff != "" {
remoteAddr = xff + "," + remoteAddr
}
w := httpserver.WrapResponseWriter(wOrig)
- defer func() {
- if statusCode == 0 {
- statusCode = w.WroteStatus()
- } else if w.WroteStatus() == 0 {
- w.WriteHeader(statusCode)
- } else if w.WroteStatus() != statusCode {
- log.WithField("RequestID", r.Header.Get("X-Request-Id")).Warn(
- fmt.Sprintf("Our status changed from %d to %d after we sent headers", w.WroteStatus(), statusCode))
- }
- if statusText == "" {
- statusText = http.StatusText(statusCode)
- }
- }()
if strings.HasPrefix(r.URL.Path, "/_health/") && r.Method == "GET" {
h.healthHandler.ServeHTTP(w, r)
if method := r.Header.Get("Access-Control-Request-Method"); method != "" && r.Method == "OPTIONS" {
if !browserMethod[method] && !webdavMethod[method] {
- statusCode = http.StatusMethodNotAllowed
+ w.WriteHeader(http.StatusMethodNotAllowed)
return
}
w.Header().Set("Access-Control-Allow-Headers", corsAllowHeadersHeader)
w.Header().Set("Access-Control-Allow-Methods", "COPY, DELETE, GET, LOCK, MKCOL, MOVE, OPTIONS, POST, PROPFIND, PROPPATCH, PUT, RMCOL, UNLOCK")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Max-Age", "86400")
- statusCode = http.StatusOK
return
}
if !browserMethod[r.Method] && !webdavMethod[r.Method] {
- statusCode, statusText = http.StatusMethodNotAllowed, r.Method
+ w.WriteHeader(http.StatusMethodNotAllowed)
return
}
}
if collectionID == "" && !useSiteFS {
- statusCode = http.StatusNotFound
+ w.WriteHeader(http.StatusNotFound)
return
}
arv := h.clientPool.Get()
if arv == nil {
- statusCode, statusText = http.StatusInternalServerError, "Pool failed: "+h.clientPool.Err().Error()
+ http.Error(w, "client pool error: "+h.clientPool.Err().Error(), http.StatusInternalServerError)
return
}
defer h.clientPool.Put(arv)
}
}
// Something more serious is wrong
- statusCode, statusText = http.StatusInternalServerError, err.Error()
+ http.Error(w, "cache error: "+err.Error(), http.StatusInternalServerError)
return
}
if collection == nil {
// for additional credentials would just be
// confusing), or we don't even accept
// credentials at this path.
- statusCode = http.StatusNotFound
+ w.WriteHeader(http.StatusNotFound)
return
}
for _, t := range reqTokens {
if tokenResult[t] == 404 {
// The client provided valid token(s), but the
// collection was not found.
- statusCode = http.StatusNotFound
+ w.WriteHeader(http.StatusNotFound)
return
}
}
// data that has been deleted. Allow a referrer to
// provide this context somehow?
w.Header().Add("WWW-Authenticate", "Basic realm=\"collections\"")
- statusCode = http.StatusUnauthorized
+ w.WriteHeader(http.StatusUnauthorized)
return
}
kc, err := keepclient.MakeKeepClient(arv)
if err != nil {
- statusCode, statusText = http.StatusInternalServerError, err.Error()
+ http.Error(w, "error setting up keep client: "+err.Error(), http.StatusInternalServerError)
return
}
kc.RequestID = r.Header.Get("X-Request-Id")
fs, err := collection.FileSystem(client, kc)
if err != nil {
- statusCode, statusText = http.StatusInternalServerError, err.Error()
+ http.Error(w, "error creating collection filesystem: "+err.Error(), http.StatusInternalServerError)
return
}
writefs, writeOK := fs.(arvados.CollectionFileSystem)
targetIsPDH := arvadosclient.PDHMatch(collectionID)
if (targetIsPDH || !writeOK) && writeMethod[r.Method] {
- statusCode, statusText = http.StatusMethodNotAllowed, errReadOnly.Error()
+ http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
return
}
// collection can't be saved.
w = &updateOnSuccess{
ResponseWriter: w,
+ logger: ctxlog.FromContext(r.Context()),
update: func() error {
return h.Config.Cache.Update(client, *collection, writefs)
}}
LockSystem: h.webdavLS,
Logger: func(_ *http.Request, err error) {
if err != nil {
- log.Printf("error from webdav handler: %q", err)
+ ctxlog.FromContext(r.Context()).WithError(err).Error("error reported by webdav handler")
}
},
}
openPath := "/" + strings.Join(targetPath, "/")
if f, err := fs.Open(openPath); os.IsNotExist(err) {
// Requested non-existent path
- statusCode = http.StatusNotFound
+ w.WriteHeader(http.StatusNotFound)
} else if err != nil {
// Some other (unexpected) error
- statusCode, statusText = http.StatusInternalServerError, err.Error()
+ http.Error(w, "open: "+err.Error(), http.StatusInternalServerError)
} else if stat, err := f.Stat(); err != nil {
// Can't get Size/IsDir (shouldn't happen with a collectionFS!)
- statusCode, statusText = http.StatusInternalServerError, err.Error()
+ http.Error(w, "stat: "+err.Error(), http.StatusInternalServerError)
} else if stat.IsDir() && !strings.HasSuffix(r.URL.Path, "/") {
// If client requests ".../dirname", redirect to
// ".../dirname/". This way, relative links in the
h.serveDirectory(w, r, collection.Name, fs, openPath, true)
} else {
http.ServeContent(w, r, basename, stat.ModTime(), f)
- if r.Header.Get("Range") == "" && int64(w.WroteBodyBytes()) != stat.Size() {
+ if wrote := int64(w.WroteBodyBytes()); wrote != stat.Size() && r.Header.Get("Range") == "" {
// If we wrote fewer bytes than expected, it's
// too late to change the real response code
// or send an error message to the client, but
// at least we can try to put some useful
// debugging info in the logs.
n, err := f.Read(make([]byte, 1024))
- statusCode, statusText = http.StatusInternalServerError, fmt.Sprintf("f.Size()==%d but only wrote %d bytes; read(1024) returns %d, %s", stat.Size(), w.WroteBodyBytes(), n, err)
+ ctxlog.FromContext(r.Context()).Errorf("stat.Size()==%d but only wrote %d bytes; read(1024) returns %d, %s", stat.Size(), wrote, n, err)
}
}
kc, err := keepclient.MakeKeepClient(arv)
if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
+ http.Error(w, "error setting up keep client: "+err.Error(), http.StatusInternalServerError)
return
}
kc.RequestID = r.Header.Get("X-Request-Id")
LockSystem: h.webdavLS,
Logger: func(_ *http.Request, err error) {
if err != nil {
- log.Printf("error from webdav handler: %q", err)
+ ctxlog.FromContext(r.Context()).WithError(err).Error("error reported by webdav handler")
}
},
}
return nil
}
if err := walk(""); err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
+ http.Error(w, "error getting directory listing: "+err.Error(), http.StatusInternalServerError)
return
}
}
tmpl, err := template.New("dir").Funcs(funcs).Parse(dirListingTemplate)
if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
+ http.Error(w, "error parsing template: "+err.Error(), http.StatusInternalServerError)
return
}
sort.Slice(files, func(i, j int) bool {
// into a cookie unless the current vhost
// (origin) serves only a single collection or
// we are in TrustAllContent mode.
- w.WriteHeader(http.StatusBadRequest)
+ http.Error(w, "cannot serve inline content at this URL (possible configuration error; see https://doc.arvados.org/install/install-keep-web.html#dns)", http.StatusBadRequest)
return
}
if location != "" {
newu, err := u.Parse(location)
if err != nil {
- w.WriteHeader(http.StatusInternalServerError)
+ http.Error(w, "error resolving redirect target: "+err.Error(), http.StatusInternalServerError)
return
}
u = newu
"",
"",
http.StatusBadRequest,
- "",
+ "cannot serve inline content at this URL (possible configuration error; see https://doc.arvados.org/install/install-keep-web.html#dns)\n",
)
}
"",
"",
http.StatusBadRequest,
- "",
+ "cannot serve inline content at this URL (possible configuration error; see https://doc.arvados.org/install/install-keep-web.html#dns)\n",
)
resp := s.testVhostRedirectTokenToCookie(c, "GET",
// Initialize the trashq and workers
h.trashq = NewWorkQueue()
for i := 0; i < 1 || i < h.Cluster.Collections.BlobTrashConcurrency; i++ {
- go RunTrashWorker(h.volmgr, h.Cluster, h.trashq)
+ go RunTrashWorker(h.volmgr, h.Logger, h.Cluster, h.trashq)
}
// Set up routes and metrics
testname string,
expectedStatus int,
response *httptest.ResponseRecorder) {
- if response.Code != expectedStatus {
- c.Errorf("%s: expected status %d, got %+v",
- testname, expectedStatus, response)
- }
+ c.Check(response.Code, check.Equals, expectedStatus, check.Commentf("%s", testname))
}
func ExpectBody(
"",
http.StatusOK,
response)
- expected := "Successfully untrashed on: [MockVolume],[MockVolume]"
- if response.Body.String() != expected {
- c.Errorf(
- "Untrash response mismatched: expected %s, got:\n%s",
- expected, response.Body.String())
- }
+ c.Check(response.Body.String(), check.Equals, "Successfully untrashed on: [MockVolume], [MockVolume]\n")
}
func (s *HandlerSuite) TestUntrashHandlerWithNoWritableVolumes(c *check.C) {
"encoding/json"
"fmt"
"io"
- "log"
"net/http"
"os"
"regexp"
for _, v := range vols {
if err := v.IndexTo(prefix, resp); err != nil {
- // We can't send an error message to the
- // client because we might have already sent
- // headers and index content. All we can do is
- // log the error in our own logs, and (in
- // cases where headers haven't been sent yet)
- // set a 500 status.
+ // We can't send an error status/message to
+ // the client because IndexTo() might have
+ // already written body content. All we can do
+ // is log the error in our own logs.
//
- // If headers have already been sent, the
- // client must notice the lack of trailing
+ // The client must notice the lack of trailing
// newline as an indication that the response
// is incomplete.
- log.Printf("index error from volume %s: %s", v, err)
- http.Error(resp, "", http.StatusInternalServerError)
+ ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
return
}
}
}
var ds debugStats
runtime.ReadMemStats(&ds.MemStats)
- err := json.NewEncoder(resp).Encode(&ds)
+ data, err := json.Marshal(&ds)
if err != nil {
- http.Error(resp, err.Error(), 500)
+ http.Error(resp, err.Error(), http.StatusInternalServerError)
+ return
}
+ resp.Write(data)
}
// StatusHandler addresses /status.json requests.
func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
stLock.Lock()
rtr.readNodeStatus(&st)
- jstat, err := json.Marshal(&st)
+ data, err := json.Marshal(&st)
stLock.Unlock()
- if err == nil {
- resp.Write(jstat)
- } else {
- log.Printf("json.Marshal: %s", err)
- log.Printf("NodeStatus = %v", &st)
- http.Error(resp, err.Error(), 500)
+ if err != nil {
+ http.Error(resp, err.Error(), http.StatusInternalServerError)
+ return
}
+ resp.Write(data)
}
// populate the given NodeStatus struct with current values.
continue
} else {
result.Failed++
- log.Println("DeleteHandler:", err)
+ ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
}
}
-
- var st int
-
if result.Deleted == 0 && result.Failed == 0 {
- st = http.StatusNotFound
- } else {
- st = http.StatusOK
+ resp.WriteHeader(http.StatusNotFound)
+ return
}
-
- resp.WriteHeader(st)
-
- if st == http.StatusOK {
- if body, err := json.Marshal(result); err == nil {
- resp.Write(body)
- } else {
- log.Printf("json.Marshal: %s (result = %v)", err, result)
- http.Error(resp, err.Error(), 500)
- }
+ body, err := json.Marshal(result)
+ if err != nil {
+ http.Error(resp, err.Error(), http.StatusInternalServerError)
+ return
}
+ resp.Write(body)
}
/* PullHandler processes "PUT /pull" requests for the data manager.
return
}
+ log := ctxlog.FromContext(req.Context())
hash := mux.Vars(req)["hash"]
if len(rtr.volmgr.AllWritable()) == 0 {
if os.IsNotExist(err) {
numNotFound++
} else if err != nil {
- log.Printf("Error untrashing %v on volume %v", hash, vol.String())
+ log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
failedOn = append(failedOn, vol.String())
} else {
- log.Printf("Untrashed %v on volume %v", hash, vol.String())
+ log.Infof("Untrashed %v on volume %v", hash, vol.String())
untrashedOn = append(untrashedOn, vol.String())
}
}
if numNotFound == len(rtr.volmgr.AllWritable()) {
http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
- return
- }
-
- if len(failedOn) == len(rtr.volmgr.AllWritable()) {
+ } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
} else {
- respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
+ respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
if len(failedOn) > 0 {
- respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
+ respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
+ http.Error(resp, respBody, http.StatusInternalServerError)
+ } else {
+ fmt.Fprintln(resp, respBody)
}
- resp.Write([]byte(respBody))
}
}
// DiskHashError.
//
func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
+ log := ctxlog.FromContext(ctx)
+
// Attempt to read the requested hash from a keep volume.
errorToCaller := NotFoundError
// volumes. If all volumes report IsNotExist,
// we return a NotFoundError.
if !os.IsNotExist(err) {
- log.Printf("%s: Get(%s): %s", vol, hash, err)
+ log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
}
// If some volume returns a transient error, return it to the caller
// instead of "Not found" so it can retry.
continue
}
// Check the file checksum.
- //
filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
if filehash != hash {
// TODO: Try harder to tell a sysadmin about
// this.
- log.Printf("%s: checksum mismatch for request %s (actual %s)",
- vol, hash, filehash)
+ log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
errorToCaller = DiskHashError
continue
}
if errorToCaller == DiskHashError {
- log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
- vol, hash)
+ log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
}
return size, nil
}
// provide as much detail as possible.
//
func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) {
+ log := ctxlog.FromContext(ctx)
+
// Check that BLOCK's checksum matches HASH.
blockhash := fmt.Sprintf("%x", md5.Sum(block))
if blockhash != hash {
// Choose a Keep volume to write to.
// If this volume fails, try all of the volumes in order.
if mnt := volmgr.NextWritable(); mnt != nil {
- if err := mnt.Put(ctx, hash, block); err == nil {
+ if err := mnt.Put(ctx, hash, block); err != nil {
+ log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
+ } else {
return mnt.Replication, nil // success!
}
- if ctx.Err() != nil {
- return 0, ErrClientDisconnect
- }
+ }
+ if ctx.Err() != nil {
+ return 0, ErrClientDisconnect
}
writables := volmgr.AllWritable()
if len(writables) == 0 {
- log.Print("No writable volumes.")
+ log.Error("no writable volumes")
return 0, FullError
}
if ctx.Err() != nil {
return 0, ErrClientDisconnect
}
- if err == nil {
+ switch err {
+ case nil:
return vol.Replication, nil // success!
- }
- if err != FullError {
+ case FullError:
+ continue
+ default:
// The volume is not full but the
// write did not succeed. Report the
// error and continue trying.
allFull = false
- log.Printf("%s: Write(%s): %s", vol, hash, err)
+ log.WithError(err).Errorf("%s: Put(%s) failed", vol, hash)
}
}
if allFull {
- log.Print("All volumes are full.")
+ log.Error("all volumes are full")
return 0, FullError
}
// Already logged the non-full errors.
// premature garbage collection. Otherwise, it returns a non-nil
// error.
func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
+ log := ctxlog.FromContext(ctx)
var bestErr error = NotFoundError
for _, mnt := range volmgr.AllWritable() {
err := mnt.Compare(ctx, hash, buf)
// to tell which one is wanted if we have
// both, so there's no point writing it even
// on a different volume.)
- log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
+ log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
return 0, err
} else if os.IsNotExist(err) {
// Block does not exist. This is the only
// Couldn't open file, data is corrupt on
// disk, etc.: log this abnormal condition,
// and try the next volume.
- log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
+ log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
continue
}
if err := mnt.Touch(hash); err != nil {
- log.Printf("%s: Touch %s failed: %s", mnt.Volume, hash, err)
+ log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
bestErr = err
continue
}
return ""
}
-// IsExpired returns true if the given Unix timestamp (expressed as a
-// hexadecimal string) is in the past, or if timestampHex cannot be
-// parsed as a hexadecimal string.
-func IsExpired(timestampHex string) bool {
- ts, err := strconv.ParseInt(timestampHex, 16, 0)
- if err != nil {
- log.Printf("IsExpired: %s", err)
- return true
- }
- return time.Unix(ts, 0).Before(time.Now())
-}
-
// canDelete returns true if the user identified by apiToken is
// allowed to delete blocks.
func (rtr *router) canDelete(apiToken string) bool {
"fmt"
"io"
"io/ioutil"
- "log"
"net/http"
"os"
"regexp"
}
func newS3Volume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
- v := &S3Volume{cluster: cluster, volume: volume, logger: logger, metrics: metrics}
+ v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics}
err := json.Unmarshal(volume.DriverParameters, &v)
if err != nil {
return nil, err
}
+ v.logger = logger.WithField("Volume", v.String())
return v, v.check()
}
rdr, err = v.bucket.GetReader(loc)
if err != nil {
- log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
+ v.logger.Warnf("reading %s after successful fixRace: %s", loc, err)
err = v.translateError(err)
}
return
go func() {
defer func() {
if ctx.Err() != nil {
- v.logger.Debugf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
+ v.logger.Debugf("abandoned PutReader goroutine finished with err: %s", err)
}
}()
defer close(ready)
}()
select {
case <-ctx.Done():
- v.logger.Debugf("%s: taking PutReader's input away: %s", v, ctx.Err())
+ v.logger.Debugf("taking PutReader's input away: %s", ctx.Err())
// Our pipe might be stuck in Write(), waiting for
// PutReader() to read. If so, un-stick it. This means
// PutReader will get corrupt data, but that's OK: the
go io.Copy(ioutil.Discard, bufr)
// CloseWithError() will return once pending I/O is done.
bufw.CloseWithError(ctx.Err())
- v.logger.Debugf("%s: abandoning PutReader goroutine", v)
+ v.logger.Debugf("abandoning PutReader goroutine")
return ctx.Err()
case <-ready:
// Unblock pipe in case PutReader did not consume it.
// The data object X exists, but recent/X is missing.
err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
- log.Printf("error: creating %q: %s", "recent/"+loc, err)
+ v.logger.WithError(err).Errorf("error creating %q", "recent/"+loc)
return zeroTime, v.translateError(err)
}
- log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
+ v.logger.Infof("created %q to migrate existing block to new storage scheme", "recent/"+loc)
resp, err = v.bucket.Head("recent/"+loc, nil)
if err != nil {
- log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
+ v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+loc)
return zeroTime, v.translateError(err)
}
} else if err != nil {
func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
// Use a merge sort to find matching sets of X and recent/X.
dataL := s3Lister{
+ Logger: v.logger,
Bucket: v.bucket.Bucket(),
Prefix: prefix,
PageSize: v.IndexPageSize,
Stats: &v.bucket.stats,
}
recentL := s3Lister{
+ Logger: v.logger,
Bucket: v.bucket.Bucket(),
Prefix: "recent/" + prefix,
PageSize: v.IndexPageSize,
trash, err := v.bucket.Head("trash/"+loc, nil)
if err != nil {
if !os.IsNotExist(v.translateError(err)) {
- log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
+ v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+loc)
}
return false
}
trashTime, err := v.lastModified(trash)
if err != nil {
- log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
+ v.logger.WithError(err).Errorf("fixRace: error parsing time %q", trash.Header.Get("Last-Modified"))
return false
}
recent, err := v.bucket.Head("recent/"+loc, nil)
if err != nil {
- log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
+ v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+loc)
return false
}
recentTime, err := v.lastModified(recent)
if err != nil {
- log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
+ v.logger.WithError(err).Errorf("fixRace: error parsing time %q", recent.Header.Get("Last-Modified"))
return false
}
return false
}
- log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
- log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
+ v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
+ v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
err = v.safeCopy(loc, "trash/"+loc)
if err != nil {
- log.Printf("error: fixRace: %s", err)
+ v.logger.WithError(err).Error("fixRace: copy failed")
return false
}
return true
trashT, err := time.Parse(time.RFC3339, trash.LastModified)
if err != nil {
- log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
+ v.logger.Warnf("EmptyTrash: %q: parse %q: %s", trash.Key, trash.LastModified, err)
return
}
recent, err := v.bucket.Head("recent/"+loc, nil)
if err != nil && os.IsNotExist(v.translateError(err)) {
- log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
+ v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", trash.Key, "recent/"+loc, err)
err = v.Untrash(loc)
if err != nil {
- log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
+ v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
}
return
} else if err != nil {
- log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
+ v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+loc)
return
}
recentT, err := v.lastModified(recent)
if err != nil {
- log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
+ v.logger.WithError(err).Warnf("EmptyTrash: %q: error parsing %q", "recent/"+loc, recent.Header.Get("Last-Modified"))
return
}
if trashT.Sub(recentT) < v.cluster.Collections.BlobSigningTTL.Duration() {
// Note this means (TrashSweepInterval
// < BlobSigningTTL - raceWindow) is
// necessary to avoid starvation.
- log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
+ v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
v.fixRace(loc)
v.Touch(loc)
return
}
_, err := v.bucket.Head(loc, nil)
if os.IsNotExist(err) {
- log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
+ v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
v.fixRace(loc)
return
} else if err != nil {
- log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+ v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
return
}
}
}
err = v.bucket.Del(trash.Key)
if err != nil {
- log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
+ v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", trash.Key)
return
}
atomic.AddInt64(&bytesDeleted, trash.Size)
_, err = v.bucket.Head(loc, nil)
if err == nil {
- log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
+ v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
return
}
if !os.IsNotExist(v.translateError(err)) {
- log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+ v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
return
}
err = v.bucket.Del("recent/" + loc)
if err != nil {
- log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
+ v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+loc)
}
}
}
trashL := s3Lister{
+ Logger: v.logger,
Bucket: v.bucket.Bucket(),
Prefix: "trash/",
PageSize: v.IndexPageSize,
wg.Wait()
if err := trashL.Error(); err != nil {
- log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
+ v.logger.WithError(err).Error("EmptyTrash: lister failed")
}
- log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+ v.logger.Infof("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
}
type s3Lister struct {
+ Logger logrus.FieldLogger
Bucket *s3.Bucket
Prefix string
PageSize int
lister.buf = make([]s3.Key, 0, len(resp.Contents))
for _, key := range resp.Contents {
if !strings.HasPrefix(key.Key, lister.Prefix) {
- log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
+ lister.Logger.Warnf("s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
continue
}
lister.buf = append(lister.buf, key)
"encoding/json"
"fmt"
"io"
- "log"
"net/http"
"net/http/httptest"
"os"
func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
err := v.bucket.Bucket().Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
- log.Printf("PutRaw: %s: %+v", loc, err)
+ v.logger.Printf("PutRaw: %s: %+v", loc, err)
}
err = v.bucket.Bucket().Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
- log.Printf("PutRaw: recent/%s: %+v", loc, err)
+ v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
}
}
import (
"errors"
- "log"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/sirupsen/logrus"
)
// RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine.
// Delete the block indicated by the trash request Locator
// Repeat
//
-func RunTrashWorker(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashq *WorkQueue) {
+func RunTrashWorker(volmgr *RRVolumeManager, logger logrus.FieldLogger, cluster *arvados.Cluster, trashq *WorkQueue) {
for item := range trashq.NextItem {
trashRequest := item.(TrashRequest)
- TrashItem(volmgr, cluster, trashRequest)
+ TrashItem(volmgr, logger, cluster, trashRequest)
trashq.DoneItem <- struct{}{}
}
}
// TrashItem deletes the indicated block from every writable volume.
-func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest TrashRequest) {
+func TrashItem(volmgr *RRVolumeManager, logger logrus.FieldLogger, cluster *arvados.Cluster, trashRequest TrashRequest) {
reqMtime := time.Unix(0, trashRequest.BlockMtime)
if time.Since(reqMtime) < cluster.Collections.BlobSigningTTL.Duration() {
- log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
+ logger.Warnf("client asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
arvados.Duration(time.Since(reqMtime)),
trashRequest.Locator,
trashRequest.BlockMtime,
if uuid := trashRequest.MountUUID; uuid == "" {
volumes = volmgr.AllWritable()
} else if mnt := volmgr.Lookup(uuid, true); mnt == nil {
- log.Printf("warning: trash request for nonexistent mount: %v", trashRequest)
+ logger.Warnf("trash request for nonexistent mount: %v", trashRequest)
return
} else {
volumes = []*VolumeMount{mnt}
for _, volume := range volumes {
mtime, err := volume.Mtime(trashRequest.Locator)
if err != nil {
- log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err)
+ logger.WithError(err).Errorf("%v Trash(%v)", volume, trashRequest.Locator)
continue
}
if trashRequest.BlockMtime != mtime.UnixNano() {
- log.Printf("%v Trash(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
+ logger.Infof("%v Trash(%v): stored mtime %v does not match trash list value %v; skipping", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
continue
}
}
if err != nil {
- log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err)
+ logger.WithError(err).Errorf("%v Trash(%v)", volume, trashRequest.Locator)
} else {
- log.Printf("%v Trash(%v) OK", volume, trashRequest.Locator)
+ logger.Infof("%v Trash(%v) OK", volume, trashRequest.Locator)
}
}
}
"context"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
}
}
}
- go RunTrashWorker(s.handler.volmgr, s.cluster, trashq)
+ go RunTrashWorker(s.handler.volmgr, ctxlog.TestLogger(c), s.cluster, trashq)
// Install gate so all local operations block until we say go
gate := make(chan struct{})
"fmt"
"io"
"io/ioutil"
- "log"
"os"
"os/exec"
"path/filepath"
if err != nil {
return nil, err
}
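+ // Attach the volume's identity to every log entry from this
+ // logger, so individual messages no longer need to repeat it.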
+ v.logger = v.logger.WithField("Volume", v.String())
return v, v.check()
}
// "fa0b6166-3b55-4994-bd3f-92f4e00a1bb0/keep".
func (v *UnixVolume) GetDeviceID() string {
giveup := func(f string, args ...interface{}) string {
- log.Printf(f+"; using blank DeviceID for volume %s", append(args, v)...)
+ v.logger.Infof(f+"; using blank DeviceID for volume %s", append(args, v)...)
return ""
}
buf, err := exec.Command("findmnt", "--noheadings", "--target", v.Root).CombinedOutput()
link := filepath.Join(udir, uuid)
fi, err = os.Stat(link)
if err != nil {
- log.Printf("error: stat %q: %s", link, err)
+ v.logger.WithError(err).Errorf("stat(%q) failed", link)
continue
}
if fi.Sys().(*syscall.Stat_t).Ino == ino {
}
bdir := v.blockDir(loc)
if err := os.MkdirAll(bdir, 0755); err != nil {
- log.Printf("%s: could not create directory %s: %s",
- loc, bdir, err)
- return err
+ return fmt.Errorf("error creating directory %s: %s", bdir, err)
}
tmpfile, tmperr := v.os.TempFile(bdir, "tmp"+loc)
if tmperr != nil {
- log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
- return tmperr
+ return fmt.Errorf("TempFile(%s, tmp%s) failed: %s", bdir, loc, tmperr)
}
bpath := v.blockPath(loc)
n, err := io.Copy(tmpfile, rdr)
v.os.stats.TickOutBytes(uint64(n))
if err != nil {
- log.Printf("%s: writing to %s: %s", v, bpath, err)
+ err = fmt.Errorf("error writing %s: %s", bpath, err)
tmpfile.Close()
v.os.Remove(tmpfile.Name())
return err
}
if err := tmpfile.Close(); err != nil {
- log.Printf("closing %s: %s", tmpfile.Name(), err)
+ err = fmt.Errorf("error closing %s: %s", tmpfile.Name(), err)
v.os.Remove(tmpfile.Name())
return err
}
if err := v.os.Rename(tmpfile.Name(), bpath); err != nil {
- log.Printf("rename %s %s: %s", tmpfile.Name(), bpath, err)
- return v.os.Remove(tmpfile.Name())
+ err = fmt.Errorf("error renaming %s to %s: %s", tmpfile.Name(), bpath, err)
+ v.os.Remove(tmpfile.Name())
+ return err
}
return nil
}
func (v *UnixVolume) Status() *VolumeStatus {
fi, err := v.os.Stat(v.Root)
if err != nil {
- log.Printf("%s: os.Stat: %s", v, err)
+ v.logger.WithError(err).Error("stat failed")
return nil
}
devnum := fi.Sys().(*syscall.Stat_t).Dev
var fs syscall.Statfs_t
if err := syscall.Statfs(v.Root, &fs); err != nil {
- log.Printf("%s: statfs: %s", v, err)
+ v.logger.WithError(err).Error("statfs failed")
return nil
}
// These calculations match the way df calculates disk usage:
blockdirpath := filepath.Join(v.Root, names[0])
blockdir, err := v.os.Open(blockdirpath)
if err != nil {
- log.Print("Error reading ", blockdirpath, ": ", err)
- lastErr = err
+ v.logger.WithError(err).Errorf("error reading %q", blockdirpath)
+ lastErr = fmt.Errorf("error reading %q: %s", blockdirpath, err)
continue
}
v.os.stats.TickOps("readdir")
if err == io.EOF {
break
} else if err != nil {
- log.Print("Error reading ", blockdirpath, ": ", err)
- lastErr = err
+ v.logger.WithError(err).Errorf("error reading %q", blockdirpath)
+ lastErr = fmt.Errorf("error reading %q: %s", blockdirpath, err)
break
}
name := fileInfo[0].Name()
" ", fileInfo[0].ModTime().UnixNano(),
"\n")
if err != nil {
- log.Print("Error writing : ", err)
- lastErr = err
- break
+ blockdir.Close()
+ return fmt.Errorf("error writing: %s", err)
}
}
blockdir.Close()
if avail, err := v.FreeDiskSpace(); err == nil {
isFull = avail < MinFreeKilobytes
} else {
- log.Printf("%s: FreeDiskSpace: %s", v, err)
+ v.logger.WithError(err).Errorf("%s: FreeDiskSpace failed", v)
isFull = false
}
}()
select {
case <-ctx.Done():
- log.Printf("%s: client hung up while waiting for Serialize lock (%s)", v, time.Since(t0))
+ v.logger.Infof("client hung up while waiting for Serialize lock (%s)", time.Since(t0))
go func() {
<-locked
v.locker.Unlock()
}
deadline, err := strconv.ParseInt(matches[2], 10, 64)
if err != nil {
- log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
+ v.logger.WithError(err).Errorf("EmptyTrash: %v: ParseInt(%q) failed", path, matches[2])
return
}
atomic.AddInt64(&bytesInTrash, info.Size())
}
err = v.os.Remove(path)
if err != nil {
- log.Printf("EmptyTrash: Remove %v: %v", path, err)
+ v.logger.WithError(err).Errorf("EmptyTrash: Remove(%q) failed", path)
return
}
atomic.AddInt64(&bytesDeleted, info.Size())
err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
if err != nil {
- log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
+ v.logger.WithError(err).Errorf("EmptyTrash: filepath.Walk(%q) failed", path)
return nil
}
todo <- dirent{path, info}
wg.Wait()
if err != nil {
- log.Printf("EmptyTrash error for %v: %v", v.String(), err)
+ v.logger.WithError(err).Error("EmptyTrash failed")
}
- log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+ v.logger.Infof("EmptyTrash stats: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
}
type unixStats struct {
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
Root: d,
locker: locker,
cluster: cluster,
+ logger: ctxlog.TestLogger(c),
volume: volume,
metrics: metrics,
},
Description=Arvados websocket server
Documentation=https://doc.arvados.org/
After=network.target
-AssertPathExists=/etc/arvados/ws/ws.yml
# systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
StartLimitInterval=0
fi
management_token=$(cat /var/lib/arvados/management_token)
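+
+# Generate and persist a random system root token, following the same
+# pattern as the management token above.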
+if ! test -s /var/lib/arvados/system_root_token ; then
+ ruby -e 'puts rand(2**400).to_s(36)' > /var/lib/arvados/system_root_token
+fi
+system_root_token=$(cat /var/lib/arvados/system_root_token)
+
if ! test -s /var/lib/arvados/sso_app_secret ; then
ruby -e 'puts rand(2**400).to_s(36)' > /var/lib/arvados/sso_app_secret
fi
cat >/var/lib/arvados/cluster_config.yml <<EOF
Clusters:
${uuid_prefix}:
+ SystemRootToken: $system_root_token
ManagementToken: $management_token
Services:
RailsAPI:
export R_LIBS=/var/lib/Rlibs
export HOME=$(getent passwd arvbox | cut -d: -f6)
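+# Determine this container's own IP address (the address on the
+# default-route interface).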
+defaultdev=$(/sbin/ip route|awk '/default/ { print $5 }')
+containerip=$(ip addr show $defaultdev | grep 'inet ' | sed 's/ *inet \(.*\)\/.*/\1/')
if test -s /var/run/localip_override ; then
localip=$(cat /var/run/localip_override)
else
- defaultdev=$(/sbin/ip route|awk '/default/ { print $5 }')
- localip=$(ip addr show $defaultdev | grep 'inet ' | sed 's/ *inet \(.*\)\/.*/\1/')
+ localip=$containerip
fi
root_cert=/var/lib/arvados/root-cert.pem
. /usr/local/lib/arvbox/common.sh
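+# If the advertised address has been overridden, add an /etc/hosts entry
+# pointing it at the container's own IP, so lookups of $localip from
+# inside the container resolve to this container.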
+if [[ $containerip != $localip ]] ; then
+ if ! grep -q $localip /etc/hosts ; then
+ echo $containerip $localip >> /etc/hosts
+ fi
+fi
+
openssl verify -CAfile $root_cert $server_cert
cat <<EOF >/var/lib/arvados/nginx.conf
geo \$external_client {
default 1;
127.0.0.0/8 0;
- $localip/32 0;
+ $containerip/32 0;
}
server {
{
"comment": "",
- "ignore": "test",
+ "ignore": "test appengine",
"package": [
{
"checksumSHA1": "jfYWZyRWLMfG0J5K7G2K8a9AKfs=",
"revision": "4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9",
"revisionTime": "2016-08-04T10:47:26Z"
},
+ {
+ "checksumSHA1": "bNT5FFLDUXSamYK3jGHSwsTJqqo=",
+ "path": "github.com/coreos/go-oidc",
+ "revision": "2be1c5b8a260760503f66dc0996e102b683b3ac3",
+ "revisionTime": "2019-08-15T17:57:29Z"
+ },
{
"checksumSHA1": "+Zz+leZHHC9C0rx8DoRuffSRPso=",
"path": "github.com/coreos/go-systemd/daemon",
"revision": "e881fd58d78e04cf6d0de1217f8707c8cc2249bc",
"revisionTime": "2017-12-16T07:03:16Z"
},
+ {
+ "checksumSHA1": "KxkAlLxQkuSGHH46Dxu6wpAybO4=",
+ "path": "github.com/pquerna/cachecontrol",
+ "revision": "1555304b9b35fdd2b425bccf1a5613677705e7d0",
+ "revisionTime": "2018-05-17T16:36:45Z"
+ },
+ {
+ "checksumSHA1": "wwaht1P9i8vQu6DqNvMEy24IMgY=",
+ "path": "github.com/pquerna/cachecontrol/cacheobject",
+ "revision": "1555304b9b35fdd2b425bccf1a5613677705e7d0",
+ "revisionTime": "2018-05-17T16:36:45Z"
+ },
{
"checksumSHA1": "Ajt29IHVbX99PUvzn8Gc/lMCXBY=",
"path": "github.com/prometheus/client_golang/prometheus",
"revisionTime": "2017-11-10T11:01:46Z"
},
{
- "checksumSHA1": "ySaT8G3I3y4MmnoXOYAAX0rC+p8=",
+ "checksumSHA1": "umeXHK5iK/3th4PtrTkZllezgWo=",
"path": "github.com/sirupsen/logrus",
"revision": "d682213848ed68c0a260ca37d6dd5ace8423f5ba",
"revisionTime": "2017-12-05T20:32:29Z"
"revision": "0fcca4842a8d74bfddc2c96a073bd2a4d2a7a2e8",
"revisionTime": "2017-11-25T19:00:56Z"
},
+ {
+ "checksumSHA1": "1MGpGDQqnUoRpv7VEcQrXOBydXE=",
+ "path": "golang.org/x/crypto/pbkdf2",
+ "revision": "ae8bce0030810cf999bb2b9868ae5c7c58e6343b",
+ "revisionTime": "2018-04-30T17:54:52Z"
+ },
{
"checksumSHA1": "PJY7uCr3UnX4/Mf/RoWnbieSZ8o=",
"path": "golang.org/x/crypto/pkcs12",
"revision": "434ec0c7fe3742c984919a691b2018a6e9694425",
"revisionTime": "2017-09-25T09:26:47Z"
},
+ {
+ "checksumSHA1": "+33kONpAOtjMyyw0uD4AygLvIXg=",
+ "path": "golang.org/x/oauth2",
+ "revision": "ec22f46f877b4505e0117eeaab541714644fdd28",
+ "revisionTime": "2018-05-28T20:23:04Z"
+ },
+ {
+ "checksumSHA1": "fddd1btmbXxnlMKHUZewlVlSaEQ=",
+ "path": "golang.org/x/oauth2/internal",
+ "revision": "ec22f46f877b4505e0117eeaab541714644fdd28",
+ "revisionTime": "2018-05-28T20:23:04Z"
+ },
{
"checksumSHA1": "znPq37/LZ4pJh7B4Lbu0ZuoMhNk=",
"origin": "github.com/docker/docker/vendor/golang.org/x/sys/unix",
"revision": "20d25e2804050c1cd24a7eea1e7a6447dd0e74ec",
"revisionTime": "2016-12-08T18:13:25Z"
},
+ {
+ "checksumSHA1": "oRfTuL23MIBG2nCwjweTJz4Eiqg=",
+ "path": "gopkg.in/square/go-jose.v2",
+ "revision": "730df5f748271903322feb182be83b43ebbbe27d",
+ "revisionTime": "2019-04-10T21:58:30Z"
+ },
+ {
+ "checksumSHA1": "Ho5sr2GbiR8S35IRni7vC54d5Js=",
+ "path": "gopkg.in/square/go-jose.v2/cipher",
+ "revision": "730df5f748271903322feb182be83b43ebbbe27d",
+ "revisionTime": "2019-04-10T21:58:30Z"
+ },
+ {
+ "checksumSHA1": "JFun0lWY9eqd80Js2iWsehu1gc4=",
+ "path": "gopkg.in/square/go-jose.v2/json",
+ "revision": "730df5f748271903322feb182be83b43ebbbe27d",
+ "revisionTime": "2019-04-10T21:58:30Z"
+ },
{
"checksumSHA1": "GdsHg+yOsZtdMvD9HJFovPsqKec=",
"path": "gopkg.in/src-d/go-billy.v4",