- sdk/cli/index.html.textile.liquid
- sdk/cli/reference.html.textile.liquid
- sdk/cli/subcommands.html.textile.liquid
+ - sdk/cli/project-management.html.textile.liquid
- Go:
- sdk/go/index.html.textile.liquid
- sdk/go/example.html.textile.liquid
- Users and Groups:
- admin/user-management.html.textile.liquid
- admin/user-management-cli.html.textile.liquid
+ - admin/group-management.html.textile.liquid
- admin/reassign-ownership.html.textile.liquid
- admin/link-accounts.html.textile.liquid
- - admin/group-management.html.textile.liquid
- admin/federation.html.textile.liquid
- admin/merge-remote-account.html.textile.liquid
- admin/migrating-providers.html.textile.liquid
- user/topics/arvados-sync-external-sources.html.textile.liquid
- admin/scoped-tokens.html.textile.liquid
- admin/token-expiration-policy.html.textile.liquid
- - admin/user-activity.html.textile.liquid
- Monitoring:
- admin/logging.html.textile.liquid
- admin/metrics.html.textile.liquid
- admin/health-checks.html.textile.liquid
- admin/management-token.html.textile.liquid
+ - admin/user-activity.html.textile.liquid
- Data Management:
- admin/collection-versioning.html.textile.liquid
- admin/collection-managed-properties.html.textile.liquid
---
layout: default
navsection: admin
-title: Group management
+title: Role group management at the CLI
...
{% comment %}
This page describes how to manage groups at the command line. You should be familiar with the "permission system":{{site.baseurl}}/api/permission-model.html .
-h2. Create a group
+h2. Create a role group
User groups are entries in the "groups" table with @"group_class": "role"@.
arv group create --group '{"name": "My new group", "group_class": "role"}'
</pre>
-h2(#add). Add a user to a group
+h2(#add). Add a user to a role group
There are two separate permissions associated with group membership. The first link grants the user @can_manage@ permission to manage things that the group can manage. The second link grants permission for other users of the group to see that this user is part of the group.
A user can also be given read-only access to a group. In that case, the first link should be created with @can_read@ instead of @can_manage@.
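For example (a sketch; the UUIDs are placeholders): the first link grants the user access to the group, and the second link makes the membership visible to other members of the group.

<pre>
arv link create --link '{
  "link_class": "permission",
  "name": "can_manage",
  "tail_uuid": "the_user_uuid",
  "head_uuid": "the_group_uuid"}'

arv link create --link '{
  "link_class": "permission",
  "name": "can_read",
  "tail_uuid": "the_group_uuid",
  "head_uuid": "the_user_uuid"}'
</pre>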
-h2. List groups
+h2. List role groups
<pre>
arv group list --filters '[["group_class", "=", "role"]]'
</pre>
-h2. List members of a group
+h2. List members of a role group
Use the command "jq":https://stedolan.github.io/jq/ to extract the @tail_uuid@ of each permission link whose @head_uuid@ is the group; each @tail_uuid@ identifies a member user.
["head_uuid", "=", "the_group_uuid"]]' | jq .items[].tail_uuid
</pre>
-h2. Share a project with a group
+h2(#share-project). Share a project with a role group
-This will give all members of the group @can_manage@ access.
+Members of the role group will have access to the project based on their level of access to the role group.
<pre>
arv link create --link '{
"head_uuid": "the_project_uuid"}'
</pre>
-A project can also be shared read-only. In that case, the first link should be created with @can_read@ instead of @can_manage@.
+A project can also be shared read-only. In that case, the link @name@ should be @can_read@ instead of @can_manage@.
h2. List things shared with the group
["tail_uuid", "=", "the_group_uuid"]]' | jq .items[].head_uuid
</pre>
-h2. Stop sharing a project with a group
+h2(#stop-sharing-project). Stop sharing a project with a role group
This will remove access for members of the group.
arv link delete --uuid each_link_uuid
</pre>
-h2. Remove user from a group
+h2. Remove user from a role group
The first step is to find the permission link objects. The second step is to delete them.
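For example (a sketch; the UUIDs are placeholders. Repeat with @tail_uuid@ and @head_uuid@ swapped to find the reciprocal membership link.)

<pre>
arv link list --filters '[["link_class", "=", "permission"],
  ["tail_uuid", "=", "the_user_uuid"],
  ["head_uuid", "=", "the_group_uuid"]]' | jq .items[].uuid

arv link delete --uuid each_link_uuid
</pre>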
<div class="releasenotes">
</notextile>
-h2(#main). development main (as of 2022-08-26)
-"previous: Upgrading to 2.4.2":#v2_4_2
+h2(#main). development main (as of 2022-09-21)
+
+"previous: Upgrading to 2.4.3":#v2_4_3
h3. Login-sync script requires configuration update on LoginCluster federations
If you have @arvados-login-sync@ running on a satellite cluster, please update the environment variable settings by removing the @LOGINCLUSTER_ARVADOS_API_*@ variables and setting @ARVADOS_API_TOKEN@ to a LoginCluster's admin token, as described on the "updated install page":{{site.baseurl}}/install/install-shell-server.html#arvados-login-sync.
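For example, in the environment settings for the @arvados-login-sync@ cron job (a sketch; the exact variable values are site-specific):

<pre>
# Remove the old LOGINCLUSTER_ARVADOS_API_* variables, e.g.:
#   LOGINCLUSTER_ARVADOS_API_TOKEN=...
# Set instead:
ARVADOS_API_TOKEN=<LoginCluster admin token>
</pre>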
+h3. Renamed keep-web metrics and WebDAV configs
+
+Metrics previously reported by keep-web (@arvados_keepweb_collectioncache_requests@, @..._hits@, @..._pdh_hits@, @..._api_calls@, @..._cached_manifests@, and @arvados_keepweb_sessions_cached_collection_bytes@) have been replaced with @arvados_keepweb_cached_session_bytes@.
+
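+For example, you can verify the new metric is being reported by querying keep-web's metrics endpoint (a sketch; the host and management token are placeholders):
+
+<notextile>
+<pre>curl -sfH "Authorization: Bearer your_management_token" \
+  https://keep-web.example.com/metrics | grep arvados_keepweb_cached_session_bytes
+</pre>
+</notextile>
+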
+The config entries @Collections.WebDAVCache.UUIDTTL@, @...MaxCollectionEntries@, and @...MaxUUIDEntries@ are no longer used, and should be removed from your config file.
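+
+A sketch of the obsolete entries to delete (default values shown as they appeared in earlier config files):
+
+<notextile>
+<pre>    Collections:
+      WebDAVCache:
+        UUIDTTL: 5s                # remove
+        MaxCollectionEntries: 1000 # remove
+        MaxUUIDEntries: 1000       # remove
+</pre>
+</notextile>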
+
+h2(#v2_4_3). v2.4.3 (2022-09-21)
+
+"previous: Upgrading to 2.4.2":#v2_4_2
+
+h3. Fixed PAM authentication security vulnerability
+
+In Arvados 2.4.2 and earlier, when using PAM authentication, if a user
+presented valid credentials but the account was disabled or otherwise
+not allowed to access the host, the login was still accepted for
+access to Arvados. From 2.4.3 onwards, Arvados also checks that the
+account is permitted to access the host before completing the PAM
+login process.
+
+Other authentication methods (LDAP, OpenID Connect) are not affected
+by this flaw.
+
h2(#v2_4_2). v2.4.2 (2022-08-09)
"previous: Upgrading to 2.4.1":#v2_4_1
h3. get
-Gets a Collection's metadata by UUID or portable data hash. When making a request by portable data hash, attributes other than @portable_data_hash@ and @manifest_text@ are not returned, even when requested explicitly using the @select@ parameter.
+Gets a Collection's metadata by UUID or portable data hash. When making a request by portable data hash, attributes other than @portable_data_hash@, @manifest_text@, and @trash_at@ are not returned, even when requested explicitly using the @select@ parameter.
Arguments:
</notextile>
-h3(#SbatchArguments). Containers.LSF.BsubArgumentsList
+h3(#BsubArgumentsList). Containers.LSF.BsubArgumentsList
When arvados-dispatch-lsf invokes @bsub@, you can add arguments to the command by specifying @BsubArgumentsList@. You can use this to send the jobs to specific cluster partitions or add resource requests. Set @BsubArgumentsList@ to an array of strings.
Note that the default value for @BsubArgumentsList@ uses the @-o@ and @-e@ arguments to write stdout/stderr data to files in @/tmp@ on the compute nodes, which is helpful for troubleshooting installation/configuration problems. Ensure you have something in place to delete old files from @/tmp@, or adjust these arguments accordingly.
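For example, to route containers to a specific cluster queue (a sketch; the queue name is illustrative, and the @-o@/@-e@ file names are assumptions mirroring the default described above):

<notextile>
<pre>    Containers:
      LSF:
        BsubArgumentsList: ["-q", "big_mem", "-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err"]
</pre>
</notextile>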
-h3(#SbatchArguments). Containers.LSF.BsubCUDAArguments
+h3(#BsubCUDAArguments). Containers.LSF.BsubCUDAArguments
If the container requests access to GPUs (@runtime_constraints.cuda.device_count@ of the container request is greater than zero), the command line arguments in @BsubCUDAArguments@ will be added to the command line _after_ @BsubArgumentsList@. This should consist of the additional @bsub@ flags your site requires to schedule the job on a node with GPU support. Set @BsubCUDAArguments@ to an array of strings. For example:
</pre>
</notextile>
-h3(#PollPeriod). Containers.PollInterval
+h3(#PollInterval). Containers.PollInterval
arvados-dispatch-lsf polls the API server periodically for new containers to run. The @PollInterval@ option controls how often this poll happens. Set this to a string of numbers suffixed with one of the time units @s@, @m@, or @h@. For example:
</notextile>
-h3(#CrunchRunCommand-network). Containers.CrunchRunArgumentList: Using host networking for containers
+h3(#CrunchRunArgumentList). Containers.CrunchRunArgumentList: Using host networking for containers
Older Linux kernels (prior to 3.18) have bugs in network namespace handling which can lead to compute node lockups. This is indicated by blocked kernel tasks in "Workqueue: netns cleanup_net". If you are experiencing this problem, as a workaround you can disable use of network namespaces by Docker across the cluster. Be aware this reduces container isolation, which may be a security risk.
</pre>
</notextile>
+
+h3(#InstanceTypes). InstanceTypes: Avoid submitting jobs with unsatisfiable resource constraints
+
+LSF does not provide feedback when a submitted job's RAM, CPU, or disk space constraints cannot be satisfied by any node: the job will wait in the queue indefinitely with "pending" status, reported by Arvados as "queued".
+
+As a workaround, you can configure @InstanceTypes@ with your LSF cluster's compute node sizes. Arvados will use these sizes to determine when a container is impossible to run, and cancel it instead of submitting an LSF job.
+
+Apart from detecting non-runnable containers, the configured instance types will not have any effect on scheduling.
+
+<notextile>
+<pre> InstanceTypes:
+ most-ram:
+ VCPUs: 8
+ RAM: 640GiB
+ IncludedScratch: 640GB
+ most-cpus:
+ VCPUs: 32
+ RAM: 256GiB
+ IncludedScratch: 640GB
+ gpu:
+ VCPUs: 8
+ RAM: 256GiB
+ IncludedScratch: 640GB
+ CUDA:
+ DriverVersion: "11.4"
+ HardwareCapability: "7.5"
+ DeviceCount: 1
+</pre>
+</notextile>
+
+
{% assign arvados_component = 'arvados-dispatch-lsf' %}
{% include 'install_packages' %}
SPDX-License-Identifier: CC-BY-SA-3.0
{% endcomment %}
-h2. Cancel a container request
+# "Cancel a container request":#cancel-a-container-request
+# "Cancel all container requests":#cancel-all-container-requests
+# "List completed container requests":#list-completed-container-requests
+# "Get input of a CWL workflow":#get-input-of-a-cwl-workflow
+# "Get output of a CWL workflow":#get-output-of-a-cwl-workflow
+# "Get state of a CWL workflow":#get-state-of-a-cwl-workflow
+# "List input of child requests":#list-input-of-child-requests
+# "List output of child requests":#list-output-of-child-requests
+# "List failed child requests":#list-failed-child-requests
+# "Get log of a child request":#get-log-of-a-child-request
+# "Create a collection sharing link":#sharing-link
+# "Combine two or more collections":#combine-two-or-more-collections
+# "Upload a file into a new collection":#upload-a-file-into-a-new-collection
+# "Download a file from a collection":#download-a-file-from-a-collection
+# "Copy files from a collection to a new collection":#copy-files-from-a-collection-to-a-new-collection
+# "Copy files from a collection to another collection":#copy-files-from-a-collection-to-another-collection
+# "Delete a file from an existing collection":#delete-a-file-from-an-existing-collection
+# "Listing records with paging":#listing-records-with-paging
+# "Querying the vocabulary definition":#querying-the-vocabulary-definition
+# "Translating between vocabulary identifiers and labels":#translating-between-vocabulary-identifiers-and-labels
+# "Create a Project":#create-a-project
+
+h2(#cancel-a-container-request). Cancel a container request
{% codeblock as python %}
import arvados
arvados.api().container_requests().update(uuid=container_request_uuid, body={"priority": 0}).execute()
{% endcodeblock %}
-h2. Cancel all container requests
+h2(#cancel-all-container-requests). Cancel all container requests
{% codeblock as python %}
import arvados
api.container_requests().update(uuid=container_request["uuid"], body={"priority": 0}).execute()
{% endcodeblock %}
-h2. List completed container requests
+h2(#list-completed-container-requests). List completed container requests
{% codeblock as python %}
import arvados
print("%s, %s, %s" % (container_request["uuid"], container_request["name"], "Success" if container["exit_code"] == 0 else "Failed"))
{% endcodeblock %}
-h2. Get input of a CWL workflow
+h2(#get-input-of-a-cwl-workflow). Get input of a CWL workflow
{% codeblock as python %}
import arvados
print(container_request["mounts"]["/var/lib/cwl/cwl.input.json"])
{% endcodeblock %}
-h2. Get output of a CWL workflow
+h2(#get-output-of-a-cwl-workflow). Get output of a CWL workflow
{% codeblock as python %}
import arvados
print(collection.open("cwl.output.json").read())
{% endcodeblock %}
-h2. Get state of a CWL workflow
+h2(#get-state-of-a-cwl-workflow). Get state of a CWL workflow
{% codeblock as python %}
import arvados
print(get_cr_state(container_request_uuid))
{% endcodeblock %}
-h2. List input of child requests
+h2(#list-input-of-child-requests). List input of child requests
{% codeblock as python %}
import arvados
print(" %s" % m["portable_data_hash"])
{% endcodeblock %}
-h2. List output of child requests
+h2(#list-output-of-child-requests). List output of child requests
{% codeblock as python %}
import arvados
print("%s -> %s" % (c["name"], uuid_to_pdh[c["output_uuid"]]))
{% endcodeblock %}
-h2. List failed child requests
+h2(#list-failed-child-requests). List failed child requests
{% codeblock as python %}
import arvados
print("%s (%s)" % (child_containers[c["uuid"]]["name"], child_containers[c["uuid"]]["uuid"]))
{% endcodeblock %}
-h2. Get log of a child request
+h2(#get-log-of-a-child-request). Get log of a child request
{% codeblock as python %}
import arvados
print("%s/c=%s/t=%s/_/" % (download, collection_uuid, token["api_token"]))
{% endcodeblock %}
-h2. Combine two or more collections
+h2(#combine-two-or-more-collections). Combine two or more collections
Note that if two collections have files of the same name, their contents will be concatenated in the resulting manifest.
import arvados
import arvados.collection
api = arvados.api()
-project_uuid = "zzzzz-tpzed-zzzzzzzzzzzzzzz"
+project_uuid = "zzzzz-j7d0g-zzzzzzzzzzzzzzz"
collection_uuids = ["zzzzz-4zz18-aaaaaaaaaaaaaaa", "zzzzz-4zz18-bbbbbbbbbbbbbbb"]
combined_manifest = ""
for u in collection_uuids:
newcol.save_new(name="My combined collection", owner_uuid=project_uuid)
{% endcodeblock %}
-h2. Upload a file into a new collection
+h2(#upload-a-file-into-a-new-collection). Upload a file into a new collection
{% codeblock as python %}
import arvados
print("Saved %s to %s" % (collection_name, c.manifest_locator()))
{% endcodeblock %}
-h2. Download a file from a collection
+h2(#download-a-file-from-a-collection). Download a file from a collection
{% codeblock as python %}
import arvados
print("Finished downloading %s" % filename)
{% endcodeblock %}
-h2. Copy files from a collection to a new collection
+h2(#copy-files-from-a-collection-to-a-new-collection). Copy files from a collection to a new collection
{% codeblock as python %}
import arvados.collection
print("Created collection %s" % target.manifest_locator())
{% endcodeblock %}
-h2. Copy files from a collection to another collection
+h2(#copy-files-from-a-collection-to-another-collection). Copy files from a collection to another collection
{% codeblock as python %}
import arvados.collection
target.save()
{% endcodeblock %}
-h2. Delete a file from an existing collection
+h2(#delete-a-file-from-an-existing-collection). Delete a file from an existing collection
{% codeblock as python %}
import arvados
c.save()
{% endcodeblock %}
-h2. Listing records with paging
+h2(#listing-records-with-paging). Listing records with paging
Use the @arvados.util.keyset_list_all@ helper method to iterate over all the records matching an optional filter. This method handles paging internally and returns results incrementally using a Python iterator. The first parameter of the method takes a @list@ method of an Arvados resource (@collections@, @container_requests@, etc).
print("got collection " + c["uuid"])
{% endcodeblock %}
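A self-contained sketch of this pattern (the filter shown is illustrative):

{% codeblock as python %}
import arvados
import arvados.util

api = arvados.api()
# keyset_list_all pages through all matching records for us.
for c in arvados.util.keyset_list_all(
        api.collections().list,
        filters=[["name", "like", "Output%"]]):
    print("got collection " + c["uuid"])
{% endcodeblock %}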
-h2. Querying the vocabulary definition
+h2(#querying-the-vocabulary-definition). Querying the vocabulary definition
The Python SDK provides facilities to interact with the "active metadata vocabulary":{{ site.baseurl }}/admin/metadata-vocabulary.html in the system. The developer can do key and value lookups in a case-insensitive manner:
# Example output: 'IDVALSIZES2'
{% endcodeblock %}
-h2. Translating between vocabulary identifiers and labels
+h2(#translating-between-vocabulary-identifiers-and-labels). Translating between vocabulary identifiers and labels
Client software might need to present properties to the user in a human-readable form or take input from the user without requiring them to remember identifiers. For these cases, there are a couple of conversion methods that take a dictionary as input, like this:
# Example output: {'Importance': 'Critical'}
voc.convert_to_identifiers({'creature': 'elephant'})
# Example output: {'IDTAGANIMALS': 'IDVALANIMALS3'}
-{% endcodeblock %}
\ No newline at end of file
+{% endcodeblock %}
+
+h2(#create-a-project). Create a Project
+
+{% codeblock as python %}
+import arvados
+
+parent_project_uuid = "zzzzz-j7d0g-zzzzzzzzzzzzzzz"
+project_name = "My project"
+
+g = arvados.api().groups().create(body={
+ "group": {
+ "group_class": "project",
+ "owner_uuid": parent_project_uuid,
+ "name": project_name,
+ }}).execute()
+
+print("New project uuid is", g["uuid"])
+{% endcodeblock %}
# cluster.
RoleGroupsVisibleToAll: true
+ # During each period, a log entry with event_type="activity"
+ # will be recorded for each user who is active during that
+ # period. The object_uuid attribute will indicate the user's
+ # UUID.
+ #
+ # Multiple log entries for the same user may be generated during
+ # a period if there are multiple controller processes or a
+ # controller process is restarted.
+ #
+ # Use 0 to disable activity logging.
+ ActivityLoggingPeriod: 24h
+
AuditLogs:
# Time to keep audit logs, in seconds. (An audit log is a row added
# to the "logs" table in the PostgreSQL database each time an
# Time to cache manifests, permission checks, and sessions.
TTL: 300s
- # Time to cache collection state.
- UUIDTTL: 5s
-
# Block cache entries. Each block consumes up to 64 MiB RAM.
MaxBlockEntries: 20
- # Collection cache entries.
- MaxCollectionEntries: 1000
-
- # Approximate memory limit (in bytes) for collection cache.
+ # Approximate memory limit (in bytes) for session cache.
+ #
+ # Note this applies to the in-memory representation of
+ # projects and collections -- metadata, block locators,
+ # filenames, etc. -- excluding cached file content, which is
+ # limited by MaxBlockEntries.
MaxCollectionBytes: 100000000
- # UUID cache entries.
- MaxUUIDEntries: 1000
-
# Persistent sessions.
MaxSessions: 100
if oc.Cache.TTL != nil {
cluster.Collections.WebDAVCache.TTL = *oc.Cache.TTL
}
- if oc.Cache.UUIDTTL != nil {
- cluster.Collections.WebDAVCache.UUIDTTL = *oc.Cache.UUIDTTL
- }
- if oc.Cache.MaxCollectionEntries != nil {
- cluster.Collections.WebDAVCache.MaxCollectionEntries = *oc.Cache.MaxCollectionEntries
- }
if oc.Cache.MaxCollectionBytes != nil {
cluster.Collections.WebDAVCache.MaxCollectionBytes = *oc.Cache.MaxCollectionBytes
}
- if oc.Cache.MaxUUIDEntries != nil {
- cluster.Collections.WebDAVCache.MaxUUIDEntries = *oc.Cache.MaxUUIDEntries
- }
if oc.AnonymousTokens != nil {
if len(*oc.AnonymousTokens) > 0 {
cluster.Users.AnonymousUserToken = (*oc.AnonymousTokens)[0]
c.Check(cluster.SystemRootToken, check.Equals, "abcdefg")
c.Check(cluster.Collections.WebDAVCache.TTL, check.Equals, arvados.Duration(60*time.Second))
- c.Check(cluster.Collections.WebDAVCache.UUIDTTL, check.Equals, arvados.Duration(time.Second))
- c.Check(cluster.Collections.WebDAVCache.MaxCollectionEntries, check.Equals, 42)
c.Check(cluster.Collections.WebDAVCache.MaxCollectionBytes, check.Equals, int64(1234567890))
- c.Check(cluster.Collections.WebDAVCache.MaxUUIDEntries, check.Equals, 100)
c.Check(cluster.Services.WebDAVDownload.ExternalURL, check.Equals, arvados.URL{Host: "download.example.com", Path: "/"})
c.Check(cluster.Services.WebDAVDownload.InternalURLs[arvados.URL{Host: ":80"}], check.NotNil)
"TLS.Key": false,
"Users": true,
"Users.ActivatedUsersAreVisibleToOthers": false,
+ "Users.ActivityLoggingPeriod": false,
"Users.AdminNotifierEmailFrom": false,
"Users.AnonymousUserToken": true,
"Users.AutoAdminFirstUser": false,
}
return c, err
}
+ if len(options.UUID) < 34 || options.UUID[32] != '+' {
+ return arvados.Collection{}, httpErrorf(http.StatusNotFound, "invalid UUID or PDH %q", options.UUID)
+ }
// UUID is a PDH
first := make(chan arvados.Collection, 1)
err := conn.tryLocalThenRemotes(ctx, options.ForwardedFor, func(ctx context.Context, remoteID string, be backend) error {
h.federation = federation.New(h.Cluster, &healthFuncs)
rtr := router.New(h.federation, router.Config{
MaxRequestSize: h.Cluster.API.MaxRequestSize,
- WrapCalls: api.ComposeWrappers(ctrlctx.WrapCallsInTransactions(h.db), oidcAuthorizer.WrapCalls),
+ WrapCalls: api.ComposeWrappers(
+ ctrlctx.WrapCallsInTransactions(h.db),
+ oidcAuthorizer.WrapCalls,
+ ctrlctx.WrapCallsWithAuth(h.Cluster)),
})
healthRoutes := health.Routes{"ping": func() error { _, err := h.db(context.TODO()); return err }}
"testing"
"time"
+ "git.arvados.org/arvados.git/lib/controller/rpc"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
"git.arvados.org/arvados.git/sdk/go/auth"
time.Sleep(time.Second / 10)
}
}
+
+func (s *HandlerSuite) TestLogActivity(c *check.C) {
+ s.cluster.SystemRootToken = arvadostest.SystemRootToken
+ s.cluster.Users.ActivityLoggingPeriod = arvados.Duration(24 * time.Hour)
+ s.handler.CheckHealth()
+
+ testServer := newServerFromIntegrationTestEnv(c)
+ testServer.Server.Handler = httpserver.AddRequestIDs(httpserver.LogRequests(s.handler))
+ c.Assert(testServer.Start(), check.IsNil)
+ defer testServer.Close()
+
+ u, _ := url.Parse("http://" + testServer.Addr)
+ client := rpc.NewConn(s.cluster.ClusterID, u, true, rpc.PassthroughTokenProvider)
+
+ starttime := time.Now()
+ for i := 0; i < 4; i++ {
+ for _, token := range []string{
+ arvadostest.ActiveTokenV2,
+ arvadostest.ActiveToken,
+ arvadostest.SpectatorToken,
+ } {
+ ctx := auth.NewContext(s.ctx, &auth.Credentials{Tokens: []string{token}})
+ _, err := client.CollectionList(ctx, arvados.ListOptions{})
+ c.Assert(err, check.IsNil)
+ }
+ }
+ db, err := s.handler.db(s.ctx)
+ c.Assert(err, check.IsNil)
+ for _, userUUID := range []string{arvadostest.ActiveUserUUID, arvadostest.SpectatorUserUUID} {
+ var rows int
+		err = db.QueryRowContext(s.ctx, `select count(uuid) from logs where object_uuid = $1 and event_at > $2`, userUUID, starttime.UTC()).Scan(&rows)
+ c.Assert(err, check.IsNil)
+ c.Check(rows, check.Equals, 1, check.Commentf("expect 1 row for user uuid %s", userUUID))
+ }
+}
// CollectionGet defers to railsProxy for everything except blob
// signatures.
func (conn *Conn) CollectionGet(ctx context.Context, opts arvados.GetOptions) (arvados.Collection, error) {
+ conn.logActivity(ctx)
if len(opts.Select) > 0 {
// We need to know IsTrashed and TrashAt to implement
// signing properly, even if the caller doesn't want
// CollectionList defers to railsProxy for everything except blob
// signatures.
func (conn *Conn) CollectionList(ctx context.Context, opts arvados.ListOptions) (arvados.CollectionList, error) {
+ conn.logActivity(ctx)
if len(opts.Select) > 0 {
// We need to know IsTrashed and TrashAt to implement
// signing properly, even if the caller doesn't want
// CollectionCreate defers to railsProxy for everything except blob
// signatures and vocabulary checking.
func (conn *Conn) CollectionCreate(ctx context.Context, opts arvados.CreateOptions) (arvados.Collection, error) {
+ conn.logActivity(ctx)
err := conn.checkProperties(ctx, opts.Attrs["properties"])
if err != nil {
return arvados.Collection{}, err
// CollectionUpdate defers to railsProxy for everything except blob
// signatures and vocabulary checking.
func (conn *Conn) CollectionUpdate(ctx context.Context, opts arvados.UpdateOptions) (arvados.Collection, error) {
+ conn.logActivity(ctx)
err := conn.checkProperties(ctx, opts.Attrs["properties"])
if err != nil {
return arvados.Collection{}, err
"fmt"
"net/http"
"os"
- "strings"
"sync"
"time"
lastVocabularyRefreshCheck time.Time
lastVocabularyError error
loginController
- gwTunnels map[string]*yamux.Session
- gwTunnelsLock sync.Mutex
+ gwTunnels map[string]*yamux.Session
+ gwTunnelsLock sync.Mutex
+ activeUsers map[string]bool
+ activeUsersLock sync.Mutex
+ activeUsersReset time.Time
}
func NewConn(cluster *arvados.Cluster) *Conn {
return conn.loginController.UserAuthenticate(ctx, opts)
}
-func (conn *Conn) GroupContents(ctx context.Context, options arvados.GroupContentsOptions) (arvados.ObjectList, error) {
- // The requested UUID can be a user (virtual home project), which we just pass on to
- // the API server.
- if strings.Index(options.UUID, "-j7d0g-") != 5 {
- return conn.railsProxy.GroupContents(ctx, options)
- }
-
- var resp arvados.ObjectList
-
- // Get the group object
- respGroup, err := conn.GroupGet(ctx, arvados.GetOptions{UUID: options.UUID})
- if err != nil {
- return resp, err
- }
-
- // If the group has groupClass 'filter', apply the filters before getting the contents.
- if respGroup.GroupClass == "filter" {
- if filters, ok := respGroup.Properties["filters"].([]interface{}); ok {
- for _, f := range filters {
- // f is supposed to be a []string
- tmp, ok2 := f.([]interface{})
- if !ok2 || len(tmp) < 3 {
- return resp, fmt.Errorf("filter unparsable: %T, %+v, original field: %T, %+v\n", tmp, tmp, f, f)
- }
- var filter arvados.Filter
- if attr, ok2 := tmp[0].(string); ok2 {
- filter.Attr = attr
- } else {
- return resp, fmt.Errorf("filter unparsable: attribute must be string: %T, %+v, filter: %T, %+v\n", tmp[0], tmp[0], f, f)
- }
- if operator, ok2 := tmp[1].(string); ok2 {
- filter.Operator = operator
- } else {
- return resp, fmt.Errorf("filter unparsable: operator must be string: %T, %+v, filter: %T, %+v\n", tmp[1], tmp[1], f, f)
- }
- filter.Operand = tmp[2]
- options.Filters = append(options.Filters, filter)
- }
- } else {
- return resp, fmt.Errorf("filter unparsable: not an array\n")
- }
- // Use the generic /groups/contents endpoint for filter groups
- options.UUID = ""
- }
-
- return conn.railsProxy.GroupContents(ctx, options)
-}
-
func httpErrorf(code int, format string, args ...interface{}) error {
return httpserver.ErrorWithStatus(fmt.Errorf(format, args...), code)
}
// ContainerRequestCreate defers to railsProxy for everything except
// vocabulary checking.
func (conn *Conn) ContainerRequestCreate(ctx context.Context, opts arvados.CreateOptions) (arvados.ContainerRequest, error) {
+ conn.logActivity(ctx)
err := conn.checkProperties(ctx, opts.Attrs["properties"])
if err != nil {
return arvados.ContainerRequest{}, err
// ContainerRequestUpdate defers to railsProxy for everything except
// vocabulary checking.
func (conn *Conn) ContainerRequestUpdate(ctx context.Context, opts arvados.UpdateOptions) (arvados.ContainerRequest, error) {
+ conn.logActivity(ctx)
err := conn.checkProperties(ctx, opts.Attrs["properties"])
if err != nil {
return arvados.ContainerRequest{}, err
}
return resp, nil
}
+
+func (conn *Conn) ContainerRequestGet(ctx context.Context, opts arvados.GetOptions) (arvados.ContainerRequest, error) {
+ conn.logActivity(ctx)
+ return conn.railsProxy.ContainerRequestGet(ctx, opts)
+}
+
+func (conn *Conn) ContainerRequestList(ctx context.Context, opts arvados.ListOptions) (arvados.ContainerRequestList, error) {
+ conn.logActivity(ctx)
+ return conn.railsProxy.ContainerRequestList(ctx, opts)
+}
+
+func (conn *Conn) ContainerRequestDelete(ctx context.Context, opts arvados.DeleteOptions) (arvados.ContainerRequest, error) {
+ conn.logActivity(ctx)
+ return conn.railsProxy.ContainerRequestDelete(ctx, opts)
+}
import (
"context"
+ "fmt"
+ "strings"
"git.arvados.org/arvados.git/sdk/go/arvados"
)
// GroupCreate defers to railsProxy for everything except vocabulary
// checking.
func (conn *Conn) GroupCreate(ctx context.Context, opts arvados.CreateOptions) (arvados.Group, error) {
+ conn.logActivity(ctx)
err := conn.checkProperties(ctx, opts.Attrs["properties"])
if err != nil {
return arvados.Group{}, err
return resp, nil
}
+func (conn *Conn) GroupGet(ctx context.Context, opts arvados.GetOptions) (arvados.Group, error) {
+ conn.logActivity(ctx)
+ return conn.railsProxy.GroupGet(ctx, opts)
+}
+
// GroupUpdate defers to railsProxy for everything except vocabulary
// checking.
func (conn *Conn) GroupUpdate(ctx context.Context, opts arvados.UpdateOptions) (arvados.Group, error) {
+ conn.logActivity(ctx)
err := conn.checkProperties(ctx, opts.Attrs["properties"])
if err != nil {
return arvados.Group{}, err
}
return resp, nil
}
+
+func (conn *Conn) GroupList(ctx context.Context, opts arvados.ListOptions) (arvados.GroupList, error) {
+ conn.logActivity(ctx)
+ return conn.railsProxy.GroupList(ctx, opts)
+}
+
+func (conn *Conn) GroupDelete(ctx context.Context, opts arvados.DeleteOptions) (arvados.Group, error) {
+ conn.logActivity(ctx)
+ return conn.railsProxy.GroupDelete(ctx, opts)
+}
+
+func (conn *Conn) GroupContents(ctx context.Context, options arvados.GroupContentsOptions) (arvados.ObjectList, error) {
+ conn.logActivity(ctx)
+
+ // The requested UUID can be a user (virtual home project), which we just pass on to
+ // the API server.
+ if strings.Index(options.UUID, "-j7d0g-") != 5 {
+ return conn.railsProxy.GroupContents(ctx, options)
+ }
+
+ var resp arvados.ObjectList
+
+ // Get the group object
+ respGroup, err := conn.GroupGet(ctx, arvados.GetOptions{UUID: options.UUID})
+ if err != nil {
+ return resp, err
+ }
+
+ // If the group has groupClass 'filter', apply the filters before getting the contents.
+ if respGroup.GroupClass == "filter" {
+ if filters, ok := respGroup.Properties["filters"].([]interface{}); ok {
+ for _, f := range filters {
+ // f is supposed to be a []string
+ tmp, ok2 := f.([]interface{})
+ if !ok2 || len(tmp) < 3 {
+ return resp, fmt.Errorf("filter unparsable: %T, %+v, original field: %T, %+v\n", tmp, tmp, f, f)
+ }
+ var filter arvados.Filter
+ if attr, ok2 := tmp[0].(string); ok2 {
+ filter.Attr = attr
+ } else {
+ return resp, fmt.Errorf("filter unparsable: attribute must be string: %T, %+v, filter: %T, %+v\n", tmp[0], tmp[0], f, f)
+ }
+ if operator, ok2 := tmp[1].(string); ok2 {
+ filter.Operator = operator
+ } else {
+ return resp, fmt.Errorf("filter unparsable: operator must be string: %T, %+v, filter: %T, %+v\n", tmp[1], tmp[1], f, f)
+ }
+ filter.Operand = tmp[2]
+ options.Filters = append(options.Filters, filter)
+ }
+ } else {
+ return resp, fmt.Errorf("filter unparsable: not an array\n")
+ }
+ // Use the generic /groups/contents endpoint for filter groups
+ options.UUID = ""
+ }
+
+ return conn.railsProxy.GroupContents(ctx, options)
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package localdb
+
+import (
+ "context"
+ "time"
+
+ "git.arvados.org/arvados.git/lib/ctrlctx"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+)
+
+func (conn *Conn) logActivity(ctx context.Context) {
+ p := conn.cluster.Users.ActivityLoggingPeriod.Duration()
+ if p < 1 {
+ ctxlog.FromContext(ctx).Debug("logActivity disabled by config")
+ return
+ }
+ user, _, err := ctrlctx.CurrentAuth(ctx)
+ if err == ctrlctx.ErrUnauthenticated {
+ ctxlog.FromContext(ctx).Debug("logActivity skipped for unauthenticated request")
+ return
+ } else if err != nil {
+ ctxlog.FromContext(ctx).WithError(err).Error("logActivity CurrentAuth failed")
+ return
+ }
+ now := time.Now()
+ conn.activeUsersLock.Lock()
+ if conn.activeUsers == nil || conn.activeUsersReset.IsZero() || conn.activeUsersReset.Before(now) {
+ conn.activeUsersReset = alignedPeriod(now, p)
+ conn.activeUsers = map[string]bool{}
+ }
+ logged := conn.activeUsers[user.UUID]
+ if !logged {
+ // Prevent other concurrent calls from logging about
+ // this user until we finish.
+ conn.activeUsers[user.UUID] = true
+ }
+ conn.activeUsersLock.Unlock()
+ if logged {
+ return
+ }
+ defer func() {
+ // If we return without logging, reset the flag so we
+ // try again on the user's next API call.
+ if !logged {
+ conn.activeUsersLock.Lock()
+ conn.activeUsers[user.UUID] = false
+ conn.activeUsersLock.Unlock()
+ }
+ }()
+
+ tx, err := ctrlctx.NewTx(ctx)
+ if err != nil {
+ ctxlog.FromContext(ctx).WithError(err).Error("logActivity NewTx failed")
+ return
+ }
+ defer tx.Rollback()
+ _, err = tx.ExecContext(ctx, `
+insert into logs
+ (uuid,
+ owner_uuid, modified_by_user_uuid, object_owner_uuid,
+ event_type,
+ summary,
+ object_uuid,
+ properties,
+ event_at, created_at, updated_at, modified_at)
+ values
+ ($1, $2, $2, $2, $3, $4, $5, $6,
+ current_timestamp at time zone 'UTC',
+ current_timestamp at time zone 'UTC',
+ current_timestamp at time zone 'UTC',
+ current_timestamp at time zone 'UTC')
+ returning id`,
+ arvados.RandomUUID(conn.cluster.ClusterID, "57u5n"),
+	conn.cluster.ClusterID+"-tpzed-000000000000000", // owner, modified_by, and object_owner
+ "activity",
+ "activity of "+user.UUID,
+ user.UUID,
+ "{}")
+ if err != nil {
+ ctxlog.FromContext(ctx).WithError(err).Error("logActivity query failed")
+ return
+ }
+ err = tx.Commit()
+ if err != nil {
+ ctxlog.FromContext(ctx).WithError(err).Error("logActivity commit failed")
+ return
+ }
+ logged = true
+}
+
+// alignedPeriod computes a time interval that includes now and aligns
+// to local clock times that are multiples of p. For example, if local
+// time is UTC-5 and ActivityLoggingPeriod=4h, periodStart and
+// periodEnd will be 0000-0400, 0400-0800, etc., in local time. If p
+// is a multiple of 24h, periods will start and end at midnight.
+//
+// If DST starts or ends during this period, the boundaries will be
+// aligned based on either DST or non-DST time depending on whether
+// now is before or after the DST transition. The impact is presumed
+// to be insignificant, e.g., logActivity may unnecessarily
+// log activity more than once in a period that includes a DST
+// transition.
+//
+// In all cases, the period ends in the future.
+//
+// Only the end of the period is returned.
+func alignedPeriod(now time.Time, p time.Duration) time.Time {
+ _, tzsec := now.Zone()
+ tzoff := time.Duration(tzsec) * time.Second
+ periodStart := now.Add(tzoff).Truncate(p).Add(-tzoff)
+ return periodStart.Add(p)
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package localdb
+
+import (
+ "context"
+ "database/sql"
+ "time"
+
+ "git.arvados.org/arvados.git/lib/controller/api"
+ "git.arvados.org/arvados.git/lib/ctrlctx"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadostest"
+ "git.arvados.org/arvados.git/sdk/go/auth"
+ "github.com/jmoiron/sqlx"
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&activityPeriodSuite{})
+
+type activityPeriodSuite struct{}
+
+// The important thing is that, even when daylight savings time is
+// making things difficult, the current period ends in the future.
+func (*activityPeriodSuite) TestPeriod(c *check.C) {
+ toronto, err := time.LoadLocation("America/Toronto")
+ c.Assert(err, check.IsNil)
+
+ format := "2006-01-02 15:04:05 MST"
+ dststartday, err := time.ParseInLocation(format, "2022-03-13 00:00:00 EST", toronto)
+ c.Assert(err, check.IsNil)
+ dstendday, err := time.ParseInLocation(format, "2022-11-06 00:00:00 EDT", toronto)
+ c.Assert(err, check.IsNil)
+
+ for _, period := range []time.Duration{
+ time.Minute * 13,
+ time.Minute * 49,
+ time.Hour,
+ 4 * time.Hour,
+ 48 * time.Hour,
+ } {
+ for offset := time.Duration(0); offset < 48*time.Hour; offset += 3 * time.Minute {
+ t := dststartday.Add(offset)
+ end := alignedPeriod(t, period)
+ c.Check(end.After(t), check.Equals, true, check.Commentf("period %v offset %v", period, offset))
+
+ t = dstendday.Add(offset)
+ end = alignedPeriod(t, period)
+ c.Check(end.After(t), check.Equals, true, check.Commentf("period %v offset %v", period, offset))
+ }
+ }
+}
+
+func (s *CollectionSuite) TestLogActivity(c *check.C) {
+ starttime := time.Now()
+ s.localdb.activeUsersLock.Lock()
+ s.localdb.activeUsersReset = starttime
+ s.localdb.activeUsersLock.Unlock()
+ db := arvadostest.DB(c, s.cluster)
+ wrap := api.ComposeWrappers(
+ ctrlctx.WrapCallsInTransactions(func(ctx context.Context) (*sqlx.DB, error) { return db, nil }),
+ ctrlctx.WrapCallsWithAuth(s.cluster))
+ collectionCreate := wrap(func(ctx context.Context, opts interface{}) (interface{}, error) {
+ return s.localdb.CollectionCreate(ctx, opts.(arvados.CreateOptions))
+ })
+ ctx := auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{arvadostest.ActiveTokenV2}})
+ for i := 0; i < 2; i++ {
+ logthreshold := time.Now()
+ _, err := collectionCreate(ctx, arvados.CreateOptions{
+ Attrs: map[string]interface{}{
+ "name": "test collection",
+ },
+ EnsureUniqueName: true,
+ })
+ c.Assert(err, check.IsNil)
+ var uuid string
+ err = db.QueryRowContext(ctx, `select uuid from logs where object_uuid = $1 and event_at > $2`, arvadostest.ActiveUserUUID, logthreshold.UTC()).Scan(&uuid)
+ if i == 0 {
+ c.Check(err, check.IsNil)
+ c.Check(uuid, check.HasLen, 27)
+ } else {
+ c.Check(err, check.Equals, sql.ErrNoRows)
+ }
+ }
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package ctrlctx
+
+import (
+ "context"
+ "crypto/hmac"
+ "crypto/sha256"
+ "database/sql"
+ "errors"
+ "fmt"
+ "io"
+ "strings"
+ "sync"
+ "time"
+
+ "git.arvados.org/arvados.git/lib/controller/api"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/auth"
+ "github.com/ghodss/yaml"
+)
+
+var (
+ ErrNoAuthContext = errors.New("bug: there is no authorization in this context")
+ ErrUnauthenticated = errors.New("unauthenticated request")
+)
+
+// WrapCallsWithAuth returns a call wrapper (suitable for assigning to
+// router.router.WrapCalls) that makes CurrentUser(ctx) et al. work
+// from inside the wrapped functions.
+//
+// The incoming context must come from WrapCallsInTransactions or
+// NewWithTransaction.
+func WrapCallsWithAuth(cluster *arvados.Cluster) func(api.RoutableFunc) api.RoutableFunc {
+ var authcache authcache
+ return func(origFunc api.RoutableFunc) api.RoutableFunc {
+ return func(ctx context.Context, opts interface{}) (_ interface{}, err error) {
+ var tokens []string
+ if creds, ok := auth.FromContext(ctx); ok {
+ tokens = creds.Tokens
+ }
+ return origFunc(context.WithValue(ctx, contextKeyAuth, &authcontext{
+ authcache: &authcache,
+ cluster: cluster,
+ tokens: tokens,
+ }), opts)
+ }
+ }
+}
+
+// CurrentAuth returns the arvados.User whose privileges should be
+// used in the given context, and the arvados.APIClientAuthorization
+// the caller presented in order to authenticate the current request.
+//
+// Returns ErrUnauthenticated if the current request was not
+// authenticated (no token provided, token is expired, etc).
+func CurrentAuth(ctx context.Context) (*arvados.User, *arvados.APIClientAuthorization, error) {
+ ac, ok := ctx.Value(contextKeyAuth).(*authcontext)
+ if !ok {
+ return nil, nil, ErrNoAuthContext
+ }
+ ac.lookupOnce.Do(func() {
+ // We only validate/lookup the token once per API
+ // call, even though authcache should be efficient
+ // enough to do a lookup each time. This guarantees we
+ // always return the same result when called multiple
+ // times in the course of handling a single API call.
+ for _, token := range ac.tokens {
+ user, aca, err := ac.authcache.lookup(ctx, ac.cluster, token)
+ if err != nil {
+ ac.err = err
+ return
+ }
+ if user != nil {
+ ac.user, ac.apiClientAuthorization = user, aca
+ return
+ }
+ }
+ ac.err = ErrUnauthenticated
+ })
+ return ac.user, ac.apiClientAuthorization, ac.err
+}
+
+type contextKeyA string
+
+var contextKeyAuth = contextKeyA("auth")
+
+type authcontext struct {
+ authcache *authcache
+ cluster *arvados.Cluster
+ tokens []string
+ user *arvados.User
+ apiClientAuthorization *arvados.APIClientAuthorization
+ err error
+ lookupOnce sync.Once
+}
+
+var authcacheTTL = time.Minute
+
+type authcacheent struct {
+ expireTime time.Time
+ apiClientAuthorization arvados.APIClientAuthorization
+ user arvados.User
+}
+
+type authcache struct {
+ mtx sync.Mutex
+ entries map[string]*authcacheent
+ nextCleanup time.Time
+}
+
+// lookup returns the user and aca info for a given token. Returns nil
+// if the token is not valid. Returns a non-nil error if there was an
+// unexpected error from the database, etc.
+func (ac *authcache) lookup(ctx context.Context, cluster *arvados.Cluster, token string) (*arvados.User, *arvados.APIClientAuthorization, error) {
+ ac.mtx.Lock()
+ ent := ac.entries[token]
+ ac.mtx.Unlock()
+ if ent != nil && ent.expireTime.After(time.Now()) {
+ return &ent.user, &ent.apiClientAuthorization, nil
+ }
+ if token == "" {
+ return nil, nil, nil
+ }
+ tx, err := CurrentTx(ctx)
+ if err != nil {
+ return nil, nil, err
+ }
+ var aca arvados.APIClientAuthorization
+ var user arvados.User
+
+ var cond string
+ var args []interface{}
+ if len(token) > 30 && strings.HasPrefix(token, "v2/") && token[30] == '/' {
+ fields := strings.Split(token, "/")
+ cond = `aca.uuid = $1 and aca.api_token = $2`
+ args = []interface{}{fields[1], fields[2]}
+ } else {
+ // Bare token or OIDC access token
+ mac := hmac.New(sha256.New, []byte(cluster.SystemRootToken))
+ io.WriteString(mac, token)
+ hmac := fmt.Sprintf("%x", mac.Sum(nil))
+ cond = `aca.api_token in ($1, $2)`
+ args = []interface{}{token, hmac}
+ }
+ var expiresAt sql.NullTime
+ var scopesYAML []byte
+ err = tx.QueryRowContext(ctx, `
+select aca.uuid, aca.expires_at, aca.api_token, aca.scopes, users.uuid, users.is_active, users.is_admin
+ from api_client_authorizations aca
+ left join users on aca.user_id = users.id
+ where `+cond+`
+ and (expires_at is null or expires_at > current_timestamp at time zone 'UTC')`, args...).Scan(
+ &aca.UUID, &expiresAt, &aca.APIToken, &scopesYAML,
+ &user.UUID, &user.IsActive, &user.IsAdmin)
+ if err == sql.ErrNoRows {
+ return nil, nil, nil
+ } else if err != nil {
+ return nil, nil, err
+ }
+ aca.ExpiresAt = expiresAt.Time
+ if len(scopesYAML) > 0 {
+ err = yaml.Unmarshal(scopesYAML, &aca.Scopes)
+ if err != nil {
+ return nil, nil, fmt.Errorf("loading scopes for %s: %w", aca.UUID, err)
+ }
+ }
+ ent = &authcacheent{
+ expireTime: time.Now().Add(authcacheTTL),
+ apiClientAuthorization: aca,
+ user: user,
+ }
+ ac.mtx.Lock()
+ defer ac.mtx.Unlock()
+ if ac.entries == nil {
+ ac.entries = map[string]*authcacheent{}
+ }
+ if ac.nextCleanup.IsZero() || ac.nextCleanup.Before(time.Now()) {
+ for token, ent := range ac.entries {
+ if !ent.expireTime.After(time.Now()) {
+ delete(ac.entries, token)
+ }
+ }
+ ac.nextCleanup = time.Now().Add(authcacheTTL)
+ }
+ ac.entries[token] = ent
+ return &ent.user, &ent.apiClientAuthorization, nil
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package ctrlctx
+
+import (
+ "context"
+
+ "git.arvados.org/arvados.git/lib/config"
+ "git.arvados.org/arvados.git/sdk/go/arvadostest"
+ "git.arvados.org/arvados.git/sdk/go/auth"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/jmoiron/sqlx"
+ _ "github.com/lib/pq"
+ check "gopkg.in/check.v1"
+)
+
+func (*DatabaseSuite) TestAuthContext(c *check.C) {
+ cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+ c.Assert(err, check.IsNil)
+ cluster, err := cfg.GetCluster("")
+ c.Assert(err, check.IsNil)
+
+ getter := func(context.Context) (*sqlx.DB, error) {
+ return sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
+ }
+ authwrapper := WrapCallsWithAuth(cluster)
+ dbwrapper := WrapCallsInTransactions(getter)
+
+ // valid tokens
+ for _, token := range []string{
+ arvadostest.ActiveToken,
+ arvadostest.ActiveTokenV2,
+ arvadostest.ActiveTokenV2 + "/asdfasdfasdf",
+ arvadostest.ActiveTokenV2, // cached
+ } {
+ ok, err := dbwrapper(authwrapper(func(ctx context.Context, opts interface{}) (interface{}, error) {
+ user, aca, err := CurrentAuth(ctx)
+ if c.Check(err, check.IsNil) {
+ c.Check(user.UUID, check.Equals, "zzzzz-tpzed-xurymjxw79nv3jz")
+ c.Check(aca.UUID, check.Equals, "zzzzz-gj3su-077z32aux8dg2s1")
+ c.Check(aca.Scopes, check.DeepEquals, []string{"all"})
+ }
+ return true, nil
+ }))(auth.NewContext(context.Background(), auth.NewCredentials(token)), "blah")
+ c.Check(ok, check.Equals, true)
+ c.Check(err, check.IsNil)
+ }
+
+ // bad tokens
+ for _, token := range []string{
+ arvadostest.ActiveToken + "X",
+ arvadostest.ActiveTokenV2 + "X",
+ arvadostest.ActiveTokenV2[:30], // "v2/{uuid}"
+ arvadostest.ActiveTokenV2[:31], // "v2/{uuid}/"
+ "bogus",
+ "",
+ } {
+ ok, err := dbwrapper(authwrapper(func(ctx context.Context, opts interface{}) (interface{}, error) {
+ user, aca, err := CurrentAuth(ctx)
+ c.Check(err, check.Equals, ErrUnauthenticated)
+ c.Check(user, check.IsNil)
+ c.Check(aca, check.IsNil)
+ return true, err
+ }))(auth.NewContext(context.Background(), auth.NewCredentials(token)), "blah")
+ c.Check(ok, check.Equals, true)
+ c.Check(err, check.Equals, ErrUnauthenticated)
+ }
+
+ // no auth context
+ {
+ ok, err := dbwrapper(authwrapper(func(ctx context.Context, opts interface{}) (interface{}, error) {
+ user, aca, err := CurrentAuth(ctx)
+ c.Check(err, check.Equals, ErrUnauthenticated)
+ c.Check(user, check.IsNil)
+ c.Check(aca, check.IsNil)
+ return true, err
+ }))(context.Background(), "blah")
+ c.Check(ok, check.Equals, true)
+ c.Check(err, check.Equals, ErrUnauthenticated)
+ }
+}
"git.arvados.org/arvados.git/lib/controller/api"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/jmoiron/sqlx"
+
// sqlx needs lib/pq to talk to PostgreSQL
_ "github.com/lib/pq"
)
}
}
+// NewTx starts a new transaction. The caller is responsible for
+// calling Commit or Rollback. This is suitable for database queries
+// that are separate from the API transaction (see CurrentTx), e.g.,
+// ones that will be committed even if the API call fails, or held
+// open after the API call finishes.
+func NewTx(ctx context.Context) (*sqlx.Tx, error) {
+ txn, ok := ctx.Value(contextKeyTransaction).(*transaction)
+ if !ok {
+ return nil, ErrNoTransaction
+ }
+ db, err := txn.getdb(ctx)
+ if err != nil {
+ return nil, err
+ }
+ return db.Beginx()
+}
+
+// CurrentTx returns a transaction that will be committed after the
+// current API call completes, or rolled back if the current API call
+// returns an error.
func CurrentTx(ctx context.Context) (*sqlx.Tx, error) {
txn, ok := ctx.Value(contextKeyTransaction).(*transaction)
if !ok {
"net"
"net/http"
"net/url"
+ "os"
"strings"
"time"
"git.arvados.org/arvados.git/lib/cmd"
+ "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "git.arvados.org/arvados.git/sdk/go/health"
"github.com/sirupsen/logrus"
)
return
}
+ diag.dotest(5, "running health check (same as `arvados-server check`)", func() error {
+ ldr := config.NewLoader(&bytes.Buffer{}, ctxlog.New(&bytes.Buffer{}, "text", "info"))
+ ldr.SetupFlags(flag.NewFlagSet("diagnostics", flag.ContinueOnError))
+ cfg, err := ldr.Load()
+ if err != nil {
+ diag.infof("skipping because config could not be loaded: %s", err)
+ return nil
+ }
+ cluster, err := cfg.GetCluster("")
+ if err != nil {
+ return err
+ }
+		if cluster.SystemRootToken != os.Getenv("ARVADOS_API_TOKEN") {
+			diag.infof("skipping because provided token is not SystemRootToken")
+			return nil
+		}
+ agg := &health.Aggregator{Cluster: cluster}
+ resp := agg.ClusterHealth()
+ for _, e := range resp.Errors {
+ diag.errorf("health check: %s", e)
+ }
+ diag.infof("health check: reported clock skew %v", resp.ClockSkew)
+ return nil
+ })
+
var dd arvados.DiscoveryDocument
ddpath := "discovery/v1/apis/arvados/v1/rest"
diag.dotest(10, fmt.Sprintf("getting discovery document from https://%s/%s", client.APIHost, ddpath), func() error {
if ctr.State != dispatch.Locked {
// already started by prior invocation
} else if _, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
+ if _, err := dispatchcloud.ChooseInstanceType(disp.Cluster, &ctr); errors.As(err, &dispatchcloud.ConstraintsNotSatisfiableError{}) {
+ err := disp.arvDispatcher.Arv.Update("containers", ctr.UUID, arvadosclient.Dict{
+ "container": map[string]interface{}{
+ "runtime_status": map[string]string{
+ "error": err.Error(),
+ },
+ },
+ }, nil)
+ if err != nil {
+ return fmt.Errorf("error setting runtime_status on %s: %s", ctr.UUID, err)
+ }
+ return disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
+ }
disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
go func(uuid string) {
- cancelled := false
for ctx.Err() == nil {
- qent, ok := disp.lsfqueue.Lookup(uuid)
+ _, ok := disp.lsfqueue.Lookup(uuid)
if !ok {
// If the container disappears from
// the lsf queue, there is no point in
cancel()
return
}
- if !cancelled && qent.Stat == "PEND" && strings.Contains(qent.PendReason, "There are no suitable hosts for the job") {
- disp.logger.Printf("container %s: %s", uuid, qent.PendReason)
- err := disp.arvDispatcher.Arv.Update("containers", uuid, arvadosclient.Dict{
- "container": map[string]interface{}{
- "runtime_status": map[string]string{
- "error": qent.PendReason,
- },
- },
- }, nil)
- if err != nil {
- disp.logger.Printf("error setting runtime_status on %s: %s", uuid, err)
- continue // retry
- }
- err = disp.arvDispatcher.UpdateState(uuid, dispatch.Cancelled)
- if err != nil {
- continue // retry (UpdateState() already logged the error)
- }
- cancelled = true
- }
}
}(ctr.UUID)
type suite struct {
disp *dispatcher
crTooBig arvados.ContainerRequest
+ crPending arvados.ContainerRequest
crCUDARequest arvados.ContainerRequest
}
c.Assert(err, check.IsNil)
cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second / 4)
cluster.Containers.MinRetryPeriod = arvados.Duration(time.Second / 4)
+ cluster.InstanceTypes = arvados.InstanceTypeMap{
+ "biggest_available_node": arvados.InstanceType{
+ RAM: 100 << 30, // 100 GiB
+ VCPUs: 4,
+ IncludedScratch: 100 << 30,
+ Scratch: 100 << 30,
+ }}
s.disp = newHandler(context.Background(), cluster, arvadostest.Dispatch1Token, prometheus.NewRegistry()).(*dispatcher)
s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
})
c.Assert(err, check.IsNil)
+ err = arvados.NewClientFromEnv().RequestAndDecode(&s.crPending, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
+ "container_request": map[string]interface{}{
+ "runtime_constraints": arvados.RuntimeConstraints{
+ RAM: 100000000,
+ VCPUs: 2,
+ },
+ "container_image": arvadostest.DockerImage112PDH,
+ "command": []string{"sleep", "1"},
+ "mounts": map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
+ "output_path": "/mnt/out",
+ "state": arvados.ContainerRequestStateCommitted,
+ "priority": 1,
+ "container_count_max": 1,
+ },
+ })
+ c.Assert(err, check.IsNil)
+
err = arvados.NewClientFromEnv().RequestAndDecode(&s.crCUDARequest, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
"container_request": map[string]interface{}{
"runtime_constraints": arvados.RuntimeConstraints{
fakejobq[nextjobid] = args[1]
nextjobid++
mtx.Unlock()
- case s.crTooBig.ContainerUUID:
+ case s.crPending.ContainerUUID:
c.Check(args, check.DeepEquals, []string{
- "-J", s.crTooBig.ContainerUUID,
- "-n", "1",
- "-D", "954187MB",
- "-R", "rusage[mem=954187MB:tmp=256MB] span[hosts=1]",
- "-R", "select[mem>=954187MB]",
+ "-J", s.crPending.ContainerUUID,
+ "-n", "2",
+ "-D", "608MB",
+ "-R", "rusage[mem=608MB:tmp=256MB] span[hosts=1]",
+ "-R", "select[mem>=608MB]",
"-R", "select[tmp>=256MB]",
- "-R", "select[ncpus>=1]"})
+ "-R", "select[ncpus>=2]"})
mtx.Lock()
fakejobq[nextjobid] = args[1]
nextjobid++
var records []map[string]interface{}
for jobid, uuid := range fakejobq {
stat, reason := "RUN", ""
- if uuid == s.crTooBig.ContainerUUID {
+ if uuid == s.crPending.ContainerUUID {
// The real bjobs output includes a trailing ';' here:
stat, reason = "PEND", "There are no suitable hosts for the job;"
}
c.Error("timed out")
break
}
+ // "crTooBig" should never be submitted to lsf because
+ // it is bigger than any configured instance type
+ if ent, ok := s.disp.lsfqueue.Lookup(s.crTooBig.ContainerUUID); ok {
+ c.Errorf("Lookup(crTooBig) == true, ent = %#v", ent)
+ break
+ }
// "queuedcontainer" should be running
if _, ok := s.disp.lsfqueue.Lookup(arvadostest.QueuedContainerUUID); !ok {
c.Log("Lookup(queuedcontainer) == false")
continue
}
+ // "crPending" should be pending
+	if _, ok := s.disp.lsfqueue.Lookup(s.crPending.ContainerUUID); !ok {
+		c.Log("Lookup(crPending) == false")
+ continue
+ }
// "lockedcontainer" should be cancelled because it
// has priority 0 (no matching container requests)
if ent, ok := s.disp.lsfqueue.Lookup(arvadostest.LockedContainerUUID); ok {
c.Logf("Lookup(lockedcontainer) == true, ent = %#v", ent)
continue
}
- // "crTooBig" should be cancelled because lsf stub
- // reports there is no suitable instance type
- if ent, ok := s.disp.lsfqueue.Lookup(s.crTooBig.ContainerUUID); ok {
- c.Logf("Lookup(crTooBig) == true, ent = %#v", ent)
- continue
- }
var ctr arvados.Container
if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil {
c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err)
c.Logf("container %s is not in the LSF queue but its arvados record has not been updated to state==Cancelled (state is %q)", s.crTooBig.ContainerUUID, ctr.State)
continue
} else {
- c.Check(ctr.RuntimeStatus["error"], check.Equals, "There are no suitable hosts for the job;")
+ c.Check(ctr.RuntimeStatus["error"], check.Equals, "constraints not satisfiable by any configured instance type")
}
c.Log("reached desired state")
break
git_commit = subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
git_date = subprocess.run(["git", "log", "--format=%cD", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
git_committer = subprocess.run(["git", "log", "--format=%cn <%ce>", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
- git_branch = subprocess.run(["git", "branch", "--show-current"], cwd=cwd, capture_output=True, text=True).stdout
+ git_branch = subprocess.run(["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
git_origin = subprocess.run(["git", "remote", "get-url", "origin"], cwd=cwd, capture_output=True, text=True).stdout
git_status = subprocess.run(["git", "status", "--untracked-files=no", "--porcelain"], cwd=cwd, capture_output=True, text=True).stdout
- git_describe = subprocess.run(["git", "describe", "--always"], cwd=cwd, capture_output=True, text=True).stdout
+ git_describe = subprocess.run(["git", "describe", "--always", "--tags"], cwd=cwd, capture_output=True, text=True).stdout
git_toplevel = subprocess.run(["git", "rev-parse", "--show-toplevel"], cwd=cwd, capture_output=True, text=True).stdout
git_path = filepath[len(git_toplevel):]
import (
"bytes"
"context"
+ "crypto/rand"
"crypto/tls"
"encoding/json"
"errors"
"io/fs"
"io/ioutil"
"log"
+ "math/big"
"net"
"net/http"
"net/url"
}
return path, nil
}
+
+var maxUUIDInt = (&big.Int{}).Exp(big.NewInt(36), big.NewInt(15), nil)
+
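+// RandomUUID returns a randomly generated UUID string with the
+// given cluster ID prefix and object type infix, e.g.,
+// "zzzzz-57u5n-0123456789abcde".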
+func RandomUUID(clusterID, infix string) string {
+	n, err := rand.Int(rand.Reader, maxUUIDInt)
+	if err != nil {
+		panic(err)
+	}
+	nstr := n.Text(36)
+	// Pad with leading zeroes so the random part always has 15
+	// characters, giving a fixed-length 27-character UUID.
+	for len(nstr) < 15 {
+		nstr = "0" + nstr
+	}
+	return clusterID + "-" + infix + "-" + nstr
+}
}
type WebDAVCacheConfig struct {
- TTL Duration
- UUIDTTL Duration
- MaxBlockEntries int
- MaxCollectionEntries int
- MaxCollectionBytes int64
- MaxUUIDEntries int
- MaxSessions int
+ TTL Duration
+ MaxBlockEntries int
+ MaxCollectionBytes int64
+ MaxSessions int
}
type UploadDownloadPermission struct {
PreferDomainForUsername string
UserSetupMailText string
RoleGroupsVisibleToAll bool
+ ActivityLoggingPeriod Duration
}
StorageClasses map[string]StorageClassConfig
Volumes map[string]Volume
}
locked := map[sync.Locker]bool{}
for i := len(needLock) - 1; i >= 0; i-- {
- if n := needLock[i]; !locked[n] {
+ n := needLock[i]
+ if fs, ok := n.(interface{ rootnode() inode }); ok {
+ // Lock the fs's root dir directly, not
+ // through the fs. Otherwise our "locked" map
+ // would not reliably prevent double-locking
+ // the fs's root dir.
+ n = fs.rootnode()
+ }
+ if !locked[n] {
n.Lock()
defer n.Unlock()
locked[n] = true
return ErrInvalidOperation
}
-// Check for and incorporate upstream changes -- unless that has
-// already been done recently, in which case this func is a no-op.
-func (fs *collectionFileSystem) checkChangesOnServer() error {
+// Check for and incorporate upstream changes. If force==false, this
+// is a no-op except once every ttl/100 or so.
+//
+// Return value is true if new content was loaded from upstream and
+// any unsaved local changes have been discarded.
+func (fs *collectionFileSystem) checkChangesOnServer(force bool) (bool, error) {
if fs.uuid == "" && fs.savedPDH.Load() == "" {
- return nil
+ return false, nil
}
- // First try UUID if any, then last known PDH. Stop if all
- // signatures are new enough.
- checkingAll := false
- for _, id := range []string{fs.uuid, fs.savedPDH.Load().(string)} {
- if id == "" {
- continue
- }
-
- fs.lockCheckChanges.Lock()
- if !checkingAll && fs.holdCheckChanges.After(time.Now()) {
- fs.lockCheckChanges.Unlock()
- return nil
- }
- remain, ttl := fs.signatureTimeLeft()
- if remain > 0.01 && !checkingAll {
- fs.holdCheckChanges = time.Now().Add(ttl / 100)
- }
+ fs.lockCheckChanges.Lock()
+ if !force && fs.holdCheckChanges.After(time.Now()) {
fs.lockCheckChanges.Unlock()
+ return false, nil
+ }
+ remain, ttl := fs.signatureTimeLeft()
+ if remain > 0.01 {
+ fs.holdCheckChanges = time.Now().Add(ttl / 100)
+ }
+ fs.lockCheckChanges.Unlock()
- if remain >= 0.5 {
- break
+ if !force && remain >= 0.5 {
+ // plenty of time left on current signatures
+ return false, nil
+ }
+
+ getparams := map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}}
+ if fs.uuid != "" {
+ var coll Collection
+ err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+fs.uuid, nil, getparams)
+ if err != nil {
+ return false, err
}
- checkingAll = true
+ if coll.PortableDataHash != fs.savedPDH.Load().(string) {
+ // collection has changed upstream since we
+ // last loaded or saved. Refresh local data,
+ // losing any unsaved local changes.
+ newfs, err := coll.FileSystem(fs.fileSystem.fsBackend, fs.fileSystem.fsBackend)
+ if err != nil {
+ return false, err
+ }
+ snap, err := Snapshot(newfs, "/")
+ if err != nil {
+ return false, err
+ }
+ err = Splice(fs, "/", snap)
+ if err != nil {
+ return false, err
+ }
+ fs.savedPDH.Store(coll.PortableDataHash)
+ return true, nil
+ }
+ fs.updateSignatures(coll.ManifestText)
+ return false, nil
+ }
+ if pdh := fs.savedPDH.Load().(string); pdh != "" {
var coll Collection
- err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}})
+ err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+pdh, nil, getparams)
if err != nil {
- continue
+ return false, err
}
fs.updateSignatures(coll.ManifestText)
}
- return nil
+ return false, nil
}
// Refresh signature on a single locator, if necessary. Assume caller
if err != nil || exp.Sub(time.Now()) > time.Minute {
// Synchronous update is not needed. Start an
// asynchronous update if needed.
- go fs.checkChangesOnServer()
+ go fs.checkChangesOnServer(false)
return locator
}
var manifests string
}
func (fs *collectionFileSystem) Sync() error {
- err := fs.checkChangesOnServer()
+ refreshed, err := fs.checkChangesOnServer(true)
if err != nil {
return err
}
- if fs.uuid == "" {
+ if refreshed || fs.uuid == "" {
return nil
}
txt, err := fs.MarshalManifest(".")
"select": selectFields,
})
if err != nil {
- return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
+ return fmt.Errorf("sync failed: update %s: %w", fs.uuid, err)
}
fs.updateSignatures(coll.ManifestText)
fs.savedPDH.Store(coll.PortableDataHash)
defer fn.RUnlock()
size = 64
for _, seg := range fn.segments {
- size += 64
- if seg, ok := seg.(*memSegment); ok {
- size += int64(seg.Len())
- }
+ size += seg.memorySize()
}
return
}
// Return a new segment with a subsection of the data from this
// one. length<0 means length=Len()-off.
Slice(off int, length int) segment
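+ // memorySize returns an estimate of the memory consumed by
+ // this segment: a nominal 64 bytes of overhead plus any
+ // in-memory buffer or locator string (see implementations
+ // below)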
+ memorySize() int64
}
type memSegment struct {
return
}
+func (me *memSegment) memorySize() int64 {
+ return 64 + int64(len(me.buf))
+}
+
type storedSegment struct {
kc fsBackend
locator string
return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
}
+func (se storedSegment) memorySize() int64 {
+ return 64 + int64(len(se.locator))
+}
+
func canonicalName(name string) string {
name = path.Clean("/" + name)
if name == "/" || name == "./" {
}
nDirs := int64(8)
+ nFiles := int64(67)
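+ // 67 one-MiB files per dir fill one 64 MiB block per dir with
+ // 3 MiB left over; the flush size assertions below depend on
+ // that split.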
megabyte := make([]byte, 1<<20)
for i := int64(0); i < nDirs; i++ {
dir := fmt.Sprintf("dir%d", i)
fs.Mkdir(dir, 0755)
- for j := 0; j < 67; j++ {
+ for j := int64(0); j < nFiles; j++ {
f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
c.Assert(err, check.IsNil)
defer f.Close()
c.Assert(err, check.IsNil)
}
}
- inodebytes := int64((nDirs*(67*2+1) + 1) * 64)
- c.Check(fs.MemorySize(), check.Equals, int64(nDirs*67<<20)+inodebytes)
+ inodebytes := int64((nDirs*(nFiles+1) + 1) * 64)
+ c.Check(fs.MemorySize(), check.Equals, nDirs*nFiles*(1<<20+64)+inodebytes)
c.Check(flushed, check.Equals, int64(0))
waitForFlush := func(expectUnflushed, expectFlushed int64) {
}
// Nothing flushed yet
- waitForFlush((nDirs*67)<<20+inodebytes, 0)
+ waitForFlush(nDirs*nFiles*(1<<20+64)+inodebytes, 0)
// Flushing a non-empty dir "/" is non-recursive and there are
// no top-level files, so this has no effect
fs.Flush("/", false)
- waitForFlush((nDirs*67)<<20+inodebytes, 0)
+ waitForFlush(nDirs*nFiles*(1<<20+64)+inodebytes, 0)
// Flush the full block in dir0
fs.Flush("dir0", false)
- waitForFlush((nDirs*67-64)<<20+inodebytes, 64<<20)
+ bigloclen := int64(32 + 9 + 51 + 64) // md5 + "+" + "67xxxxxx" + "+Axxxxxx..." + 64 (see (storedSegment)memorySize)
+ waitForFlush((nDirs*nFiles-64)*(1<<20+64)+inodebytes+bigloclen*64, 64<<20)
err = fs.Flush("dir-does-not-exist", false)
c.Check(err, check.NotNil)
// Flush full blocks in all dirs
fs.Flush("", false)
- waitForFlush(nDirs*3<<20+inodebytes, nDirs*64<<20)
+ waitForFlush(nDirs*3*(1<<20+64)+inodebytes+bigloclen*64*nDirs, nDirs*64<<20)
// Flush non-full blocks, too
fs.Flush("", true)
- waitForFlush(inodebytes, nDirs*67<<20)
+ smallloclen := int64(32 + 8 + 51 + 64) // md5 + "+" + "3xxxxxx" + "+Axxxxxx..." + 64 (see (storedSegment)memorySize)
+ waitForFlush(inodebytes+bigloclen*64*nDirs+smallloclen*3*nDirs, nDirs*nFiles<<20)
}
// Even when writing lots of files/dirs from different goroutines, as
package arvados
import (
- "log"
"os"
"sync"
- "time"
)
-func deferredCollectionFS(fs FileSystem, parent inode, coll Collection) inode {
- modTime := coll.ModifiedAt
- if modTime.IsZero() {
- modTime = time.Now()
- }
- placeholder := &treenode{
- fs: fs,
- parent: parent,
- inodes: nil,
- fileinfo: fileinfo{
- name: coll.Name,
- modTime: modTime,
- mode: 0755 | os.ModeDir,
- sys: func() interface{} { return &coll },
- },
- }
- return &deferrednode{wrapped: placeholder, create: func() inode {
- err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+coll.UUID, nil, nil)
- if err != nil {
- log.Printf("BUG: unhandled error: %s", err)
- return placeholder
- }
- newfs, err := coll.FileSystem(fs, fs)
- if err != nil {
- log.Printf("BUG: unhandled error: %s", err)
- return placeholder
- }
- cfs := newfs.(*collectionFileSystem)
- cfs.SetParent(parent, coll.Name)
- return cfs
- }}
-}
-
// A deferrednode wraps an inode that's expensive to build. Initially,
// it responds to basic directory functions by proxying to the given
// placeholder. If a caller uses a read/write/lock operation,
import (
"log"
+ "os"
"strings"
+ "time"
)
func (fs *customFileSystem) defaultUUID(uuid string) (string, error) {
if strings.Contains(coll.UUID, "-j7d0g-") {
// Group item was loaded into a Collection var -- but
// we only need the Name and UUID anyway, so it's OK.
- return fs.newProjectNode(parent, coll.Name, coll.UUID, nil), nil
+ return &hardlink{
+ inode: fs.projectSingleton(coll.UUID, &Group{
+ UUID: coll.UUID,
+ Name: coll.Name,
+ ModifiedAt: coll.ModifiedAt,
+ Properties: coll.Properties,
+ }),
+ parent: parent,
+ name: coll.Name,
+ }, nil
} else if strings.Contains(coll.UUID, "-4zz18-") {
- return deferredCollectionFS(fs, parent, coll), nil
+ return fs.newDeferredCollectionDir(parent, name, coll.UUID, coll.ModifiedAt, coll.Properties), nil
} else {
log.Printf("group contents: unrecognized UUID in response: %q", coll.UUID)
return nil, ErrInvalidArgument
}
for {
- // The groups content endpoint returns Collection and Group (project)
- // objects. This function only accesses the UUID and Name field. Both
- // collections and groups have those fields, so it is easier to just treat
- // the ObjectList that comes back as a CollectionList.
+ // The groups content endpoint returns
+ // Collection and Group (project)
+ // objects. This function only accesses the
+ // UUID, Name, and ModifiedAt fields. Both
+ // collections and groups have those fields,
+ // so it is easier to just treat the
+ // ObjectList that comes back as a
+ // CollectionList.
var resp CollectionList
err = fs.RequestAndDecode(&resp, "GET", "arvados/v1/groups/"+uuid+"/contents", nil, params)
if err != nil {
continue
}
if strings.Contains(i.UUID, "-j7d0g-") {
- inodes = append(inodes, fs.newProjectNode(parent, i.Name, i.UUID, &Group{
+ inodes = append(inodes, fs.newProjectDir(parent, i.Name, i.UUID, &Group{
UUID: i.UUID,
Name: i.Name,
ModifiedAt: i.ModifiedAt,
Properties: i.Properties,
}))
} else if strings.Contains(i.UUID, "-4zz18-") {
- inodes = append(inodes, deferredCollectionFS(fs, parent, i))
+ inodes = append(inodes, fs.newDeferredCollectionDir(parent, i.Name, i.UUID, i.ModifiedAt, i.Properties))
} else {
log.Printf("group contents: unrecognized UUID in response: %q", i.UUID)
return nil, ErrInvalidArgument
}
return inodes, nil
}
+
+func (fs *customFileSystem) newProjectDir(parent inode, name, uuid string, proj *Group) inode {
+ return &hardlink{inode: fs.projectSingleton(uuid, proj), parent: parent, name: name}
+}
+
+func (fs *customFileSystem) newDeferredCollectionDir(parent inode, name, uuid string, modTime time.Time, props map[string]interface{}) inode {
+ if modTime.IsZero() {
+ modTime = time.Now()
+ }
+ placeholder := &treenode{
+ fs: fs,
+ parent: parent,
+ inodes: nil,
+ fileinfo: fileinfo{
+ name: name,
+ modTime: modTime,
+ mode: 0755 | os.ModeDir,
+ sys: func() interface{} { return &Collection{UUID: uuid, Name: name, ModifiedAt: modTime, Properties: props} },
+ },
+ }
+ return &deferrednode{wrapped: placeholder, create: func() inode {
+ node, err := fs.collectionSingleton(uuid)
+ if err != nil {
+ log.Printf("BUG: unhandled error: %s", err)
+ return placeholder
+ }
+ return &hardlink{inode: node, parent: parent, name: name}
+ }}
+}
"errors"
"io"
"os"
- "path/filepath"
"strings"
check "gopkg.in/check.v1"
func (s *SiteFSSuite) TestCurrentUserHome(c *check.C) {
s.fs.MountProject("home", "")
- s.testHomeProject(c, "/home")
+ s.testHomeProject(c, "/home", "home")
}
func (s *SiteFSSuite) TestUsersDir(c *check.C) {
- s.testHomeProject(c, "/users/active")
+ // /users/active is a hardlink to a dir whose name is the UUID
+ // of the active user
+ s.testHomeProject(c, "/users/active", fixtureActiveUserUUID)
}
-func (s *SiteFSSuite) testHomeProject(c *check.C, path string) {
+func (s *SiteFSSuite) testHomeProject(c *check.C, path, expectRealName string) {
f, err := s.fs.Open(path)
c.Assert(err, check.IsNil)
fis, err := f.Readdir(-1)
fi, err := f.Stat()
c.Assert(err, check.IsNil)
c.Check(fi.IsDir(), check.Equals, true)
- _, basename := filepath.Split(path)
- c.Check(fi.Name(), check.Equals, basename)
+ c.Check(fi.Name(), check.Equals, expectRealName)
f, err = s.fs.Open(path + "/A Project/A Subproject")
c.Assert(err, check.IsNil)
err = project.Sync()
c.Check(err, check.IsNil)
- _, err = s.fs.Open("/home/A Project/oob/test.txt")
- c.Check(err, check.IsNil)
-
- // Sync again to mark the project dir as stale, so the
- // collection gets reloaded from the controller on next
- // lookup.
- err = project.Sync()
- c.Check(err, check.IsNil)
+ f, err = s.fs.Open("/home/A Project/oob/test.txt")
+ if c.Check(err, check.IsNil) {
+ f.Close()
+ }
// Ensure collection was flushed by Sync
var latest Collection
})
c.Assert(err, check.IsNil)
+ // Sync again to reload collection.
+ err = project.Sync()
+ c.Check(err, check.IsNil)
+
+ // Check test.txt deletion is reflected in fs.
_, err = s.fs.Open("/home/A Project/oob/test.txt")
c.Check(err, check.NotNil)
- _, err = s.fs.Open("/home/A Project/oob")
- c.Check(err, check.IsNil)
+ f, err = s.fs.Open("/home/A Project/oob")
+ if c.Check(err, check.IsNil) {
+ f.Close()
+ }
err = s.client.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+oob.UUID, nil, nil)
c.Assert(err, check.IsNil)
package arvados
import (
+ "net/http"
"os"
"strings"
"sync"
staleLock sync.Mutex
forwardSlashNameSubstitution string
+
+ byID map[string]inode
+ byIDLock sync.Mutex
+ byIDRoot *treenode
}
func (c *Client) CustomFileSystem(kc keepClient) CustomFileSystem {
},
inodes: make(map[string]inode),
}
+ fs.byID = map[string]inode{}
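+ // byIDRoot is the parent of the singleton project and
+ // collection nodes kept in byID; fs.Sync() (below) syncs every
+ // singleton by syncing this one node.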
+ fs.byIDRoot = &treenode{
+ fs: fs,
+ parent: root,
+ inodes: make(map[string]inode),
+ fileinfo: fileinfo{
+ name: "_internal_by_id",
+ modTime: time.Now(),
+ mode: 0755 | os.ModeDir,
+ },
+ }
return fs
}
mode: 0755 | os.ModeDir,
},
},
- create: fs.mountByID,
+ create: fs.newCollectionOrProjectHardlink,
}, nil
})
}
fs.root.treenode.Lock()
defer fs.root.treenode.Unlock()
fs.root.treenode.Child(mount, func(inode) (inode, error) {
- return fs.newProjectNode(fs.root, mount, uuid, nil), nil
+ return fs.newProjectDir(fs.root, mount, uuid, nil), nil
})
}
}
func (fs *customFileSystem) Sync() error {
- return fs.root.Sync()
+ return fs.byIDRoot.Sync()
}
// Stale returns true if information obtained at time t should be
return nil, ErrInvalidOperation
}
-func (fs *customFileSystem) mountByID(parent inode, id string) inode {
+func (fs *customFileSystem) newCollectionOrProjectHardlink(parent inode, id string) (inode, error) {
if strings.Contains(id, "-4zz18-") || pdhRegexp.MatchString(id) {
- return fs.mountCollection(parent, id)
- } else if strings.Contains(id, "-j7d0g-") {
- return fs.newProjectNode(fs.root, id, id, nil)
+ node, err := fs.collectionSingleton(id)
+ if os.IsNotExist(err) {
+ return nil, nil
+ } else if err != nil {
+ return nil, err
+ }
+ return &hardlink{inode: node, parent: parent, name: id}, nil
+ } else if strings.Contains(id, "-j7d0g-") || strings.Contains(id, "-tpzed-") {
+ fs.byIDLock.Lock()
+ node := fs.byID[id]
+ fs.byIDLock.Unlock()
+ if node == nil {
+ // Look up the project synchronously before
+ // calling projectSingleton (otherwise we
+ // wouldn't detect a nonexistent project until
+ // it's too late to return ErrNotExist).
+ proj, err := fs.getProject(id)
+ if os.IsNotExist(err) {
+ return nil, nil
+ } else if err != nil {
+ return nil, err
+ }
+ node = fs.projectSingleton(id, proj)
+ }
+ return &hardlink{inode: node, parent: parent, name: id}, nil
} else {
- return nil
+ return nil, nil
}
}
-func (fs *customFileSystem) mountCollection(parent inode, id string) inode {
- var coll Collection
- err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, nil)
- if err != nil {
- return nil
+func (fs *customFileSystem) projectSingleton(uuid string, proj *Group) inode {
+ fs.byIDLock.Lock()
+ defer fs.byIDLock.Unlock()
+ if n := fs.byID[uuid]; n != nil {
+ return n
}
- newfs, err := coll.FileSystem(fs, fs)
- if err != nil {
- return nil
+ name := uuid
+ if name == "" {
+ // special case uuid=="" implements the "home project"
+ // (owner_uuid == current user uuid)
+ name = "home"
}
- cfs := newfs.(*collectionFileSystem)
- cfs.SetParent(parent, id)
- return cfs
-}
-
-func (fs *customFileSystem) newProjectNode(root inode, name, uuid string, proj *Group) inode {
var projLoading sync.Mutex
- return &lookupnode{
+ n := &lookupnode{
stale: fs.Stale,
loadOne: func(parent inode, name string) (inode, error) { return fs.projectsLoadOne(parent, uuid, name) },
loadAll: func(parent inode) ([]inode, error) { return fs.projectsLoadAll(parent, uuid) },
treenode: treenode{
fs: fs,
- parent: root,
+ parent: fs.byIDRoot,
inodes: make(map[string]inode),
fileinfo: fileinfo{
name: name,
if proj != nil {
return proj
}
- var g Group
- err := fs.RequestAndDecode(&g, "GET", "arvados/v1/groups/"+uuid, nil, nil)
+ g, err := fs.getProject(uuid)
if err != nil {
return err
}
- proj = &g
+ proj = g
return proj
},
},
},
}
+ fs.byID[uuid] = n
+ return n
+}
+
+func (fs *customFileSystem) getProject(uuid string) (*Group, error) {
+ var g Group
+ err := fs.RequestAndDecode(&g, "GET", "arvados/v1/groups/"+uuid, nil, nil)
+ if statusErr, ok := err.(interface{ HTTPStatus() int }); ok && statusErr.HTTPStatus() == http.StatusNotFound {
+ return nil, os.ErrNotExist
+ } else if err != nil {
+ return nil, err
+ }
+ return &g, err
+}
+
+func (fs *customFileSystem) collectionSingleton(id string) (inode, error) {
+ // Return existing singleton, if we have it
+ fs.byIDLock.Lock()
+ existing := fs.byID[id]
+ fs.byIDLock.Unlock()
+ if existing != nil {
+ return existing, nil
+ }
+
+ coll, err := fs.getCollection(id)
+ if err != nil {
+ return nil, err
+ }
+ newfs, err := coll.FileSystem(fs, fs)
+ if err != nil {
+ return nil, err
+ }
+ cfs := newfs.(*collectionFileSystem)
+ cfs.SetParent(fs.byIDRoot, id)
+
+ // Check again in case another goroutine has added a node to
+ // fs.byID since we checked above.
+ fs.byIDLock.Lock()
+ defer fs.byIDLock.Unlock()
+ if existing = fs.byID[id]; existing != nil {
+ // Other goroutine won the race. Discard the node we
+ // just made, and return the race winner.
+ return existing, nil
+ }
+ // We won the race. Save the new node in fs.byID and
+ // fs.byIDRoot.
+ fs.byID[id] = cfs
+ fs.byIDRoot.Lock()
+ defer fs.byIDRoot.Unlock()
+ fs.byIDRoot.Child(id, func(inode) (inode, error) { return cfs, nil })
+ return cfs, nil
+}
+
+func (fs *customFileSystem) getCollection(id string) (*Collection, error) {
+ var coll Collection
+ err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, nil)
+ if statusErr, ok := err.(interface{ HTTPStatus() int }); ok && statusErr.HTTPStatus() == http.StatusNotFound {
+ return nil, os.ErrNotExist
+ } else if err != nil {
+ return nil, err
+ }
+ if len(id) != 27 {
+ // This means id is a PDH, and controller/railsapi
+ // returned one of (possibly) many collections with
+ // that PDH. Even if controller returns more fields
+ // besides PDH and manifest text (which are equal for
+ // all matching collections), we don't want to expose
+ // them (e.g., through Sys()).
+ coll = Collection{
+ PortableDataHash: coll.PortableDataHash,
+ ManifestText: coll.ManifestText,
+ }
+ }
+ return &coll, nil
}
// vdirnode wraps an inode by rejecting (with ErrInvalidOperation)
// treenode, or nil for ENOENT.
type vdirnode struct {
treenode
- create func(parent inode, name string) inode
+ create func(parent inode, name string) (inode, error)
}
func (vn *vdirnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
return vn.treenode.Child(name, func(existing inode) (inode, error) {
if existing == nil && vn.create != nil {
- existing = vn.create(vn, name)
- if existing != nil {
- existing.SetParent(vn, name)
+ newnode, err := vn.create(vn, name)
+ if err != nil {
+ return nil, err
+ }
+ if newnode != nil {
+ newnode.SetParent(vn, name)
+ existing = newnode
vn.treenode.fileinfo.modTime = time.Now()
}
}
}
})
}
+
+// A hardlink can be used to mount an existing node at an additional
+// point in the same filesystem.
+type hardlink struct {
+ inode
+ parent inode
+ name string
+}
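+
+// For example (sketch): two hardlinks wrapping the same singleton
+// share one underlying node, so content stays consistent across
+// mount points:
+//
+//	node, _ := fs.collectionSingleton(uuid)
+//	a := &hardlink{inode: node, parent: dirA, name: "via-project"}
+//	b := &hardlink{inode: node, parent: dirB, name: "via-id"}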
+
+// If the wrapped inode is a filesystem, rootnode returns the wrapped
+// fs's root node; otherwise it returns the wrapped inode itself. This
+// allows (*fileSystem)Rename() to lock the root node of a
+// hardlink-wrapped filesystem.
+func (hl *hardlink) rootnode() inode {
+ if node, ok := hl.inode.(interface{ rootnode() inode }); ok {
+ return node.rootnode()
+ } else {
+ return hl.inode
+ }
+}
+
+func (hl *hardlink) Sync() error {
+ if node, ok := hl.inode.(syncer); ok {
+ return node.Sync()
+ } else {
+ return ErrInvalidOperation
+ }
+}
+
+func (hl *hardlink) SetParent(parent inode, name string) {
+ hl.Lock()
+ defer hl.Unlock()
+ hl.parent = parent
+ hl.name = name
+}
+
+func (hl *hardlink) Parent() inode {
+ hl.RLock()
+ defer hl.RUnlock()
+ return hl.parent
+}
+
+func (hl *hardlink) FileInfo() os.FileInfo {
+ fi := hl.inode.FileInfo()
+ if fi, ok := fi.(fileinfo); ok {
+ fi.name = hl.name
+ return fi
+ }
+ return fi
+}
// Importing arvadostest would be an import cycle, so these
// fixtures are duplicated here [until fs moves to a separate
// package].
+ fixtureActiveUserUUID = "zzzzz-tpzed-xurymjxw79nv3jz"
fixtureActiveToken = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
fixtureAProjectUUID = "zzzzz-j7d0g-v955i6s2oi1cbso"
fixtureThisFilterGroupUUID = "zzzzz-j7d0g-thisfiltergroup"
c.Assert(err, check.ErrorMatches, `.*stub does not write storage class "archive"`)
}
+func (s *SiteFSSuite) TestSameCollectionDifferentPaths(c *check.C) {
+ s.fs.MountProject("home", "")
+ var coll Collection
+ err := s.client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
+ "collection": map[string]interface{}{
+ "owner_uuid": fixtureAProjectUUID,
+ "name": fmt.Sprintf("test collection %d", time.Now().UnixNano()),
+ },
+ })
+ c.Assert(err, check.IsNil)
+
+ viaProjID := "by_id/" + fixtureAProjectUUID + "/" + coll.Name
+ viaProjName := "home/A Project/" + coll.Name
+ viaCollID := "by_id/" + coll.UUID
+ for n, dirs := range [][]string{
+ {viaCollID, viaProjID, viaProjName},
+ {viaCollID, viaProjName, viaProjID},
+ {viaProjID, viaProjName, viaCollID},
+ {viaProjID, viaCollID, viaProjName},
+ {viaProjName, viaCollID, viaProjID},
+ {viaProjName, viaProjID, viaCollID},
+ } {
+ filename := fmt.Sprintf("file %d", n)
+ f := make([]File, 3)
+ for i, dir := range dirs {
+ path := dir + "/" + filename
+ mode := os.O_RDWR
+ if i == 0 {
+ mode |= os.O_CREATE
+ c.Logf("create %s", path)
+ } else {
+ c.Logf("open %s", path)
+ }
+ f[i], err = s.fs.OpenFile(path, mode, 0777)
+ c.Assert(err, check.IsNil, check.Commentf("n=%d i=%d path=%s", n, i, path))
+ defer f[i].Close()
+ }
+ _, err = io.WriteString(f[0], filename)
+ c.Assert(err, check.IsNil)
+ _, err = f[1].Seek(0, io.SeekEnd)
+ c.Assert(err, check.IsNil)
+ _, err = io.WriteString(f[1], filename)
+ c.Assert(err, check.IsNil)
+ buf, err := io.ReadAll(f[2])
+ c.Assert(err, check.IsNil)
+ c.Check(string(buf), check.Equals, filename+filename)
+ }
+}
+
func (s *SiteFSSuite) TestByUUIDAndPDH(c *check.C) {
f, err := s.fs.Open("/by_id")
c.Assert(err, check.IsNil)
return nil, os.ErrNotExist
}
user := resp.Items[0]
- return fs.newProjectNode(parent, user.Username, user.UUID, nil), nil
+ return fs.newProjectDir(parent, user.Username, user.UUID, nil), nil
}
func (fs *customFileSystem) usersLoadAll(parent inode) ([]inode, error) {
if user.Username == "" {
continue
}
- inodes = append(inodes, fs.newProjectNode(parent, user.Username, user.UUID, nil))
+ inodes = append(inodes, fs.newProjectDir(parent, user.Username, user.UUID, nil))
}
params.Filters = []Filter{{"uuid", ">", resp.Items[len(resp.Items)-1].UUID}}
}
package arvadostest
import (
- "context"
-
- "git.arvados.org/arvados.git/lib/ctrlctx"
"git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/jmoiron/sqlx"
+
// sqlx needs lib/pq to talk to PostgreSQL
_ "github.com/lib/pq"
"gopkg.in/check.v1"
c.Assert(err, check.IsNil)
return db
}
-
-// TransactionContext returns a context suitable for running a test
-// case in a new transaction, and a rollback func which the caller
-// should call after the test.
-func TransactionContext(c *check.C, db *sqlx.DB) (ctx context.Context, rollback func()) {
- tx, err := db.Beginx()
- c.Assert(err, check.IsNil)
- return ctrlctx.NewWithTransaction(context.Background(), tx), func() {
- c.Check(tx.Rollback(), check.IsNil)
- }
-}
HelloWorldPdh = "55713e6a34081eb03609e7ad5fcad129+62"
MultilevelCollection1 = "zzzzz-4zz18-pyw8yp9g3pr7irn"
+ MultilevelCollection1PDH = "f9ddda46bb293b6847da984e3aa735db+290"
StorageClassesDesiredDefaultConfirmedDefault = "zzzzz-4zz18-3t236wr12769tga"
StorageClassesDesiredArchiveConfirmedDefault = "zzzzz-4zz18-3t236wr12769qqa"
EmptyCollectionUUID = "zzzzz-4zz18-gs9ooj1h9sd5mde"
Repository2UUID = "zzzzz-s0uqq-382brsig8rp3667"
Repository2Name = "active/foo2"
- FooCollectionSharingTokenUUID = "zzzzz-gj3su-gf02tdm4g1z3e3u"
- FooCollectionSharingToken = "iknqgmunrhgsyfok8uzjlwun9iscwm3xacmzmg65fa1j1lpdss"
+ FooFileCollectionUUID = "zzzzz-4zz18-znfnqtbbv4spc3w"
+ FooFileCollectionSharingTokenUUID = "zzzzz-gj3su-gf02tdm4g1z3e3u"
+ FooFileCollectionSharingToken = "iknqgmunrhgsyfok8uzjlwun9iscwm3xacmzmg65fa1j1lpdss"
+ BarFileCollectionUUID = "zzzzz-4zz18-ehbhgtheo8909or"
+ BarFileCollectionPDH = "fa7aeb5140e2848d39b416daeef4ffc5+45"
WorkflowWithDefinitionYAMLUUID = "zzzzz-7fd4e-validworkfloyml"
for svcName, sh := range resp.Services {
switch svcName {
case arvados.ServiceNameDispatchCloud,
- arvados.ServiceNameDispatchLSF:
+ arvados.ServiceNameDispatchLSF,
+ arvados.ServiceNameDispatchSLURM:
// ok to not run any given dispatcher
case arvados.ServiceNameHealth,
arvados.ServiceNameWorkbench1,
package keepclient
import (
+ "fmt"
"io"
"sort"
"strconv"
data = make([]byte, size, bufsize)
_, err = io.ReadFull(rdr, data)
err2 := rdr.Close()
- if err == nil {
- err = err2
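+ // wrap with %w so callers can still unwrap the underlying
+ // error with errors.Is / errors.As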
+ if err == nil && err2 != nil {
+ err = fmt.Errorf("close(): %w", err2)
+ }
+ if err != nil {
+ err = fmt.Errorf("Get %s: %w", locator, err)
}
}
c.mtx.Lock()
listprop: [elem1, elem3, 5]
collection_with_list_prop_even:
- uuid: zzzzz-4zz18-listpropertyeven
- current_version_uuid: zzzzz-4zz18-listpropertyeven
+ uuid: zzzzz-4zz18-listpropertyevn
+ current_version_uuid: zzzzz-4zz18-listpropertyevn
portable_data_hash: fa7aeb5140e2848d39b416daeef4ffc5+45
owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
created_at: 2015-02-13T17:22:54Z
listprop: [elem2, 4, elem6, ELEM8]
collection_with_listprop_elem1:
- uuid: zzzzz-4zz18-listpropelem1
- current_version_uuid: zzzzz-4zz18-listpropelem1
+ uuid: zzzzz-4zz18-listpropelemen1
+ current_version_uuid: zzzzz-4zz18-listpropelemen1
portable_data_hash: fa7aeb5140e2848d39b416daeef4ffc5+45
owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
created_at: 2015-02-13T17:22:54Z
package keepweb
import (
+ "net/http"
"sync"
"sync/atomic"
"time"
const metricsUpdateInterval = time.Second / 10
type cache struct {
- cluster *arvados.Cluster
- logger logrus.FieldLogger
- registry *prometheus.Registry
- metrics cacheMetrics
- pdhs *lru.TwoQueueCache
- collections *lru.TwoQueueCache
- sessions *lru.TwoQueueCache
- setupOnce sync.Once
+ cluster *arvados.Cluster
+ logger logrus.FieldLogger
+ registry *prometheus.Registry
+ metrics cacheMetrics
+ sessions *lru.TwoQueueCache
+ setupOnce sync.Once
- chPruneSessions chan struct{}
- chPruneCollections chan struct{}
+ chPruneSessions chan struct{}
}
type cacheMetrics struct {
- requests prometheus.Counter
- collectionBytes prometheus.Gauge
- collectionEntries prometheus.Gauge
- sessionEntries prometheus.Gauge
- collectionHits prometheus.Counter
- pdhHits prometheus.Counter
- sessionHits prometheus.Counter
- sessionMisses prometheus.Counter
- apiCalls prometheus.Counter
+ requests prometheus.Counter
+ collectionBytes prometheus.Gauge
+ sessionEntries prometheus.Gauge
+ sessionHits prometheus.Counter
+ sessionMisses prometheus.Counter
}
func (m *cacheMetrics) setup(reg *prometheus.Registry) {
- m.requests = prometheus.NewCounter(prometheus.CounterOpts{
- Namespace: "arvados",
- Subsystem: "keepweb_collectioncache",
- Name: "requests",
- Help: "Number of targetID-to-manifest lookups handled.",
- })
- reg.MustRegister(m.requests)
- m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
- Namespace: "arvados",
- Subsystem: "keepweb_collectioncache",
- Name: "hits",
- Help: "Number of pdh-to-manifest cache hits.",
- })
- reg.MustRegister(m.collectionHits)
- m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
- Namespace: "arvados",
- Subsystem: "keepweb_collectioncache",
- Name: "pdh_hits",
- Help: "Number of uuid-to-pdh cache hits.",
- })
- reg.MustRegister(m.pdhHits)
- m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
- Namespace: "arvados",
- Subsystem: "keepweb_collectioncache",
- Name: "api_calls",
- Help: "Number of outgoing API calls made by cache.",
- })
- reg.MustRegister(m.apiCalls)
m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "keepweb_sessions",
- Name: "cached_collection_bytes",
- Help: "Total size of all cached manifests and sessions.",
+ Name: "cached_session_bytes",
+ Help: "Total size of all cached sessions.",
})
reg.MustRegister(m.collectionBytes)
- m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepweb_collectioncache",
- Name: "cached_manifests",
- Help: "Number of manifests in cache.",
- })
- reg.MustRegister(m.collectionEntries)
m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "keepweb_sessions",
reg.MustRegister(m.sessionMisses)
}
-type cachedPDH struct {
- expire time.Time
- refresh time.Time
- pdh string
-}
-
-type cachedCollection struct {
- expire time.Time
- collection *arvados.Collection
-}
-
-type cachedPermission struct {
- expire time.Time
-}
-
type cachedSession struct {
expire time.Time
fs atomic.Value
func (c *cache) setup() {
var err error
- c.pdhs, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxUUIDEntries)
- if err != nil {
- panic(err)
- }
- c.collections, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxCollectionEntries)
- if err != nil {
- panic(err)
- }
c.sessions, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxSessions)
if err != nil {
panic(err)
c.updateGauges()
}
}()
- c.chPruneCollections = make(chan struct{}, 1)
- go func() {
- for range c.chPruneCollections {
- c.pruneCollections()
- }
- }()
c.chPruneSessions = make(chan struct{}, 1)
go func() {
for range c.chPruneSessions {
func (c *cache) updateGauges() {
c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
- c.metrics.collectionEntries.Set(float64(c.collections.Len()))
c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
}
"select": []string{"portable_data_hash"},
}
-// Update saves a modified version (fs) to an existing collection
-// (coll) and, if successful, updates the relevant cache entries so
-// subsequent calls to Get() reflect the modifications.
-func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
- c.setupOnce.Do(c.setup)
-
- m, err := fs.MarshalManifest(".")
- if err != nil || m == coll.ManifestText {
- return err
- }
- coll.ManifestText = m
- var updated arvados.Collection
- err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
- "collection": map[string]string{
- "manifest_text": coll.ManifestText,
- },
- })
- if err != nil {
- c.pdhs.Remove(coll.UUID)
- return err
- }
- c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
- expire: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)),
- collection: &updated,
- })
- c.pdhs.Add(coll.UUID, &cachedPDH{
- expire: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)),
- refresh: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.UUIDTTL)),
- pdh: updated.PortableDataHash,
- })
- return nil
-}
-
// ResetSession unloads any potentially stale state. Should be called
// after write operations, so subsequent reads don't return stale
// data.
// Get a long-lived CustomFileSystem suitable for doing a read operation
// with the given token.
-func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, error) {
+func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, *arvados.User, error) {
c.setupOnce.Do(c.setup)
now := time.Now()
ent, _ := c.sessions.Get(token)
var err error
sess.client, err = arvados.NewClientFromConfig(c.cluster)
if err != nil {
- return nil, nil, err
+ return nil, nil, nil, err
}
sess.client.AuthToken = token
sess.arvadosclient, err = arvadosclient.New(sess.client)
if err != nil {
- return nil, nil, err
+ return nil, nil, nil, err
}
sess.keepclient = keepclient.New(sess.arvadosclient)
c.sessions.Add(token, sess)
case c.chPruneSessions <- struct{}{}:
default:
}
+
fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
- if fs != nil && !expired {
- return fs, sess, nil
+ if fs == nil || expired {
+ fs = sess.client.SiteFileSystem(sess.keepclient)
+ fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
+ sess.fs.Store(fs)
}
- fs = sess.client.SiteFileSystem(sess.keepclient)
- fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
- sess.fs.Store(fs)
- return fs, sess, nil
+
+ user, _ := sess.user.Load().(*arvados.User)
+ if user == nil || expired {
+ user = new(arvados.User)
+ err := sess.client.RequestAndDecode(user, "GET", "/arvados/v1/users/current", nil, nil)
+ if statusErr, ok := err.(interface{ HTTPStatus() int }); ok && statusErr.HTTPStatus() == http.StatusForbidden {
+ // token is OK, but "get user id" api is out
+ // of scope -- return nil, signifying unknown
+ // user
+ } else if err != nil {
+ return nil, nil, nil, err
+ }
+ sess.user.Store(user)
+ }
+
+ return fs, sess, user, nil
}
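+
+// Example (sketch, mirroring the keep-web handler): fetch a session
+// for the request token, then read through the returned filesystem:
+//
+//	fs, _, _, err := c.GetSession(token)
+//	if err == nil {
+//		f, err := fs.OpenFile("by_id/"+collectionID+"/", os.O_RDONLY, 0)
+//		...
+//	}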
// Remove all expired session cache entries, then remove more entries
}
// Remove tokens until reaching size limit, starting with the
// least frequently used entries (which Keys() returns last).
- for i := len(keys) - 1; i >= 0 && size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2; i-- {
+ for i := len(keys) - 1; i >= 0 && size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes; i-- {
if sizes[i] > 0 {
c.sessions.Remove(keys[i])
size -= sizes[i]
}
}
-func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
- c.setupOnce.Do(c.setup)
- c.metrics.requests.Inc()
-
- var pdhRefresh bool
- var pdh string
- if arvadosclient.PDHMatch(targetID) {
- pdh = targetID
- } else if ent, cached := c.pdhs.Get(targetID); cached {
- ent := ent.(*cachedPDH)
- if ent.expire.Before(time.Now()) {
- c.pdhs.Remove(targetID)
- } else {
- pdh = ent.pdh
- pdhRefresh = forceReload || time.Now().After(ent.refresh)
- c.metrics.pdhHits.Inc()
- }
- }
-
- if pdh == "" {
- // UUID->PDH mapping is not cached, might as well get
- // the whole collection record and be done (below).
- c.logger.Debugf("cache(%s): have no pdh", targetID)
- } else if cached := c.lookupCollection(arv.ApiToken + "\000" + pdh); cached == nil {
- // PDH->manifest is not cached, might as well get the
- // whole collection record (below).
- c.logger.Debugf("cache(%s): have pdh %s but manifest is not cached", targetID, pdh)
- } else if !pdhRefresh {
- // We looked up UUID->PDH very recently, and we still
- // have the manifest for that PDH.
- c.logger.Debugf("cache(%s): have pdh %s and refresh not needed", targetID, pdh)
- return cached, nil
- } else {
- // Get current PDH for this UUID (and confirm we still
- // have read permission). Most likely, the cached PDH
- // is still correct, in which case we can use our
- // cached manifest.
- c.metrics.apiCalls.Inc()
- var current arvados.Collection
- err := arv.Get("collections", targetID, selectPDH, ¤t)
- if err != nil {
- return nil, err
- }
- if current.PortableDataHash == pdh {
- // PDH has not changed, cached manifest is
- // correct.
- c.logger.Debugf("cache(%s): verified cached pdh %s is still correct", targetID, pdh)
- return cached, nil
- }
- if cached := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); cached != nil {
- // PDH changed, and we already have the
- // manifest for that new PDH.
- c.logger.Debugf("cache(%s): cached pdh %s was stale, new pdh is %s and manifest is already in cache", targetID, pdh, current.PortableDataHash)
- return cached, nil
- }
- }
-
- // Either UUID->PDH is not cached, or PDH->manifest is not
- // cached.
- var retrieved arvados.Collection
- c.metrics.apiCalls.Inc()
- err := arv.Get("collections", targetID, nil, &retrieved)
- if err != nil {
- return nil, err
- }
- c.logger.Debugf("cache(%s): retrieved manifest, caching with pdh %s", targetID, retrieved.PortableDataHash)
- exp := time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL))
- if targetID != retrieved.PortableDataHash {
- c.pdhs.Add(targetID, &cachedPDH{
- expire: exp,
- refresh: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.UUIDTTL)),
- pdh: retrieved.PortableDataHash,
- })
- }
- c.collections.Add(arv.ApiToken+"\000"+retrieved.PortableDataHash, &cachedCollection{
- expire: exp,
- collection: &retrieved,
- })
- if int64(len(retrieved.ManifestText)) > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/int64(c.cluster.Collections.WebDAVCache.MaxCollectionEntries) {
- select {
- case c.chPruneCollections <- struct{}{}:
- default:
- }
- }
- return &retrieved, nil
-}
-
-// pruneCollections checks the total bytes occupied by manifest_text
-// in the collection cache and removes old entries as needed to bring
-// the total size down to CollectionBytes. It also deletes all expired
-// entries.
-//
-// pruneCollections does not aim to be perfectly correct when there is
-// concurrent cache activity.
-func (c *cache) pruneCollections() {
- var size int64
- now := time.Now()
- keys := c.collections.Keys()
- entsize := make([]int, len(keys))
- expired := make([]bool, len(keys))
- for i, k := range keys {
- v, ok := c.collections.Peek(k)
- if !ok {
- continue
- }
- ent := v.(*cachedCollection)
- n := len(ent.collection.ManifestText)
- size += int64(n)
- entsize[i] = n
- expired[i] = ent.expire.Before(now)
- }
- for i, k := range keys {
- if expired[i] {
- c.collections.Remove(k)
- size -= int64(entsize[i])
- }
- }
- for i, k := range keys {
- if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
- break
- }
- if expired[i] {
- // already removed this entry in the previous loop
- continue
- }
- c.collections.Remove(k)
- size -= int64(entsize[i])
- }
-}
-
-// collectionBytes returns the approximate combined memory size of the
-// collection cache and session filesystem cache.
+// collectionBytes returns the approximate combined memory size of
+// all cached sessions' filesystems.
func (c *cache) collectionBytes() uint64 {
var size uint64
- for _, k := range c.collections.Keys() {
- v, ok := c.collections.Peek(k)
- if !ok {
- continue
- }
- size += uint64(len(v.(*cachedCollection).collection.ManifestText))
- }
for _, token := range c.sessions.Keys() {
ent, ok := c.sessions.Peek(token)
if !ok {
}
return size
}
-
-func (c *cache) lookupCollection(key string) *arvados.Collection {
- e, cached := c.collections.Get(key)
- if !cached {
- return nil
- }
- ent := e.(*cachedCollection)
- if ent.expire.Before(time.Now()) {
- c.collections.Remove(key)
- return nil
- }
- c.metrics.collectionHits.Inc()
- return ent.collection
-}
-
-func (c *cache) GetTokenUser(token string) (*arvados.User, error) {
- // Get and cache user record associated with this
- // token. We need to know their UUID for logging, and
- // whether they are an admin or not for certain
- // permission checks.
-
- // Get/create session entry
- _, sess, err := c.GetSession(token)
- if err != nil {
- return nil, err
- }
-
- // See if the user is already set, and if so, return it
- user, _ := sess.user.Load().(*arvados.User)
- if user != nil {
- return user, nil
- }
-
- // Fetch the user record
- c.metrics.apiCalls.Inc()
- var current arvados.User
-
- err = sess.client.RequestAndDecode(¤t, "GET", "arvados/v1/users/current", nil, nil)
- if err != nil {
- return nil, err
- }
-
- // Stash the user record for next time
- sess.user.Store(¤t)
- return ¤t, nil
-}
import (
"bytes"
+ "net/http"
+ "net/http/httptest"
+ "regexp"
+ "strings"
+ "time"
"git.arvados.org/arvados.git/sdk/go/arvados"
- "git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
- "git.arvados.org/arvados.git/sdk/go/ctxlog"
- "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
"gopkg.in/check.v1"
)
-func (s *UnitSuite) checkCacheMetrics(c *check.C, reg *prometheus.Registry, regs ...string) {
+func (s *IntegrationSuite) checkCacheMetrics(c *check.C, regs ...string) {
+ s.handler.Cache.updateGauges()
+ reg := s.handler.Cache.registry
mfs, err := reg.Gather()
c.Check(err, check.IsNil)
buf := &bytes.Buffer{}
c.Check(enc.Encode(mf), check.IsNil)
}
mm := buf.String()
+ // Remove comments to make the "value vs. regexp" failure
+ // output easier to read.
+ mm = regexp.MustCompile(`(?m)^#.*\n`).ReplaceAllString(mm, "")
for _, reg := range regs {
- c.Check(mm, check.Matches, `(?ms).*collectioncache_`+reg+`\n.*`)
+ c.Check(mm, check.Matches, `(?ms).*keepweb_sessions_`+reg+`\n.*`)
}
}
-func (s *UnitSuite) TestCache(c *check.C) {
- arv, err := arvadosclient.MakeArvadosClient()
- c.Assert(err, check.Equals, nil)
-
- cache := &cache{
- cluster: s.cluster,
- logger: ctxlog.TestLogger(c),
- registry: prometheus.NewRegistry(),
- }
-
+func (s *IntegrationSuite) TestCache(c *check.C) {
// Hit the same collection 5 times using the same token. Only
// the first req should cause an API call; the next 4 should
// hit all caches.
- arv.ApiToken = arvadostest.AdminToken
- var coll *arvados.Collection
+ u := mustParseURL("http://" + arvadostest.FooCollection + ".keep-web.example/foo")
+ req := &http.Request{
+ Method: "GET",
+ Host: u.Host,
+ URL: u,
+ RequestURI: u.RequestURI(),
+ Header: http.Header{
+ "Authorization": {"Bearer " + arvadostest.ActiveToken},
+ },
+ }
for i := 0; i < 5; i++ {
- coll, err = cache.Get(arv, arvadostest.FooCollection, false)
- c.Check(err, check.Equals, nil)
- c.Assert(coll, check.NotNil)
- c.Check(coll.PortableDataHash, check.Equals, arvadostest.FooCollectionPDH)
- c.Check(coll.ManifestText[:2], check.Equals, ". ")
+ resp := httptest.NewRecorder()
+ s.handler.ServeHTTP(resp, req)
+ c.Check(resp.Code, check.Equals, http.StatusOK)
}
- s.checkCacheMetrics(c, cache.registry,
- "requests 5",
+ s.checkCacheMetrics(c,
"hits 4",
- "pdh_hits 4",
- "api_calls 1")
-
- // Hit the same collection 2 more times, this time requesting
- // it by PDH and using a different token. The first req should
- // miss the permission cache and fetch the new manifest; the
- // second should hit the Collection cache and skip the API
- // lookup.
- arv.ApiToken = arvadostest.ActiveToken
-
- coll2, err := cache.Get(arv, arvadostest.FooCollectionPDH, false)
- c.Check(err, check.Equals, nil)
- c.Assert(coll2, check.NotNil)
- c.Check(coll2.PortableDataHash, check.Equals, arvadostest.FooCollectionPDH)
- c.Check(coll2.ManifestText[:2], check.Equals, ". ")
- c.Check(coll2.ManifestText, check.Not(check.Equals), coll.ManifestText)
-
- s.checkCacheMetrics(c, cache.registry,
- "requests 6",
- "hits 4",
- "pdh_hits 4",
- "api_calls 2")
-
- coll2, err = cache.Get(arv, arvadostest.FooCollectionPDH, false)
- c.Check(err, check.Equals, nil)
- c.Assert(coll2, check.NotNil)
- c.Check(coll2.PortableDataHash, check.Equals, arvadostest.FooCollectionPDH)
- c.Check(coll2.ManifestText[:2], check.Equals, ". ")
-
- s.checkCacheMetrics(c, cache.registry,
- "requests 7",
- "hits 5",
- "pdh_hits 4",
- "api_calls 2")
-
- // Alternating between two collections N times should produce
- // only 2 more API calls.
- arv.ApiToken = arvadostest.AdminToken
- for i := 0; i < 20; i++ {
- var target string
- if i%2 == 0 {
- target = arvadostest.HelloWorldCollection
- } else {
- target = arvadostest.FooBarDirCollection
- }
- _, err := cache.Get(arv, target, false)
- c.Check(err, check.Equals, nil)
+ "misses 1",
+ "active 1")
+
+ // Hit a shared collection 3 times by PDH, using a
+ // different token.
+ u2 := mustParseURL("http://" + strings.Replace(arvadostest.BarFileCollectionPDH, "+", "-", 1) + ".keep-web.example/bar")
+ req2 := &http.Request{
+ Method: "GET",
+ Host: u2.Host,
+ URL: u2,
+ RequestURI: u2.RequestURI(),
+ Header: http.Header{
+ "Authorization": {"Bearer " + arvadostest.SpectatorToken},
+ },
}
- s.checkCacheMetrics(c, cache.registry,
- "requests 27",
- "hits 23",
- "pdh_hits 22",
- "api_calls 4")
-}
-
-func (s *UnitSuite) TestCacheForceReloadByPDH(c *check.C) {
- arv, err := arvadosclient.MakeArvadosClient()
- c.Assert(err, check.Equals, nil)
-
- cache := &cache{
- cluster: s.cluster,
- logger: ctxlog.TestLogger(c),
- registry: prometheus.NewRegistry(),
+ for i := 0; i < 3; i++ {
+ resp2 := httptest.NewRecorder()
+ s.handler.ServeHTTP(resp2, req2)
+ c.Check(resp2.Code, check.Equals, http.StatusOK)
}
-
- for _, forceReload := range []bool{false, true, false, true} {
- _, err := cache.Get(arv, arvadostest.FooCollectionPDH, forceReload)
- c.Check(err, check.Equals, nil)
+ s.checkCacheMetrics(c,
+ "hits 6",
+ "misses 2",
+ "active 2")
+
+ // Alternating between two collections/tokens N times should
+ // use the existing sessions.
+ for i := 0; i < 7; i++ {
+ resp := httptest.NewRecorder()
+ s.handler.ServeHTTP(resp, req)
+ c.Check(resp.Code, check.Equals, http.StatusOK)
+
+ resp2 := httptest.NewRecorder()
+ s.handler.ServeHTTP(resp2, req2)
+ c.Check(resp2.Code, check.Equals, http.StatusOK)
}
-
- s.checkCacheMetrics(c, cache.registry,
- "requests 4",
- "hits 3",
- "pdh_hits 0",
- "api_calls 1")
+ s.checkCacheMetrics(c,
+ "hits 20",
+ "misses 2",
+ "active 2")
}
-func (s *UnitSuite) TestCacheForceReloadByUUID(c *check.C) {
- arv, err := arvadosclient.MakeArvadosClient()
- c.Assert(err, check.Equals, nil)
-
- cache := &cache{
- cluster: s.cluster,
- logger: ctxlog.TestLogger(c),
- registry: prometheus.NewRegistry(),
- }
-
- for _, forceReload := range []bool{false, true, false, true} {
- _, err := cache.Get(arv, arvadostest.FooCollection, forceReload)
- c.Check(err, check.Equals, nil)
- }
+func (s *IntegrationSuite) TestForceReloadPDH(c *check.C) {
+ filename := strings.Replace(time.Now().Format(time.RFC3339Nano), ":", ".", -1)
+ manifest := ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:" + filename + "\n"
+ pdh := arvados.PortableDataHash(manifest)
+ client := arvados.NewClientFromEnv()
+ client.AuthToken = arvadostest.ActiveToken
+
+ _, resp := s.do("GET", "http://"+strings.Replace(pdh, "+", "-", 1)+".keep-web.example/"+filename, arvadostest.ActiveToken, nil)
+ c.Check(resp.Code, check.Equals, http.StatusNotFound)
+
+ var coll arvados.Collection
+ err := client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
+ "collection": map[string]string{
+ "manifest_text": manifest,
+ },
+ })
+ c.Assert(err, check.IsNil)
+ defer client.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
+ c.Assert(coll.PortableDataHash, check.Equals, pdh)
+
+ _, resp = s.do("GET", "http://"+strings.Replace(pdh, "+", "-", 1)+".keep-web.example/"+filename, "", http.Header{
+ "Authorization": {"Bearer " + arvadostest.ActiveToken},
+ "Cache-Control": {"must-revalidate"},
+ })
+ c.Check(resp.Code, check.Equals, http.StatusOK)
+
+ _, resp = s.do("GET", "http://"+strings.Replace(pdh, "+", "-", 1)+".keep-web.example/missingfile", "", http.Header{
+ "Authorization": {"Bearer " + arvadostest.ActiveToken},
+ "Cache-Control": {"must-revalidate"},
+ })
+ c.Check(resp.Code, check.Equals, http.StatusNotFound)
+}
- s.checkCacheMetrics(c, cache.registry,
- "requests 4",
- "hits 3",
- "pdh_hits 3",
- "api_calls 3")
+func (s *IntegrationSuite) TestForceReloadUUID(c *check.C) {
+ client := arvados.NewClientFromEnv()
+ client.AuthToken = arvadostest.ActiveToken
+ var coll arvados.Collection
+ err := client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
+ "collection": map[string]string{
+ "manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:oldfile\n",
+ },
+ })
+ c.Assert(err, check.IsNil)
+ defer client.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
+
+ _, resp := s.do("GET", "http://"+coll.UUID+".keep-web.example/newfile", arvadostest.ActiveToken, nil)
+ c.Check(resp.Code, check.Equals, http.StatusNotFound)
+ _, resp = s.do("GET", "http://"+coll.UUID+".keep-web.example/oldfile", arvadostest.ActiveToken, nil)
+ c.Check(resp.Code, check.Equals, http.StatusOK)
+ _, resp = s.do("GET", "http://"+coll.UUID+".keep-web.example/newfile", arvadostest.ActiveToken, nil)
+ c.Check(resp.Code, check.Equals, http.StatusNotFound)
+ err = client.RequestAndDecode(&coll, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
+ "collection": map[string]string{
+ "manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:oldfile 0:0:newfile\n",
+ },
+ })
+ c.Assert(err, check.IsNil)
+ _, resp = s.do("GET", "http://"+coll.UUID+".keep-web.example/newfile", arvadostest.ActiveToken, nil)
+ c.Check(resp.Code, check.Equals, http.StatusNotFound)
+ _, resp = s.do("GET", "http://"+coll.UUID+".keep-web.example/newfile", "", http.Header{
+ "Authorization": {"Bearer " + arvadostest.ActiveToken},
+ "Cache-Control": {"must-revalidate"},
+ })
+ c.Check(resp.Code, check.Equals, http.StatusOK)
}
"net/http"
"net/url"
"os"
- "path/filepath"
"sort"
"strconv"
"strings"
"sync"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/auth"
)
type handler struct {
- Cache cache
- Cluster *arvados.Cluster
- clientPool *arvadosclient.ClientPool
- setupOnce sync.Once
- webdavLS webdav.LockSystem
+ Cache cache
+ Cluster *arvados.Cluster
+ setupOnce sync.Once
+ webdavLS webdav.LockSystem
}
var urlPDHDecoder = strings.NewReplacer(" ", "+", "-", "+")
-var notFoundMessage = "404 Not found\r\n\r\nThe requested path was not found, or you do not have permission to access it.\r"
-var unauthorizedMessage = "401 Unauthorized\r\n\r\nA valid Arvados token must be provided to access this resource.\r"
+var notFoundMessage = "Not Found"
+var unauthorizedMessage = "401 Unauthorized\r\n\r\nA valid Arvados token must be provided to access this resource.\r\n"
// parseCollectionIDFromURL returns a UUID or PDH if s is a UUID or a
// PDH (even if it is a PDH with "+" replaced by " " or "-");
}
func (h *handler) setup() {
- // Errors will be handled at the client pool.
- arv, _ := arvados.NewClientFromConfig(h.Cluster)
- h.clientPool = arvadosclient.MakeClientPoolWith(arv)
-
keepclient.DefaultBlockCache.MaxBlocks = h.Cluster.Collections.WebDAVCache.MaxBlockEntries
// Even though we don't accept LOCK requests, every webdav
}
func (h *handler) serveStatus(w http.ResponseWriter, r *http.Request) {
- json.NewEncoder(w).Encode(struct{ Version string }{version})
+ json.NewEncoder(w).Encode(struct{ Version string }{cmd.Version.String()})
}
// updateOnSuccess wraps httpserver.ResponseWriter. If the handler
// sends an HTTP header indicating success, updateOnSuccess first
-// calls the provided update func. If the update func fails, a 500
-// response is sent, and the status code and body sent by the handler
-// are ignored (all response writes return the update error).
+// calls the provided update func. If the update func fails, an error
+// response is sent (using the error's HTTP status or 500 if none),
+// and the status code and body sent by the handler are ignored (all
+// response writes return the update error).
type updateOnSuccess struct {
httpserver.ResponseWriter
logger logrus.FieldLogger
if code >= 200 && code < 400 {
if uos.err = uos.update(); uos.err != nil {
code := http.StatusInternalServerError
- if err, ok := uos.err.(*arvados.TransactionError); ok {
- code = err.StatusCode
+ var he interface{ HTTPStatus() int }
+ if errors.As(uos.err, &he) {
+ code = he.HTTPStatus()
}
- uos.logger.WithError(uos.err).Errorf("update() returned error type %T, changing response to HTTP %d", uos.err, code)
+ uos.logger.WithError(uos.err).Errorf("update() returned %T error, changing response to HTTP %d", uos.err, code)
http.Error(uos.ResponseWriter, uos.err.Error(), code)
return
}
}
}
- if collectionID == "" && !useSiteFS {
- http.Error(w, notFoundMessage, http.StatusNotFound)
- return
- }
-
forceReload := false
if cc := r.Header.Get("Cache-Control"); strings.Contains(cc, "no-cache") || strings.Contains(cc, "must-revalidate") {
forceReload = true
return
}
- if useSiteFS {
- h.serveSiteFS(w, r, reqTokens, credentialsOK, attachment)
- return
- }
-
targetPath := pathParts[stripParts:]
if tokens == nil && len(targetPath) > 0 && strings.HasPrefix(targetPath[0], "t=") {
// http://ID.example/t=TOKEN/PATH...
stripParts++
}
+ fsprefix := ""
+ if useSiteFS {
+ if writeMethod[r.Method] {
+ http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
+ return
+ }
+ if len(reqTokens) == 0 {
+ w.Header().Add("WWW-Authenticate", "Basic realm=\"collections\"")
+ http.Error(w, unauthorizedMessage, http.StatusUnauthorized)
+ return
+ }
+ tokens = reqTokens
+ } else if collectionID == "" {
+ http.Error(w, notFoundMessage, http.StatusNotFound)
+ return
+ } else {
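+ // serve the collection via its mount point in the session filesystem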
+ fsprefix = "by_id/" + collectionID + "/"
+ }
+
if tokens == nil {
tokens = reqTokens
if h.Cluster.Users.AnonymousUserToken != "" {
stripParts++
}
- arv := h.clientPool.Get()
- if arv == nil {
- http.Error(w, "client pool error: "+h.clientPool.Err().Error(), http.StatusInternalServerError)
- return
+ dirOpenMode := os.O_RDONLY
+ if writeMethod[r.Method] {
+ dirOpenMode = os.O_RDWR
}
- defer h.clientPool.Put(arv)
- var collection *arvados.Collection
+ validToken := make(map[string]bool)
+ var token string
var tokenUser *arvados.User
- tokenResult := make(map[string]int)
- for _, arv.ApiToken = range tokens {
- var err error
- collection, err = h.Cache.Get(arv, collectionID, forceReload)
- if err == nil {
- // Success
- break
+ var sessionFS arvados.CustomFileSystem
+ var session *cachedSession
+ var collectionDir arvados.File
+ for _, token = range tokens {
+ var statusErr interface{ HTTPStatus() int }
+ fs, sess, user, err := h.Cache.GetSession(token)
+ if errors.As(err, &statusErr) && statusErr.HTTPStatus() == http.StatusUnauthorized {
+ // bad token
+ continue
+ } else if err != nil {
+ http.Error(w, "cache error: "+err.Error(), http.StatusInternalServerError)
+ return
+ }
+ f, err := fs.OpenFile(fsprefix, dirOpenMode, 0)
+ if errors.As(err, &statusErr) && statusErr.HTTPStatus() == http.StatusForbidden {
+ // collection id is outside token scope
+ validToken[token] = true
+ continue
+ }
+ validToken[token] = true
+ if os.IsNotExist(err) {
+ // collection does not exist or is not
+ // readable using this token
+ continue
+ } else if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
}
- if srvErr, ok := err.(arvadosclient.APIServerError); ok {
- switch srvErr.HttpStatusCode {
- case 404, 401:
- // Token broken or insufficient to
- // retrieve collection
- tokenResult[arv.ApiToken] = srvErr.HttpStatusCode
- continue
+ defer f.Close()
+
+ collectionDir, sessionFS, session, tokenUser = f, fs, sess, user
+ break
+ }
+ if forceReload && collectionDir != nil {
+ err := collectionDir.Sync()
+ if err != nil {
+ var statusErr interface{ HTTPStatus() int }
+ if errors.As(err, &statusErr) {
+ http.Error(w, err.Error(), statusErr.HTTPStatus())
+ } else {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
}
+ return
}
- // Something more serious is wrong
- http.Error(w, "cache error: "+err.Error(), http.StatusInternalServerError)
- return
}
- if collection == nil {
+ if session == nil {
if pathToken || !credentialsOK {
// Either the URL is a "secret sharing link"
// that didn't work out (and asking the client
return
}
for _, t := range reqTokens {
- if tokenResult[t] == 404 {
- // The client provided valid token(s), but the
- // collection was not found.
+ if validToken[t] {
+ // The client provided valid token(s),
+ // but the collection was not found.
http.Error(w, notFoundMessage, http.StatusNotFound)
return
}
return
}
- kc, err := keepclient.MakeKeepClient(arv)
- if err != nil {
- http.Error(w, "error setting up keep client: "+err.Error(), http.StatusInternalServerError)
- return
+ if r.Method == http.MethodGet || r.Method == http.MethodHead {
+ targetfnm := fsprefix + strings.Join(pathParts[stripParts:], "/")
+ if fi, err := sessionFS.Stat(targetfnm); err == nil && fi.IsDir() {
+ if !strings.HasSuffix(r.URL.Path, "/") {
+ h.seeOtherWithCookie(w, r, r.URL.Path+"/", credentialsOK)
+ } else {
+ h.serveDirectory(w, r, fi.Name(), sessionFS, targetfnm, !useSiteFS)
+ }
+ return
+ }
}
- kc.RequestID = r.Header.Get("X-Request-Id")
var basename string
if len(targetPath) > 0 {
basename = targetPath[len(targetPath)-1]
}
- applyContentDispositionHdr(w, r, basename, attachment)
-
- client := (&arvados.Client{
- APIHost: arv.ApiServer,
- AuthToken: arv.ApiToken,
- Insecure: arv.ApiInsecure,
- }).WithRequestID(r.Header.Get("X-Request-Id"))
-
- fs, err := collection.FileSystem(client, kc)
- if err != nil {
- http.Error(w, "error creating collection filesystem: "+err.Error(), http.StatusInternalServerError)
- return
- }
-
- writefs, writeOK := fs.(arvados.CollectionFileSystem)
- targetIsPDH := arvadosclient.PDHMatch(collectionID)
- if (targetIsPDH || !writeOK) && writeMethod[r.Method] {
+ if arvadosclient.PDHMatch(collectionID) && writeMethod[r.Method] {
http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
return
}
-
- // Check configured permission
- _, sess, err := h.Cache.GetSession(arv.ApiToken)
- if err != nil {
- http.Error(w, "session cache: "+err.Error(), http.StatusInternalServerError)
- }
- tokenUser, err = h.Cache.GetTokenUser(arv.ApiToken)
- if e := (interface{ HTTPStatus() int })(nil); errors.As(err, &e) && e.HTTPStatus() == http.StatusForbidden {
- // Ignore expected error looking up user record when
- // using a scoped token that allows getting
- // collections/X but not users/current
- } else if err != nil {
- http.Error(w, "user lookup: "+err.Error(), http.StatusInternalServerError)
+ if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
+ http.Error(w, "Not permitted", http.StatusForbidden)
+ return
}
+ h.logUploadOrDownload(r, session.arvadosclient, sessionFS, fsprefix+strings.Join(targetPath, "/"), nil, tokenUser)
- if webdavMethod[r.Method] {
- if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
- http.Error(w, "Not permitted", http.StatusForbidden)
+ if writeMethod[r.Method] {
+ // Save the collection only if/when all
+ // webdav->filesystem operations succeed --
+ // and send a 500 error if the modified
+ // collection can't be saved.
+ //
+ // Perform the write in a separate sitefs, so
+ // concurrent read operations on the same
+ // collection see the previous saved
+ // state. After the write succeeds and the
+ // collection record is updated, we reset the
+ // session so the updates are visible in
+ // subsequent read requests.
+ client := session.client.WithRequestID(r.Header.Get("X-Request-Id"))
+ sessionFS = client.SiteFileSystem(session.keepclient)
+ writingDir, err := sessionFS.OpenFile(fsprefix, os.O_RDONLY, 0)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
- h.logUploadOrDownload(r, sess.arvadosclient, nil, strings.Join(targetPath, "/"), collection, tokenUser)
-
- if writeMethod[r.Method] {
- // Save the collection only if/when all
- // webdav->filesystem operations succeed --
- // and send a 500 error if the modified
- // collection can't be saved.
- w = &updateOnSuccess{
- ResponseWriter: w,
- logger: ctxlog.FromContext(r.Context()),
- update: func() error {
- return h.Cache.Update(client, *collection, writefs)
- }}
- }
- h := webdav.Handler{
- Prefix: "/" + strings.Join(pathParts[:stripParts], "/"),
- FileSystem: &webdavFS{
- collfs: fs,
- writing: writeMethod[r.Method],
- alwaysReadEOF: r.Method == "PROPFIND",
- },
- LockSystem: h.webdavLS,
- Logger: func(_ *http.Request, err error) {
+ defer writingDir.Close()
+ w = &updateOnSuccess{
+ ResponseWriter: w,
+ logger: ctxlog.FromContext(r.Context()),
+ update: func() error {
+ err := writingDir.Sync()
+ var te arvados.TransactionError
+ if errors.As(err, &te) {
+ err = te
+ }
if err != nil {
- ctxlog.FromContext(r.Context()).WithError(err).Error("error reported by webdav handler")
+ return err
}
- },
- }
- h.ServeHTTP(w, r)
- return
- }
-
- openPath := "/" + strings.Join(targetPath, "/")
- f, err := fs.Open(openPath)
- if os.IsNotExist(err) {
- // Requested non-existent path
- http.Error(w, notFoundMessage, http.StatusNotFound)
- return
- } else if err != nil {
- // Some other (unexpected) error
- http.Error(w, "open: "+err.Error(), http.StatusInternalServerError)
- return
- }
- defer f.Close()
- if stat, err := f.Stat(); err != nil {
- // Can't get Size/IsDir (shouldn't happen with a collectionFS!)
- http.Error(w, "stat: "+err.Error(), http.StatusInternalServerError)
- } else if stat.IsDir() && !strings.HasSuffix(r.URL.Path, "/") {
- // If client requests ".../dirname", redirect to
- // ".../dirname/". This way, relative links in the
- // listing for "dirname" can always be "fnm", never
- // "dirname/fnm".
- h.seeOtherWithCookie(w, r, r.URL.Path+"/", credentialsOK)
- } else if stat.IsDir() {
- h.serveDirectory(w, r, collection.Name, fs, openPath, true)
- } else {
- if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
- http.Error(w, "Not permitted", http.StatusForbidden)
- return
- }
- h.logUploadOrDownload(r, sess.arvadosclient, nil, strings.Join(targetPath, "/"), collection, tokenUser)
-
- http.ServeContent(w, r, basename, stat.ModTime(), f)
- if wrote := int64(w.WroteBodyBytes()); wrote != stat.Size() && w.WroteStatus() == http.StatusOK {
- // If we wrote fewer bytes than expected, it's
- // too late to change the real response code
- // or send an error message to the client, but
- // at least we can try to put some useful
- // debugging info in the logs.
- n, err := f.Read(make([]byte, 1024))
- ctxlog.FromContext(r.Context()).Errorf("stat.Size()==%d but only wrote %d bytes; read(1024) returns %d, %v", stat.Size(), wrote, n, err)
- }
- }
-}
-
-func (h *handler) getClients(reqID, token string) (arv *arvadosclient.ArvadosClient, kc *keepclient.KeepClient, client *arvados.Client, release func(), err error) {
- arv = h.clientPool.Get()
- if arv == nil {
- err = h.clientPool.Err()
- return
- }
- release = func() { h.clientPool.Put(arv) }
- arv.ApiToken = token
- kc, err = keepclient.MakeKeepClient(arv)
- if err != nil {
- release()
- return
- }
- kc.RequestID = reqID
- client = (&arvados.Client{
- APIHost: arv.ApiServer,
- AuthToken: arv.ApiToken,
- Insecure: arv.ApiInsecure,
- }).WithRequestID(reqID)
- return
-}
-
-func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []string, credentialsOK, attachment bool) {
- if len(tokens) == 0 {
- w.Header().Add("WWW-Authenticate", "Basic realm=\"collections\"")
- http.Error(w, unauthorizedMessage, http.StatusUnauthorized)
- return
- }
- if writeMethod[r.Method] {
- http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
- return
- }
-
- fs, sess, err := h.Cache.GetSession(tokens[0])
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- fs.ForwardSlashNameSubstitution(h.Cluster.Collections.ForwardSlashNameSubstitution)
- f, err := fs.Open(r.URL.Path)
- if os.IsNotExist(err) {
- http.Error(w, err.Error(), http.StatusNotFound)
- return
- } else if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- defer f.Close()
- if fi, err := f.Stat(); err == nil && fi.IsDir() && r.Method == "GET" {
- if !strings.HasSuffix(r.URL.Path, "/") {
- h.seeOtherWithCookie(w, r, r.URL.Path+"/", credentialsOK)
- } else {
- h.serveDirectory(w, r, fi.Name(), fs, r.URL.Path, false)
- }
- return
- }
-
- tokenUser, err := h.Cache.GetTokenUser(tokens[0])
- if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
- http.Error(w, "Not permitted", http.StatusForbidden)
- return
+ // Sync the changes to the persistent
+ // sessionfs for this token.
+ snap, err := writingDir.Snapshot()
+ if err != nil {
+ return err
+ }
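+ // Install the saved state into the read-side filesystem so
+ // subsequent reads see the update; the authoritative save
+ // already happened in Sync above.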
+ collectionDir.Splice(snap)
+ return nil
+ }}
}
- h.logUploadOrDownload(r, sess.arvadosclient, fs, r.URL.Path, nil, tokenUser)
-
- if r.Method == "GET" {
- _, basename := filepath.Split(r.URL.Path)
+ if r.Method == http.MethodGet {
applyContentDispositionHdr(w, r, basename, attachment)
}
wh := webdav.Handler{
- Prefix: "/",
+ Prefix: "/" + strings.Join(pathParts[:stripParts], "/"),
FileSystem: &webdavFS{
- collfs: fs,
+ collfs: sessionFS,
+ prefix: fsprefix,
writing: writeMethod[r.Method],
alwaysReadEOF: r.Method == "PROPFIND",
},
LockSystem: h.webdavLS,
- Logger: func(_ *http.Request, err error) {
+ Logger: func(r *http.Request, err error) {
if err != nil {
ctxlog.FromContext(r.Context()).WithError(err).Error("error reported by webdav handler")
}
},
}
wh.ServeHTTP(w, r)
+ if r.Method == http.MethodGet && w.WroteStatus() == http.StatusOK {
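+ // If we wrote fewer bytes than expected, it's too late to
+ // change the response code or send an error message to the
+ // client, but at least we can leave some debugging info in
+ // the logs.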
+ wrote := int64(w.WroteBodyBytes())
+ fnm := strings.Join(pathParts[stripParts:], "/")
+ fi, err := wh.FileSystem.Stat(r.Context(), fnm)
+ if err == nil && fi.Size() != wrote {
+ var n int
+ f, err := wh.FileSystem.OpenFile(r.Context(), fnm, os.O_RDONLY, 0)
+ if err == nil {
+ n, err = f.Read(make([]byte, 1024))
+ f.Close()
+ }
+ ctxlog.FromContext(r.Context()).Errorf("stat.Size()==%d but only wrote %d bytes; read(1024) returns %d, %v", fi.Size(), wrote, n, err)
+ }
+ }
}
var dirListingTemplate = `<!DOCTYPE HTML>
func (h *handler) determineCollection(fs arvados.CustomFileSystem, path string) (*arvados.Collection, string) {
target := strings.TrimSuffix(path, "/")
- for {
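+ // Walk up the path, trimming the last component each
+ // iteration, until we find an inode backed by a collection.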
+ for cut := len(target); cut >= 0; cut = strings.LastIndexByte(target, '/') {
+ target = target[:cut]
fi, err := fs.Stat(target)
- if err != nil {
+ if os.IsNotExist(err) {
+ // creating a new file/dir, or download
+ // destined to fail
+ continue
+ } else if err != nil {
return nil, ""
}
switch src := fi.Sys().(type) {
return nil, ""
}
}
- // Try parent
- cut := strings.LastIndexByte(target, '/')
- if cut < 0 {
- return nil, ""
- }
- target = target[:cut]
}
+ return nil, ""
}
}
}
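+// do builds and dispatches a request to the test handler,
+// authorizing with the given token as a Bearer header unless an
+// explicit header set is supplied instead.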
+func (s *IntegrationSuite) do(method string, urlstring string, token string, hdr http.Header) (*http.Request, *httptest.ResponseRecorder) {
+ u := mustParseURL(urlstring)
+ if hdr == nil && token != "" {
+ hdr = http.Header{"Authorization": {"Bearer " + token}}
+ } else if hdr == nil {
+ hdr = http.Header{}
+ } else if token != "" {
+ panic("must not pass both token and hdr")
+ }
+ return s.doReq(&http.Request{
+ Method: method,
+ Host: u.Host,
+ URL: u,
+ RequestURI: u.RequestURI(),
+ Header: hdr,
+ })
+}
+
func (s *IntegrationSuite) doReq(req *http.Request) (*http.Request, *httptest.ResponseRecorder) {
resp := httptest.NewRecorder()
s.handler.ServeHTTP(resp, req)
)
}
+func (s *IntegrationSuite) TestCollectionSharingToken(c *check.C) {
+ s.testVhostRedirectTokenToCookie(c, "GET",
+ "example.com/c="+arvadostest.FooFileCollectionUUID+"/t="+arvadostest.FooFileCollectionSharingToken+"/foo",
+ "",
+ nil,
+ "",
+ http.StatusOK,
+ "foo",
+ )
+ // Same valid sharing token, but requesting a different collection
+ s.testVhostRedirectTokenToCookie(c, "GET",
+ "example.com/c="+arvadostest.FooCollection+"/t="+arvadostest.FooFileCollectionSharingToken+"/foo",
+ "",
+ nil,
+ "",
+ http.StatusNotFound,
+ notFoundMessage+"\n",
+ )
+}
+
// Bad token in URL is 404 Not Found because it doesn't make sense to
// retry the same URL with different authorization.
func (s *IntegrationSuite) TestSingleOriginSecretLinkBadToken(c *check.C) {
}
func (s *IntegrationSuite) checkUploadDownloadRequest(c *check.C, req *http.Request,
- successCode int, direction string, perm bool, userUuid string, collectionUuid string, filepath string) {
+ successCode int, direction string, perm bool, userUuid, collectionUuid, collectionPDH, filepath string) {
client := arvados.NewClientFromEnv()
client.AuthToken = arvadostest.AdminToken
c.Check(err, check.IsNil)
c.Check(logentries.Items, check.HasLen, 1)
lastLogId := logentries.Items[0].ID
+ c.Logf("lastLogId: %d", lastLogId)
var logbuf bytes.Buffer
logger := logrus.New()
deadline := time.Now().Add(time.Second)
for {
c.Assert(time.Now().After(deadline), check.Equals, false, check.Commentf("timed out waiting for log entry"))
+ logentries = arvados.LogList{}
err = client.RequestAndDecode(&logentries, "GET", "arvados/v1/logs", nil,
arvados.ResourceListParams{
Filters: []arvados.Filter{
logentries.Items[0].ID > lastLogId &&
logentries.Items[0].ObjectUUID == userUuid &&
logentries.Items[0].Properties["collection_uuid"] == collectionUuid &&
+ (collectionPDH == "" || logentries.Items[0].Properties["portable_data_hash"] == collectionPDH) &&
logentries.Items[0].Properties["collection_file_path"] == filepath {
break
}
},
}
s.checkUploadDownloadRequest(c, req, http.StatusOK, "download", adminperm,
- arvadostest.AdminUserUUID, arvadostest.FooCollection, "foo")
+ arvadostest.AdminUserUUID, arvadostest.FooCollection, arvadostest.FooCollectionPDH, "foo")
// Test user permission
req = &http.Request{
},
}
s.checkUploadDownloadRequest(c, req, http.StatusOK, "download", userperm,
- arvadostest.ActiveUserUUID, arvadostest.FooCollection, "foo")
+ arvadostest.ActiveUserUUID, arvadostest.FooCollection, arvadostest.FooCollectionPDH, "foo")
}
}
},
}
s.checkUploadDownloadRequest(c, req, http.StatusOK, "download", true,
- arvadostest.ActiveUserUUID, arvadostest.MultilevelCollection1, "dir1/subdir/file1")
+ arvadostest.ActiveUserUUID, arvadostest.MultilevelCollection1, arvadostest.MultilevelCollection1PDH, "dir1/subdir/file1")
}
u = mustParseURL("http://" + strings.Replace(arvadostest.FooCollectionPDH, "+", "-", 1) + ".keep-web.example/foo")
},
}
s.checkUploadDownloadRequest(c, req, http.StatusOK, "download", true,
- arvadostest.ActiveUserUUID, arvadostest.FooCollection, "foo")
+ arvadostest.ActiveUserUUID, "", arvadostest.FooCollectionPDH, "foo")
}
func (s *IntegrationSuite) TestUploadLoggingPermission(c *check.C) {
Body: io.NopCloser(bytes.NewReader([]byte("bar"))),
}
s.checkUploadDownloadRequest(c, req, http.StatusCreated, "upload", adminperm,
- arvadostest.AdminUserUUID, coll.UUID, "bar")
+ arvadostest.AdminUserUUID, coll.UUID, "", "bar")
// Test user permission
req = &http.Request{
Body: io.NopCloser(bytes.NewReader([]byte("bar"))),
}
s.checkUploadDownloadRequest(c, req, http.StatusCreated, "upload", userperm,
- arvadostest.ActiveUserUUID, coll.UUID, "bar")
+ arvadostest.ActiveUserUUID, coll.UUID, "", "bar")
}
}
}
"github.com/prometheus/client_golang/prometheus"
)
-var (
- version = "dev"
-)
-
var Command = service.Command(arvados.ServiceNameKeepweb, newHandlerOrErrorHandler)
func newHandlerOrErrorHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
- "git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
- "git.arvados.org/arvados.git/sdk/go/keepclient"
"github.com/AdRoll/goamz/s3"
)
return false
}
- var err error
- var fs arvados.CustomFileSystem
- var arvclient *arvadosclient.ArvadosClient
- if r.Method == http.MethodGet || r.Method == http.MethodHead {
- // Use a single session (cached FileSystem) across
- // multiple read requests.
- var sess *cachedSession
- fs, sess, err = h.Cache.GetSession(token)
- if err != nil {
- s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
- return true
- }
- arvclient = sess.arvadosclient
- } else {
+ fs, sess, tokenUser, err := h.Cache.GetSession(token)
+ if err != nil {
+ s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
+ return true
+ }
+ readfs := fs
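+ // Keep a reference to the cached (shared) filesystem: after a
+ // successful write on the per-request filesystem, syncCollection
+ // splices the update into it so subsequent reads see the change.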
+ if writeMethod[r.Method] {
// Create a FileSystem for this request, to avoid
// exposing incomplete write operations to concurrent
// requests.
- var kc *keepclient.KeepClient
- var release func()
- var client *arvados.Client
- arvclient, kc, client, release, err = h.getClients(r.Header.Get("X-Request-Id"), token)
- if err != nil {
- s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
- return true
- }
- defer release()
- fs = client.SiteFileSystem(kc)
+ client := sess.client.WithRequestID(r.Header.Get("X-Request-Id"))
+ fs = client.SiteFileSystem(sess.keepclient)
fs.ForwardSlashNameSubstitution(h.Cluster.Collections.ForwardSlashNameSubstitution)
}
return true
}
- tokenUser, err := h.Cache.GetTokenUser(token)
if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
http.Error(w, "Not permitted", http.StatusForbidden)
return true
}
- h.logUploadOrDownload(r, arvclient, fs, fspath, nil, tokenUser)
+ h.logUploadOrDownload(r, sess.arvadosclient, fs, fspath, nil, tokenUser)
// shallow copy r, and change URL path
r := *r
}
defer f.Close()
- tokenUser, err := h.Cache.GetTokenUser(token)
if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
http.Error(w, "Not permitted", http.StatusForbidden)
return true
}
- h.logUploadOrDownload(r, arvclient, fs, fspath, nil, tokenUser)
+ h.logUploadOrDownload(r, sess.arvadosclient, fs, fspath, nil, tokenUser)
_, err = io.Copy(f, r.Body)
if err != nil {
return true
}
}
- err = fs.Sync()
+ err = h.syncCollection(fs, readfs, fspath)
if err != nil {
err = fmt.Errorf("sync failed: %w", err)
s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
return true
}
- // Ensure a subsequent read operation will see the changes.
- h.Cache.ResetSession(token)
w.WriteHeader(http.StatusOK)
return true
case r.Method == http.MethodDelete:
s3ErrorResponse(w, InvalidArgument, err.Error(), r.URL.Path, http.StatusBadRequest)
return true
}
- err = fs.Sync()
+ err = h.syncCollection(fs, readfs, fspath)
if err != nil {
err = fmt.Errorf("sync failed: %w", err)
s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
return true
}
- // Ensure a subsequent read operation will see the changes.
- h.Cache.ResetSession(token)
w.WriteHeader(http.StatusNoContent)
return true
default:
}
}
+// syncCollection saves modifications to the indicated collection in
+// srcfs, then (if successful) ensures they are also reflected in
+// dstfs.
+func (h *handler) syncCollection(srcfs, dstfs arvados.CustomFileSystem, path string) error {
+ coll, _ := h.determineCollection(srcfs, path)
+ if coll == nil || coll.UUID == "" {
+ return errors.New("could not determine collection to sync")
+ }
+ d, err := srcfs.OpenFile("by_id/"+coll.UUID, os.O_RDWR, 0777)
+ if err != nil {
+ return err
+ }
+ defer d.Close()
+ err = d.Sync()
+ if err != nil {
+ return err
+ }
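+ // Sync saved the modified collection via the API; now propagate
+ // the saved state to the read-side filesystem.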
+ snap, err := d.Snapshot()
+ if err != nil {
+ return err
+ }
+ dstd, err := dstfs.OpenFile("by_id/"+coll.UUID, os.O_RDWR, 0777)
+ if err != nil {
+ return err
+ }
+ defer dstd.Close()
+ return dstd.Splice(snap)
+}
+
func setFileInfoHeaders(header http.Header, fs arvados.CustomFileSystem, path string) error {
maybeEncode := func(s string) string {
for _, c := range s {
func (s *IntegrationSuite) TestS3CollectionPutObjectSuccess(c *check.C) {
stage := s.s3setup(c)
defer stage.teardown(c)
- s.testS3PutObjectSuccess(c, stage.collbucket, "")
+ s.testS3PutObjectSuccess(c, stage.collbucket, "", stage.coll.UUID)
}
func (s *IntegrationSuite) TestS3ProjectPutObjectSuccess(c *check.C) {
stage := s.s3setup(c)
defer stage.teardown(c)
- s.testS3PutObjectSuccess(c, stage.projbucket, stage.coll.Name+"/")
+ s.testS3PutObjectSuccess(c, stage.projbucket, stage.coll.Name+"/", stage.coll.UUID)
}
-func (s *IntegrationSuite) testS3PutObjectSuccess(c *check.C, bucket *s3.Bucket, prefix string) {
+func (s *IntegrationSuite) testS3PutObjectSuccess(c *check.C, bucket *s3.Bucket, prefix string, collUUID string) {
for _, trial := range []struct {
path string
size int
if !c.Check(err, check.NotNil) {
continue
}
- c.Check(err.(*s3.Error).StatusCode, check.Equals, 404)
+ c.Check(err.(*s3.Error).StatusCode, check.Equals, http.StatusNotFound)
c.Check(err.(*s3.Error).Code, check.Equals, `NoSuchKey`)
if !c.Check(err, check.ErrorMatches, `The specified key does not exist.`) {
continue
c.Check(err, check.IsNil)
c.Check(buf2, check.HasLen, len(buf))
c.Check(bytes.Equal(buf, buf2), check.Equals, true)
+
+ // Check that the change is immediately visible via
+ // (non-S3) webdav request.
+ _, resp := s.do("GET", "http://"+collUUID+".keep-web.example/"+trial.path, arvadostest.ActiveTokenV2, nil)
+ c.Check(resp.Code, check.Equals, http.StatusOK)
+ if !strings.HasSuffix(trial.path, "/") {
+ c.Check(resp.Body.Len(), check.Equals, trial.size)
+ }
}
}
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/httpserver"
"git.arvados.org/arvados.git/sdk/go/keepclient"
+ "github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
} {
hdr, body, _ := s.runCurl(c, token, "collections.example.com", "/collections/"+arvadostest.FooCollection+"/foo")
c.Check(hdr, check.Matches, `(?s)HTTP/1.1 404 Not Found\r\n.*`)
- c.Check(body, check.Equals, notFoundMessage+"\n")
+ c.Check(strings.TrimSpace(body), check.Equals, notFoundMessage)
if token != "" {
hdr, body, _ = s.runCurl(c, token, "collections.example.com", "/collections/download/"+arvadostest.FooCollection+"/"+token+"/foo")
c.Check(hdr, check.Matches, `(?s)HTTP/1.1 404 Not Found\r\n.*`)
- c.Check(body, check.Equals, notFoundMessage+"\n")
+ c.Check(strings.TrimSpace(body), check.Equals, notFoundMessage)
}
hdr, body, _ = s.runCurl(c, token, "collections.example.com", "/bad-route")
c.Check(hdr, check.Matches, `(?s)HTTP/1.1 404 Not Found\r\n.*`)
- c.Check(body, check.Equals, notFoundMessage+"\n")
+ c.Check(strings.TrimSpace(body), check.Equals, notFoundMessage)
}
}
hdr, body, _ := s.runCurl(c, arvadostest.ActiveToken, "collections.example.com", uri)
c.Check(hdr, check.Matches, "(?s)HTTP/1.1 404 Not Found\r\n.*")
if len(body) > 0 {
- c.Check(body, check.Equals, notFoundMessage+"\n")
+ c.Check(strings.TrimSpace(body), check.Equals, notFoundMessage)
}
}
}
c.Check(summaries["request_duration_seconds/get/200"].SampleCount, check.Equals, "3")
c.Check(summaries["request_duration_seconds/get/404"].SampleCount, check.Equals, "1")
c.Check(summaries["time_to_status_seconds/get/404"].SampleCount, check.Equals, "1")
- c.Check(counters["arvados_keepweb_collectioncache_requests//"].Value, check.Equals, int64(2))
- c.Check(counters["arvados_keepweb_collectioncache_api_calls//"].Value, check.Equals, int64(2))
- c.Check(counters["arvados_keepweb_collectioncache_hits//"].Value, check.Equals, int64(1))
- c.Check(counters["arvados_keepweb_collectioncache_pdh_hits//"].Value, check.Equals, int64(1))
- c.Check(gauges["arvados_keepweb_collectioncache_cached_manifests//"].Value, check.Equals, float64(1))
- // FooCollection's cached manifest size is 45 ("1f4b0....+45")
- // plus one 51-byte blob signature; session fs counts 3 inodes
- // * 64 bytes.
- c.Check(gauges["arvados_keepweb_sessions_cached_collection_bytes//"].Value, check.Equals, float64(45+51+64*3))
+ c.Check(gauges["arvados_keepweb_sessions_cached_session_bytes//"].Value, check.Equals, float64(469))
// If the Host header indicates a collection, /metrics.json
// refers to a file in the collection -- the metrics handler
ctx := ctxlog.Context(context.Background(), logger)
- s.handler = newHandlerOrErrorHandler(ctx, cluster, cluster.SystemRootToken, nil).(*handler)
+ s.handler = newHandlerOrErrorHandler(ctx, cluster, cluster.SystemRootToken, prometheus.NewRegistry()).(*handler)
s.testServer = httptest.NewUnstartedServer(
httpserver.AddRequestIDs(
httpserver.LogRequests(
// existence automatically so sequences like "mkcol foo; put foo/bar"
// work as expected.
type webdavFS struct {
- collfs arvados.FileSystem
+ collfs arvados.FileSystem
+ // prefix works like fs.Sub: Stat(name) calls
+ // Stat(prefix+name) in the wrapped filesystem.
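+ // For collection requests this is typically
+ // "by_id/<collection UUID>/".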
+ prefix string
writing bool
// webdav PROPFIND reads the first few bytes of each file
// whose filename extension isn't recognized, which is
}
dir = dir[:len(dir)-1]
fs.makeparents(dir)
- fs.collfs.Mkdir(dir, 0755)
+ fs.collfs.Mkdir(fs.prefix+dir, 0755)
}
func (fs *webdavFS) Mkdir(ctx context.Context, name string, perm os.FileMode) error {
}
name = strings.TrimRight(name, "/")
fs.makeparents(name)
- return fs.collfs.Mkdir(name, 0755)
+ return fs.collfs.Mkdir(fs.prefix+name, 0755)
}
func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os.FileMode) (f webdav.File, err error) {
if writing {
fs.makeparents(name)
}
- f, err = fs.collfs.OpenFile(name, flag, perm)
+ f, err = fs.collfs.OpenFile(fs.prefix+name, flag, perm)
if !fs.writing {
// webdav module returns 404 on all OpenFile errors,
// but returns 405 Method Not Allowed if OpenFile()
}
func (fs *webdavFS) RemoveAll(ctx context.Context, name string) error {
- return fs.collfs.RemoveAll(name)
+ return fs.collfs.RemoveAll(fs.prefix + name)
}
func (fs *webdavFS) Rename(ctx context.Context, oldName, newName string) error {
newName = strings.TrimSuffix(newName, "/")
}
fs.makeparents(newName)
- return fs.collfs.Rename(oldName, newName)
+ return fs.collfs.Rename(fs.prefix+oldName, fs.prefix+newName)
}
func (fs *webdavFS) Stat(ctx context.Context, name string) (os.FileInfo, error) {
if fs.writing {
fs.makeparents(name)
}
- return fs.collfs.Stat(name)
+ return fs.collfs.Stat(fs.prefix + name)
}
type writeFailer struct {
defer srv.Close()
hash, _, err := kc.PutB([]byte("shareddata"))
c.Check(err, IsNil)
- kc.Arvados.ApiToken = arvadostest.FooCollectionSharingToken
+ kc.Arvados.ApiToken = arvadostest.FooFileCollectionSharingToken
rdr, _, _, err := kc.Get(hash)
c.Assert(err, IsNil)
data, err := ioutil.ReadAll(rdr)
collectionNameCache = {}
def getCollectionName(arv, uuid, pdh):
lookupField = uuid
- filters = [["uuid","=",uuid]]
+ filters = [["uuid", "=", uuid]]
cached = uuid in collectionNameCache
# look up by uuid if it is available, fall back to look up by pdh
- if len(uuid) != 27:
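+ # uuid may be None when the request was logged by portable data hash alone.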
+ if uuid is None or len(uuid) != 27:
# Look up by pdh. Note that this can be misleading; the download could
# have happened from a collection with the same pdh but different name.
# We arbitrarily pick the oldest collection with the pdh to lookup the
# name, if the uuid for the request is not known.
lookupField = pdh
- filters = [["portable_data_hash","=",pdh]]
+ filters = [["portable_data_hash", "=", pdh]]
cached = pdh in collectionNameCache
if not cached:
- u = arv.collections().list(filters=filters,order="created_at",limit=1).execute().get("items")
+ u = arv.collections().list(filters=filters, order="created_at", limit=1).execute().get("items")
if len(u) < 1:
return "(deleted)"
collectionNameCache[lookupField] = u[0]["name"]
users[owner].append([loguuid, event_at, "Deleted collection %s" % (getname(e["properties"]["old_attributes"]))])
elif e["event_type"] == "file_download":
- users.setdefault(e["object_uuid"], [])
- users[e["object_uuid"]].append([loguuid, event_at, "Downloaded file \"%s\" from \"%s\" (%s) (%s)" % (
- e["properties"].get("collection_file_path") or e["properties"].get("reqPath"),
- getCollectionName(arv, e["properties"].get("collection_uuid"), e["properties"].get("portable_data_hash")),
- e["properties"].get("collection_uuid"),
- e["properties"].get("portable_data_hash"))])
-
+ users.setdefault(e["object_uuid"], [])
+ users[e["object_uuid"]].append([loguuid, event_at, "Downloaded file \"%s\" from \"%s\" (%s) (%s)" % (
+ e["properties"].get("collection_file_path") or e["properties"].get("reqPath"),
+ getCollectionName(arv, e["properties"].get("collection_uuid"), e["properties"].get("portable_data_hash")),
+ e["properties"].get("collection_uuid"),
+ e["properties"].get("portable_data_hash"))])
elif e["event_type"] == "file_upload":
- users.setdefault(e["object_uuid"], [])
- users[e["object_uuid"]].append([loguuid, event_at, "Uploaded file \"%s\" to \"%s\" (%s)" % (
- e["properties"].get("collection_file_path") or e["properties"].get("reqPath"),
- getCollectionName(arv, e["properties"].get("collection_uuid"), e["properties"].get("portable_data_hash")),
- e["properties"].get("collection_uuid"))])
+ users.setdefault(e["object_uuid"], [])
+ users[e["object_uuid"]].append([loguuid, event_at, "Uploaded file \"%s\" to \"%s\" (%s)" % (
+ e["properties"].get("collection_file_path") or e["properties"].get("reqPath"),
+ getCollectionName(arv, e["properties"].get("collection_uuid"), e["properties"].get("portable_data_hash")),
+ e["properties"].get("collection_uuid"))])
else:
users[owner].append([loguuid, event_at, "%s %s %s" % (e["event_type"], e["object_kind"], e["object_uuid"])])