Merge branch '19400-login-sync-logincluster'. Closes #19400
author Lucas Di Pentima <lucas.dipentima@curii.com>
Thu, 13 Oct 2022 15:54:15 +0000 (12:54 -0300)
committer Lucas Di Pentima <lucas.dipentima@curii.com>
Thu, 13 Oct 2022 15:54:15 +0000 (12:54 -0300)
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <lucas.dipentima@curii.com>

53 files changed:
doc/_config.yml
doc/admin/group-management.html.textile.liquid
doc/admin/upgrading.html.textile.liquid
doc/api/methods/collections.html.textile.liquid
doc/install/crunch2-lsf/install-dispatch.html.textile.liquid
doc/sdk/python/cookbook.html.textile.liquid
lib/config/config.default.yml
lib/config/deprecated.go
lib/config/deprecated_test.go
lib/config/export.go
lib/controller/federation/conn.go
lib/controller/handler.go
lib/controller/handler_test.go
lib/controller/localdb/collection.go
lib/controller/localdb/conn.go
lib/controller/localdb/container_request.go
lib/controller/localdb/group.go
lib/controller/localdb/log_activity.go [new file with mode: 0644]
lib/controller/localdb/log_activity_test.go [new file with mode: 0644]
lib/ctrlctx/auth.go [new file with mode: 0644]
lib/ctrlctx/auth_test.go [new file with mode: 0644]
lib/ctrlctx/db.go
lib/diagnostics/cmd.go
lib/lsf/dispatch.go
lib/lsf/dispatch_test.go
sdk/cwl/arvados_cwl/executor.py
sdk/go/arvados/client.go
sdk/go/arvados/config.go
sdk/go/arvados/fs_base.go
sdk/go/arvados/fs_collection.go
sdk/go/arvados/fs_collection_test.go
sdk/go/arvados/fs_deferred.go
sdk/go/arvados/fs_project.go
sdk/go/arvados/fs_project_test.go
sdk/go/arvados/fs_site.go
sdk/go/arvados/fs_site_test.go
sdk/go/arvados/fs_users.go
sdk/go/arvadostest/db.go
sdk/go/arvadostest/fixtures.go
sdk/go/health/aggregator.go
sdk/go/keepclient/block_cache.go
services/api/test/fixtures/collections.yml
services/keep-web/cache.go
services/keep-web/cache_test.go
services/keep-web/handler.go
services/keep-web/handler_test.go
services/keep-web/main.go
services/keep-web/s3.go
services/keep-web/s3_test.go
services/keep-web/server_test.go
services/keep-web/webdav.go
services/keepproxy/keepproxy_test.go
tools/user-activity/arvados_user_activity/main.py

diff --git a/doc/_config.yml b/doc/_config.yml
index 148e1a166e0ac6d96499969e54cb7e5c0c1a3c1d..a6f7e608639309d23fe6f81da82f8b17474f0333 100644
@@ -89,6 +89,7 @@ navbar:
       - sdk/cli/index.html.textile.liquid
       - sdk/cli/reference.html.textile.liquid
       - sdk/cli/subcommands.html.textile.liquid
+      - sdk/cli/project-management.html.textile.liquid
     - Go:
       - sdk/go/index.html.textile.liquid
       - sdk/go/example.html.textile.liquid
@@ -172,21 +173,21 @@ navbar:
     - Users and Groups:
       - admin/user-management.html.textile.liquid
       - admin/user-management-cli.html.textile.liquid
+      - admin/group-management.html.textile.liquid
       - admin/reassign-ownership.html.textile.liquid
       - admin/link-accounts.html.textile.liquid
-      - admin/group-management.html.textile.liquid
       - admin/federation.html.textile.liquid
       - admin/merge-remote-account.html.textile.liquid
       - admin/migrating-providers.html.textile.liquid
       - user/topics/arvados-sync-external-sources.html.textile.liquid
       - admin/scoped-tokens.html.textile.liquid
       - admin/token-expiration-policy.html.textile.liquid
-      - admin/user-activity.html.textile.liquid
     - Monitoring:
       - admin/logging.html.textile.liquid
       - admin/metrics.html.textile.liquid
       - admin/health-checks.html.textile.liquid
       - admin/management-token.html.textile.liquid
+      - admin/user-activity.html.textile.liquid
     - Data Management:
       - admin/collection-versioning.html.textile.liquid
       - admin/collection-managed-properties.html.textile.liquid
diff --git a/doc/admin/group-management.html.textile.liquid b/doc/admin/group-management.html.textile.liquid
index dddfe13ac85d4aca8b0407f9f7b1f562f15aae8a..43319e873a577717ae0aee65cf6e023c08770939 100644
@@ -1,7 +1,7 @@
 ---
 layout: default
 navsection: admin
-title: Group management
+title: Role group management at the CLI
 ...
 
 {% comment %}
@@ -12,7 +12,7 @@ SPDX-License-Identifier: CC-BY-SA-3.0
 
 This page describes how to manage groups at the command line.  You should be familiar with the "permission system":{{site.baseurl}}/api/permission-model.html .
 
-h2. Create a group
+h2. Create a role group
 
 User groups are entries in the "groups" table with @"group_class": "role"@.
 
@@ -20,7 +20,7 @@ User groups are entries in the "groups" table with @"group_class": "role"@.
 arv group create --group '{"name": "My new group", "group_class": "role"}'
 </pre>
 
-h2(#add). Add a user to a group
+h2(#add). Add a user to a role group
 
 There are two separate permissions associated with group membership.  The first link grants the user @can_manage@ permission to manage things that the group can manage.  The second link grants permission for other users of the group to see that this user is part of the group.
 
@@ -40,13 +40,13 @@ arv link create --link '{
 
 A user can also be given read-only access to a group.  In that case, the first link should be created with @can_read@ instead of @can_manage@.
 
-h2. List groups
+h2. List role groups
 
 <pre>
 arv group list --filters '[["group_class", "=", "role"]]'
 </pre>
 
-h2. List members of a group
+h2. List members of a role group
 
 Use the command "jq":https://stedolan.github.io/jq/ to extract the tail_uuid of each permission link which has the user uuid.
 
@@ -55,9 +55,9 @@ arv link list --filters '[["link_class", "=", "permission"],
   ["head_uuid", "=", "the_group_uuid"]]' | jq .items[].tail_uuid
 </pre>
 
-h2. Share a project with a group
+h2(#share-project). Share a project with a role group
 
-This will give all members of the group @can_manage@ access.
+Members of the role group will have access to the project based on their level of access to the role group.
 
 <pre>
 arv link create --link '{
@@ -67,7 +67,7 @@ arv link create --link '{
   "head_uuid": "the_project_uuid"}'
 </pre>
 
-A project can also be shared read-only.  In that case, the first link should be created with @can_read@ instead of @can_manage@.
+A project can also be shared read-only.  In that case, the link @name@ should be @can_read@ instead of @can_manage@.
 
 h2. List things shared with the group
 
@@ -78,7 +78,7 @@ arv link list --filters '[["link_class", "=", "permission"],
   ["tail_uuid", "=", "the_group_uuid"]]' | jq .items[].head_uuid
 </pre>
 
-h2. Stop sharing a project with a group
+h2(#stop-sharing-project). Stop sharing a project with a group
 
 This will remove access for members of the group.
 
@@ -91,7 +91,7 @@ arv --format=uuid link list --filters '[["link_class", "=", "permission"],
 arv link delete --uuid each_link_uuid
 </pre>
 
-h2. Remove user from a group
+h2. Remove user from a role group
 
 The first step is to find the permission link objects.  The second step is to delete them.
 
diff --git a/doc/admin/upgrading.html.textile.liquid b/doc/admin/upgrading.html.textile.liquid
index 8ed5af19c525d375e4f1a41801e4c765fe30858b..a3717e3c567934f12826438bb3b97e5fe3855ba9 100644
@@ -28,14 +28,37 @@ TODO: extract this information based on git commit messages and generate changel
 <div class="releasenotes">
 </notextile>
 
-h2(#main). development main (as of 2022-08-26)
 
-"previous: Upgrading to 2.4.2":#v2_4_2
+h2(#main). development main (as of 2022-09-21)
+
+"previous: Upgrading to 2.4.3":#v2_4_3
 
 h3. Login-sync script requires configuration update on LoginCluster federations
 
 If you have @arvados-login-sync@ running on a satellite cluster, please update the environment variable settings by removing the @LOGINCLUSTER_ARVADOS_API_*@ variables and setting @ARVADOS_API_TOKEN@ to a LoginCluster's admin token, as described on the "updated install page":{{site.baseurl}}/install/install-shell-server.html#arvados-login-sync.
 
+h3. Renamed keep-web metrics and WebDAV configs
+
+Metrics previously reported by keep-web (@arvados_keepweb_collectioncache_requests@, @..._hits@, @..._pdh_hits@, @..._api_calls@, @..._cached_manifests@, and @arvados_keepweb_sessions_cached_collection_bytes@) have been replaced with @arvados_keepweb_cached_session_bytes@.
+
+The config entries @Collections.WebDAVCache.UUIDTTL@, @...MaxCollectionEntries@, and @...MaxUUIDEntries@ are no longer used, and should be removed from your config file.
+
+h2(#v2_4_3). v2.4.3 (2022-09-21)
+
+"previous: Upgrading to 2.4.2":#v2_4_2
+
+h3. Fixed PAM authentication security vulnerability
+
+In Arvados 2.4.2 and earlier, when using PAM authentication, if a user
+presented valid credentials but the account was disabled or otherwise
+not allowed to access the host, the credentials were still accepted for
+access to Arvados.  From 2.4.3 onwards, Arvados also checks that the
+account is permitted to access the host before completing the PAM login
+process.
+
+Other authentication methods (LDAP, OpenID Connect) are not affected
+by this flaw.
+
 h2(#v2_4_2). v2.4.2 (2022-08-09)
 
 "previous: Upgrading to 2.4.1":#v2_4_1
diff --git a/doc/api/methods/collections.html.textile.liquid b/doc/api/methods/collections.html.textile.liquid
index a2a6a77e19f6350c1be3c770a1560f42c9cc6402..5871337b0ae8b14760d9d32292a27f1eed29ecc8 100644
@@ -88,7 +88,7 @@ table(table table-bordered table-condensed).
 
 h3. get
 
-Gets a Collection's metadata by UUID or portable data hash.  When making a request by portable data hash, attributes other than @portable_data_hash@ and @manifest_text@ are not returned, even when requested explicitly using the @select@ parameter.
+Gets a Collection's metadata by UUID or portable data hash.  When making a request by portable data hash, attributes other than @portable_data_hash@, @manifest_text@, and @trash_at@ are not returned, even when requested explicitly using the @select@ parameter.
 
 Arguments:
 
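For illustration, a minimal Go sketch (not part of this commit) of a get-by-PDH request reflecting the documented behavior; the PDH is the well-known empty-collection hash and ARVADOS_API_HOST/ARVADOS_API_TOKEN are assumed to be set:

package main

import (
	"fmt"

	"git.arvados.org/arvados.git/sdk/go/arvados"
)

func main() {
	// Assumes ARVADOS_API_HOST and ARVADOS_API_TOKEN are set.
	client := arvados.NewClientFromEnv()
	var coll arvados.Collection
	// d41d8...+0 is the portable data hash of the empty collection.
	pdh := "d41d8cd98f00b204e9800998ecf8427e+0"
	if err := client.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+pdh, nil, nil); err != nil {
		panic(err)
	}
	// Per the doc change above, only portable_data_hash, manifest_text,
	// and trash_at are meaningful in a get-by-PDH response.
	fmt.Println(coll.PortableDataHash, coll.TrashAt)
}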
diff --git a/doc/install/crunch2-lsf/install-dispatch.html.textile.liquid b/doc/install/crunch2-lsf/install-dispatch.html.textile.liquid
index 37adffd18d4e9bef5162614b015a3155df3333a5..ded244046dde211ea2b18dab7779d5159ffc100e 100644
@@ -62,7 +62,7 @@ Alternatively, you can arrange for the arvados-dispatch-lsf process to run as an
 </notextile>
 
 
-h3(#SbatchArguments). Containers.LSF.BsubArgumentsList
+h3(#BsubArgumentsList). Containers.LSF.BsubArgumentsList
 
 When arvados-dispatch-lsf invokes @bsub@, you can add arguments to the command by specifying @BsubArgumentsList@.  You can use this to send the jobs to specific cluster partitions or add resource requests.  Set @BsubArgumentsList@ to an array of strings.
 
@@ -87,7 +87,7 @@ For example:
 
 Note that the default value for @BsubArgumentsList@ uses the @-o@ and @-e@ arguments to write stdout/stderr data to files in @/tmp@ on the compute nodes, which is helpful for troubleshooting installation/configuration problems. Ensure you have something in place to delete old files from @/tmp@, or adjust these arguments accordingly.
 
-h3(#SbatchArguments). Containers.LSF.BsubCUDAArguments
+h3(#BsubCUDAArguments). Containers.LSF.BsubCUDAArguments
 
 If the container requests access to GPUs (@runtime_constraints.cuda.device_count@ of the container request is greater than zero), the command line arguments in @BsubCUDAArguments@ will be added to the command line _after_ @BsubArgumentsList@.  This should consist of the additional @bsub@ flags your site requires to schedule the job on a node with GPU support.  Set @BsubCUDAArguments@ to an array of strings.  For example:
 
@@ -98,7 +98,7 @@ If the container requests access to GPUs (@runtime_constraints.cuda.device_count
 </pre>
 </notextile>
 
-h3(#PollPeriod). Containers.PollInterval
+h3(#PollInterval). Containers.PollInterval
 
 arvados-dispatch-lsf polls the API server periodically for new containers to run.  The @PollInterval@ option controls how often this poll happens.  Set this to a string of numbers suffixed with one of the time units @s@, @m@, or @h@.  For example:
 
@@ -122,7 +122,7 @@ Supports suffixes @KB@, @KiB@, @MB@, @MiB@, @GB@, @GiB@, @TB@, @TiB@, @PB@, @PiB
 </notextile>
 
 
-h3(#CrunchRunCommand-network). Containers.CrunchRunArgumentList: Using host networking for containers
+h3(#CrunchRunArgumentList). Containers.CrunchRunArgumentList: Using host networking for containers
 
 Older Linux kernels (prior to 3.18) have bugs in network namespace handling which can lead to compute node lockups.  This is indicated by blocked kernel tasks in "Workqueue: netns cleanup_net".  If you are experiencing this problem, as a workaround you can disable use of network namespaces by Docker across the cluster.  Be aware this reduces container isolation, which may be a security risk.
 
@@ -134,6 +134,37 @@ Older Linux kernels (prior to 3.18) have bugs in network namespace handling whic
 </pre>
 </notextile>
 
+
+h3(#InstanceTypes). InstanceTypes: Avoid submitting jobs with unsatisfiable resource constraints
+
+LSF does not provide feedback when a submitted job's RAM, CPU, or disk space constraints cannot be satisfied by any node: the job will wait in the queue indefinitely with "pending" status, reported by Arvados as "queued".
+
+As a workaround, you can configure @InstanceTypes@ with your LSF cluster's compute node sizes. Arvados will use these sizes to determine when a container is impossible to run, and cancel it instead of submitting an LSF job.
+
+Apart from detecting non-runnable containers, the configured instance types will not have any effect on scheduling.
+
+<notextile>
+<pre>    InstanceTypes:
+      most-ram:
+        VCPUs: 8
+        RAM: 640GiB
+        IncludedScratch: 640GB
+      most-cpus:
+        VCPUs: 32
+        RAM: 256GiB
+        IncludedScratch: 640GB
+      gpu:
+        VCPUs: 8
+        RAM: 256GiB
+        IncludedScratch: 640GB
+        CUDA:
+          DriverVersion: "11.4"
+          HardwareCapability: "7.5"
+          DeviceCount: 1
+</pre>
+</notextile>
+
+
 {% assign arvados_component = 'arvados-dispatch-lsf' %}
 
 {% include 'install_packages' %}
diff --git a/doc/sdk/python/cookbook.html.textile.liquid b/doc/sdk/python/cookbook.html.textile.liquid
index f3186ebbb6d76d66221860f669960cb9737688eb..53330dcbebbaf762cd1a557200e2b204ad8774a6 100644
@@ -10,14 +10,36 @@ Copyright (C) The Arvados Authors. All rights reserved.
 SPDX-License-Identifier: CC-BY-SA-3.0
 {% endcomment %}
 
-h2. Cancel a container request
+# "Cancel a container request":#cancel-a-container-request
+# "Cancel all container requests":#cancel-all-container-requests
+# "List completed container requests":#list-completed-container-requests
+# "Get input of a CWL workflow":#get-input-of-a-cwl-workflow
+# "Get output of a CWL workflow":#get-output-of-a-cwl-workflow
+# "Get state of a CWL workflow":#get-state-of-a-cwl-workflow
+# "List input of child requests":#list-input-of-child-requests
+# "List output of child requests":#list-output-of-child-requests
+# "List failed child requests":#list-failed-child-requests
+# "Get log of a child request":#get-log-of-a-child-request
+# "Create a collection sharing link":#sharing-link
+# "Combine two or more collections":#combine-two-or-more-collections
+# "Upload a file into a new collection":#upload-a-file-into-a-new-collection
+# "Download a file from a collection":#download-a-file-from-a-collection
+# "Copy files from a collection to a new collection":#copy-files-from-a-collection-to-a-new-collection
+# "Copy files from a collection to another collection":#copy-files-from-a-collection-to-another-collection
+# "Delete a file from an existing collection":#delete-a-file-from-an-existing-collection
+# "Listing records with paging":#listing-records-with-paging
+# "Querying the vocabulary definition":#querying-the-vocabulary-definition
+# "Translating between vocabulary identifiers and labels":#translating-between-vocabulary-identifiers-and-labels
+# "Create a Project":#create-a-project
+
+h2(#cancel-a-container-request). Cancel a container request
 
 {% codeblock as python %}
 import arvados
 arvados.api().container_requests().update(uuid=container_request_uuid, body={"priority": 0}).execute()
 {% endcodeblock %}
 
-h2. Cancel all container requests
+h2(#cancel-all-container-requests). Cancel all container requests
 
 {% codeblock as python %}
 import arvados
@@ -27,7 +49,7 @@ for container_request in result:
     api.container_requests().update(uuid=container_request["uuid"], body={"priority": 0}).execute()
 {% endcodeblock %}
 
-h2. List completed container requests
+h2(#list-completed-container-requests). List completed container requests
 
 {% codeblock as python %}
 import arvados
@@ -42,7 +64,7 @@ for container_request in result:
     print("%s, %s, %s" % (container_request["uuid"], container_request["name"], "Success" if container["exit_code"] == 0 else "Failed"))
 {% endcodeblock %}
 
-h2. Get input of a CWL workflow
+h2(#get-input-of-a-cwl-workflow). Get input of a CWL workflow
 
 {% codeblock as python %}
 import arvados
@@ -52,7 +74,7 @@ container_request = api.container_requests().get(uuid=container_request_uuid).ex
 print(container_request["mounts"]["/var/lib/cwl/cwl.input.json"])
 {% endcodeblock %}
 
-h2. Get output of a CWL workflow
+h2(#get-output-of-a-cwl-workflow). Get output of a CWL workflow
 
 {% codeblock as python %}
 import arvados
@@ -64,7 +86,7 @@ collection = arvados.collection.CollectionReader(container_request["output_uuid"
 print(collection.open("cwl.output.json").read())
 {% endcodeblock %}
 
-h2. Get state of a CWL workflow
+h2(#get-state-of-a-cwl-workflow). Get state of a CWL workflow
 
 {% codeblock as python %}
 import arvados
@@ -93,7 +115,7 @@ container_request_uuid = 'zzzzz-xvhdp-zzzzzzzzzzzzzzz'
 print(get_cr_state(container_request_uuid))
 {% endcodeblock %}
 
-h2. List input of child requests
+h2(#list-input-of-child-requests). List input of child requests
 
 {% codeblock as python %}
 import arvados
@@ -112,7 +134,7 @@ for c in child_requests["items"]:
             print("  %s" % m["portable_data_hash"])
 {% endcodeblock %}
 
-h2. List output of child requests
+h2(#list-output-of-child-requests). List output of child requests
 
 {% codeblock as python %}
 import arvados
@@ -131,7 +153,7 @@ for c in child_requests["items"]:
     print("%s -> %s" % (c["name"], uuid_to_pdh[c["output_uuid"]]))
 {% endcodeblock %}
 
-h2. List failed child requests
+h2(#list-failed-child-requests). List failed child requests
 
 {% codeblock as python %}
 import arvados
@@ -149,7 +171,7 @@ for c in cancelled_child_containers["items"]:
     print("%s (%s)" % (child_containers[c["uuid"]]["name"], child_containers[c["uuid"]]["uuid"]))
 {% endcodeblock %}
 
-h2. Get log of a child request
+h2(#get-log-of-a-child-request). Get log of a child request
 
 {% codeblock as python %}
 import arvados
@@ -177,7 +199,7 @@ token = api.api_client_authorizations().create(body={"api_client_authorization":
 print("%s/c=%s/t=%s/_/" % (download, collection_uuid, token["api_token"]))
 {% endcodeblock %}
 
-h2. Combine two or more collections
+h2(#combine-two-or-more-collections). Combine two or more collections
 
 Note, if two collections have files of the same name, the contents will be concatenated in the resulting manifest.
 
@@ -185,7 +207,7 @@ Note, if two collections have files of the same name, the contents will be conca
 import arvados
 import arvados.collection
 api = arvados.api()
-project_uuid = "zzzzz-tpzed-zzzzzzzzzzzzzzz"
+project_uuid = "zzzzz-j7d0g-zzzzzzzzzzzzzzz"
 collection_uuids = ["zzzzz-4zz18-aaaaaaaaaaaaaaa", "zzzzz-4zz18-bbbbbbbbbbbbbbb"]
 combined_manifest = ""
 for u in collection_uuids:
@@ -195,7 +217,7 @@ newcol = arvados.collection.Collection(combined_manifest)
 newcol.save_new(name="My combined collection", owner_uuid=project_uuid)
 {% endcodeblock %}
 
-h2. Upload a file into a new collection
+h2(#upload-a-file-into-a-new-collection). Upload a file into a new collection
 
 {% codeblock as python %}
 import arvados
@@ -217,7 +239,7 @@ c.save_new(name=collection_name, owner_uuid=project_uuid)
 print("Saved %s to %s" % (collection_name, c.manifest_locator()))
 {% endcodeblock %}
 
-h2. Download a file from a collection
+h2(#download-a-file-from-a-collection). Download a file from a collection
 
 {% codeblock as python %}
 import arvados
@@ -237,7 +259,7 @@ with c.open(filename, "rb") as reader:
 print("Finished downloading %s" % filename)
 {% endcodeblock %}
 
-h2. Copy files from a collection to a new collection
+h2(#copy-files-from-a-collection-to-a-new-collection). Copy files from a collection to a new collection
 
 {% codeblock as python %}
 import arvados.collection
@@ -258,7 +280,7 @@ target.save_new(name=target_name, owner_uuid=target_project)
 print("Created collection %s" % target.manifest_locator())
 {% endcodeblock %}
 
-h2. Copy files from a collection to another collection
+h2(#copy-files-from-a-collection-to-another-collection). Copy files from a collection to another collection
 
 {% codeblock as python %}
 import arvados.collection
@@ -277,7 +299,7 @@ for f in files_to_copy:
 target.save()
 {% endcodeblock %}
 
-h2. Delete a file from an existing collection
+h2(#delete-a-file-from-an-existing-collection). Delete a file from an existing collection
 
 {% codeblock as python %}
 import arvados
@@ -287,7 +309,7 @@ c.remove("file2.txt")
 c.save()
 {% endcodeblock %}
 
-h2. Listing records with paging
+h2(#listing-records-with-paging). Listing records with paging
 
 Use the @arvados.util.keyset_list_all@ helper method to iterate over all the records matching an optional filter.  This method handles paging internally and returns results incrementally using a Python iterator.  The first parameter is the @list@ method of an Arvados resource (@collections@, @container_requests@, etc).
 
@@ -299,7 +321,7 @@ for c in arvados.util.keyset_list_all(api.collections().list, filters=[["name",
     print("got collection " + c["uuid"])
 {% endcodeblock %}
 
-h2. Querying the vocabulary definition
+h2(#querying-the-vocabulary-definition). Querying the vocabulary definition
 
 The Python SDK provides facilities to interact with the "active metadata vocabulary":{{ site.baseurl }}/admin/metadata-vocabulary.html in the system. The developer can do key and value lookups in a case-insensitive manner:
 
@@ -319,7 +341,7 @@ voc['size']['Small'].identifier
 # Example output: 'IDVALSIZES2'
 {% endcodeblock %}
 
-h2. Translating between vocabulary identifiers and labels
+h2(#translating-between-vocabulary-identifiers-and-labels). Translating between vocabulary identifiers and labels
 
 Client software might need to present properties to the user in a human-readable form, or take input from the user without requiring them to remember identifiers. For these cases, there are a couple of conversion methods that take a dictionary as input:
 
@@ -331,4 +353,22 @@ voc.convert_to_labels({'IDTAGIMPORTANCES': 'IDVALIMPORTANCES1'})
 # Example output: {'Importance': 'Critical'}
 voc.convert_to_identifiers({'creature': 'elephant'})
 # Example output: {'IDTAGANIMALS': 'IDVALANIMALS3'}
-{% endcodeblock %}
\ No newline at end of file
+{% endcodeblock %}
+
+h2(#create-a-project). Create a Project
+
+{% codeblock as python %}
+import arvados
+
+parent_project_uuid = "zzzzz-j7d0g-zzzzzzzzzzzzzzz"
+project_name = "My project"
+
+g = arvados.api().groups().create(body={
+  "group": {
+    "group_class": "project",
+    "owner_uuid": parent_project_uuid,
+    "name": project_name,
+  }}).execute()
+
+print("New project uuid is", g["uuid"])
+{% endcodeblock %}
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index b23c6a12745088fc02c65cb670fc0a1936e30a45..816d0f99e2f685285f79de86c3952e6b88a4352f 100644
@@ -373,6 +373,18 @@ Clusters:
       # cluster.
       RoleGroupsVisibleToAll: true
 
+      # During each period, a log entry with event_type="activity"
+      # will be recorded for each user who is active during that
+      # period. The object_uuid attribute will indicate the user's
+      # UUID.
+      #
+      # Multiple log entries for the same user may be generated during
+      # a period if there are multiple controller processes or a
+      # controller process is restarted.
+      #
+      # Use 0 to disable activity logging.
+      ActivityLoggingPeriod: 24h
+
     AuditLogs:
       # Time to keep audit logs, in seconds. (An audit log is a row added
       # to the "logs" table in the PostgreSQL database each time an
@@ -611,21 +623,17 @@ Clusters:
         # Time to cache manifests, permission checks, and sessions.
         TTL: 300s
 
-        # Time to cache collection state.
-        UUIDTTL: 5s
-
         # Block cache entries. Each block consumes up to 64 MiB RAM.
         MaxBlockEntries: 20
 
-        # Collection cache entries.
-        MaxCollectionEntries: 1000
-
-        # Approximate memory limit (in bytes) for collection cache.
+        # Approximate memory limit (in bytes) for session cache.
+        #
+        # Note this applies to the in-memory representation of
+        # projects and collections -- metadata, block locators,
+        # filenames, etc. -- excluding cached file content, which is
+        # limited by MaxBlockEntries.
         MaxCollectionBytes: 100000000
 
-        # UUID cache entries.
-        MaxUUIDEntries: 1000
-
         # Persistent sessions.
         MaxSessions: 100
 
diff --git a/lib/config/deprecated.go b/lib/config/deprecated.go
index c0a7921b36fdef66112591b18d450d11383afb50..d5c09d67061115a44cb5c8b5ef2a195fa7652734 100644
@@ -494,18 +494,9 @@ func (ldr *Loader) loadOldKeepWebConfig(cfg *arvados.Config) error {
        if oc.Cache.TTL != nil {
                cluster.Collections.WebDAVCache.TTL = *oc.Cache.TTL
        }
-       if oc.Cache.UUIDTTL != nil {
-               cluster.Collections.WebDAVCache.UUIDTTL = *oc.Cache.UUIDTTL
-       }
-       if oc.Cache.MaxCollectionEntries != nil {
-               cluster.Collections.WebDAVCache.MaxCollectionEntries = *oc.Cache.MaxCollectionEntries
-       }
        if oc.Cache.MaxCollectionBytes != nil {
                cluster.Collections.WebDAVCache.MaxCollectionBytes = *oc.Cache.MaxCollectionBytes
        }
-       if oc.Cache.MaxUUIDEntries != nil {
-               cluster.Collections.WebDAVCache.MaxUUIDEntries = *oc.Cache.MaxUUIDEntries
-       }
        if oc.AnonymousTokens != nil {
                if len(*oc.AnonymousTokens) > 0 {
                        cluster.Users.AnonymousUserToken = (*oc.AnonymousTokens)[0]
diff --git a/lib/config/deprecated_test.go b/lib/config/deprecated_test.go
index 4206ef57717eebc494cd3593bdf2551cf2956178..f9b1d1661b1f3c16b745c7e7c6e9304f060e2c64 100644
@@ -199,10 +199,7 @@ func (s *LoadSuite) TestLegacyKeepWebConfig(c *check.C) {
        c.Check(cluster.SystemRootToken, check.Equals, "abcdefg")
 
        c.Check(cluster.Collections.WebDAVCache.TTL, check.Equals, arvados.Duration(60*time.Second))
-       c.Check(cluster.Collections.WebDAVCache.UUIDTTL, check.Equals, arvados.Duration(time.Second))
-       c.Check(cluster.Collections.WebDAVCache.MaxCollectionEntries, check.Equals, 42)
        c.Check(cluster.Collections.WebDAVCache.MaxCollectionBytes, check.Equals, int64(1234567890))
-       c.Check(cluster.Collections.WebDAVCache.MaxUUIDEntries, check.Equals, 100)
 
        c.Check(cluster.Services.WebDAVDownload.ExternalURL, check.Equals, arvados.URL{Host: "download.example.com", Path: "/"})
        c.Check(cluster.Services.WebDAVDownload.InternalURLs[arvados.URL{Host: ":80"}], check.NotNil)
diff --git a/lib/config/export.go b/lib/config/export.go
index a55295d1268b7cc0829ec2fe5073eb276c5b4564..fb17a45c84a82fe4568777065fe23684c49a4603 100644
@@ -226,6 +226,7 @@ var whitelist = map[string]bool{
        "TLS.Key":                                             false,
        "Users":                                               true,
        "Users.ActivatedUsersAreVisibleToOthers":              false,
+       "Users.ActivityLoggingPeriod":                         false,
        "Users.AdminNotifierEmailFrom":                        false,
        "Users.AnonymousUserToken":                            true,
        "Users.AutoAdminFirstUser":                            false,
diff --git a/lib/controller/federation/conn.go b/lib/controller/federation/conn.go
index ffb150bf26aa148b511f4bbde98305469ffef5df..89f68a5ef1848aab0579ace235a60c92a3c05879 100644
@@ -276,6 +276,9 @@ func (conn *Conn) CollectionGet(ctx context.Context, options arvados.GetOptions)
                }
                return c, err
        }
+       if len(options.UUID) < 34 || options.UUID[32] != '+' {
+               return arvados.Collection{}, httpErrorf(http.StatusNotFound, "invalid UUID or PDH %q", options.UUID)
+       }
        // UUID is a PDH
        first := make(chan arvados.Collection, 1)
        err := conn.tryLocalThenRemotes(ctx, options.ForwardedFor, func(ctx context.Context, remoteID string, be backend) error {
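The check added above relies on the shape of a portable data hash: 32 lowercase hex digits, a '+', and a decimal size, so index 32 of any valid PDH is '+'. A hypothetical stricter validator, for illustration only (not part of this change; assumes "regexp" is imported):

// pdhPattern matches the 32-hex-digits+size form of a portable data hash.
var pdhPattern = regexp.MustCompile(`^[0-9a-f]{32}\+[0-9]+$`)

// looksLikePDH reports whether s has the shape of a portable data hash.
func looksLikePDH(s string) bool {
	return pdhPattern.MatchString(s)
}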
diff --git a/lib/controller/handler.go b/lib/controller/handler.go
index 665fd5c636372fc4a21bd7de68c5d886aafbcc7c..e9c56db4d4b112b906dbaf36dd21b9a7a1300d98 100644
@@ -101,7 +101,10 @@ func (h *Handler) setup() {
        h.federation = federation.New(h.Cluster, &healthFuncs)
        rtr := router.New(h.federation, router.Config{
                MaxRequestSize: h.Cluster.API.MaxRequestSize,
-               WrapCalls:      api.ComposeWrappers(ctrlctx.WrapCallsInTransactions(h.db), oidcAuthorizer.WrapCalls),
+               WrapCalls: api.ComposeWrappers(
+                       ctrlctx.WrapCallsInTransactions(h.db),
+                       oidcAuthorizer.WrapCalls,
+                       ctrlctx.WrapCallsWithAuth(h.Cluster)),
        })
 
        healthRoutes := health.Routes{"ping": func() error { _, err := h.db(context.TODO()); return err }}
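WrapCallsWithAuth is composed after the transaction wrapper because the token lookup it enables runs SQL through the current transaction (see lib/ctrlctx/auth.go below). A hedged Go sketch of the same composition outside the router, mirroring the pattern used in the tests in this commit:

func example(cluster *arvados.Cluster, getdb func(context.Context) (*sqlx.DB, error)) api.RoutableFunc {
	// Transactions must wrap auth: authcache.lookup reads
	// api_client_authorizations via ctrlctx.CurrentTx.
	wrap := api.ComposeWrappers(
		ctrlctx.WrapCallsInTransactions(getdb),
		ctrlctx.WrapCallsWithAuth(cluster),
	)
	return wrap(func(ctx context.Context, opts interface{}) (interface{}, error) {
		user, aca, err := ctrlctx.CurrentAuth(ctx)
		if err != nil {
			return nil, err
		}
		fmt.Println(user.UUID, aca.UUID)
		return user, nil
	})
}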
diff --git a/lib/controller/handler_test.go b/lib/controller/handler_test.go
index 39c2b1c68e5c82921e10bc9125d54e17846a8fed..127e6c34c6238ca48487f5cbb72ca1107bfed7da 100644
@@ -19,6 +19,7 @@ import (
        "testing"
        "time"
 
+       "git.arvados.org/arvados.git/lib/controller/rpc"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
        "git.arvados.org/arvados.git/sdk/go/auth"
@@ -494,3 +495,38 @@ func (s *HandlerSuite) TestTrashSweep(c *check.C) {
                time.Sleep(time.Second / 10)
        }
 }
+
+func (s *HandlerSuite) TestLogActivity(c *check.C) {
+       s.cluster.SystemRootToken = arvadostest.SystemRootToken
+       s.cluster.Users.ActivityLoggingPeriod = arvados.Duration(24 * time.Hour)
+       s.handler.CheckHealth()
+
+       testServer := newServerFromIntegrationTestEnv(c)
+       testServer.Server.Handler = httpserver.AddRequestIDs(httpserver.LogRequests(s.handler))
+       c.Assert(testServer.Start(), check.IsNil)
+       defer testServer.Close()
+
+       u, _ := url.Parse("http://" + testServer.Addr)
+       client := rpc.NewConn(s.cluster.ClusterID, u, true, rpc.PassthroughTokenProvider)
+
+       starttime := time.Now()
+       for i := 0; i < 4; i++ {
+               for _, token := range []string{
+                       arvadostest.ActiveTokenV2,
+                       arvadostest.ActiveToken,
+                       arvadostest.SpectatorToken,
+               } {
+                       ctx := auth.NewContext(s.ctx, &auth.Credentials{Tokens: []string{token}})
+                       _, err := client.CollectionList(ctx, arvados.ListOptions{})
+                       c.Assert(err, check.IsNil)
+               }
+       }
+       db, err := s.handler.db(s.ctx)
+       c.Assert(err, check.IsNil)
+       for _, userUUID := range []string{arvadostest.ActiveUserUUID, arvadostest.SpectatorUserUUID} {
+               var rows int
+               err = db.QueryRowContext(s.ctx, `select count(uuid) from logs where object_uuid = $1 and event_at > $2`, userUUID, starttime.UTC()).Scan(&rows)
+               c.Assert(err, check.IsNil)
+               c.Check(rows, check.Equals, 1, check.Commentf("expect 1 row for user uuid %s", userUUID))
+       }
+}
diff --git a/lib/controller/localdb/collection.go b/lib/controller/localdb/collection.go
index 868e466e9e281bf7f4f5eaf8b4f7a530956653cf..581595e5e3818a56b4194adc47834e87035a3ce8 100644
@@ -22,6 +22,7 @@ import (
 // CollectionGet defers to railsProxy for everything except blob
 // signatures.
 func (conn *Conn) CollectionGet(ctx context.Context, opts arvados.GetOptions) (arvados.Collection, error) {
+       conn.logActivity(ctx)
        if len(opts.Select) > 0 {
                // We need to know IsTrashed and TrashAt to implement
                // signing properly, even if the caller doesn't want
@@ -39,6 +40,7 @@ func (conn *Conn) CollectionGet(ctx context.Context, opts arvados.GetOptions) (a
 // CollectionList defers to railsProxy for everything except blob
 // signatures.
 func (conn *Conn) CollectionList(ctx context.Context, opts arvados.ListOptions) (arvados.CollectionList, error) {
+       conn.logActivity(ctx)
        if len(opts.Select) > 0 {
                // We need to know IsTrashed and TrashAt to implement
                // signing properly, even if the caller doesn't want
@@ -58,6 +60,7 @@ func (conn *Conn) CollectionList(ctx context.Context, opts arvados.ListOptions)
 // CollectionCreate defers to railsProxy for everything except blob
 // signatures and vocabulary checking.
 func (conn *Conn) CollectionCreate(ctx context.Context, opts arvados.CreateOptions) (arvados.Collection, error) {
+       conn.logActivity(ctx)
        err := conn.checkProperties(ctx, opts.Attrs["properties"])
        if err != nil {
                return arvados.Collection{}, err
@@ -82,6 +85,7 @@ func (conn *Conn) CollectionCreate(ctx context.Context, opts arvados.CreateOptio
 // CollectionUpdate defers to railsProxy for everything except blob
 // signatures and vocabulary checking.
 func (conn *Conn) CollectionUpdate(ctx context.Context, opts arvados.UpdateOptions) (arvados.Collection, error) {
+       conn.logActivity(ctx)
        err := conn.checkProperties(ctx, opts.Attrs["properties"])
        if err != nil {
                return arvados.Collection{}, err
diff --git a/lib/controller/localdb/conn.go b/lib/controller/localdb/conn.go
index a36822ad6b1f5df1f73ffbc3536d76a7215f1817..5a3faa72790899aa69ba4408fbf47df7997c27af 100644
@@ -10,7 +10,6 @@ import (
        "fmt"
        "net/http"
        "os"
-       "strings"
        "sync"
        "time"
 
@@ -33,8 +32,11 @@ type Conn struct {
        lastVocabularyRefreshCheck time.Time
        lastVocabularyError        error
        loginController
-       gwTunnels     map[string]*yamux.Session
-       gwTunnelsLock sync.Mutex
+       gwTunnels        map[string]*yamux.Session
+       gwTunnelsLock    sync.Mutex
+       activeUsers      map[string]bool
+       activeUsersLock  sync.Mutex
+       activeUsersReset time.Time
 }
 
 func NewConn(cluster *arvados.Cluster) *Conn {
@@ -163,54 +165,6 @@ func (conn *Conn) UserAuthenticate(ctx context.Context, opts arvados.UserAuthent
        return conn.loginController.UserAuthenticate(ctx, opts)
 }
 
-func (conn *Conn) GroupContents(ctx context.Context, options arvados.GroupContentsOptions) (arvados.ObjectList, error) {
-       // The requested UUID can be a user (virtual home project), which we just pass on to
-       // the API server.
-       if strings.Index(options.UUID, "-j7d0g-") != 5 {
-               return conn.railsProxy.GroupContents(ctx, options)
-       }
-
-       var resp arvados.ObjectList
-
-       // Get the group object
-       respGroup, err := conn.GroupGet(ctx, arvados.GetOptions{UUID: options.UUID})
-       if err != nil {
-               return resp, err
-       }
-
-       // If the group has groupClass 'filter', apply the filters before getting the contents.
-       if respGroup.GroupClass == "filter" {
-               if filters, ok := respGroup.Properties["filters"].([]interface{}); ok {
-                       for _, f := range filters {
-                               // f is supposed to be a []string
-                               tmp, ok2 := f.([]interface{})
-                               if !ok2 || len(tmp) < 3 {
-                                       return resp, fmt.Errorf("filter unparsable: %T, %+v, original field: %T, %+v\n", tmp, tmp, f, f)
-                               }
-                               var filter arvados.Filter
-                               if attr, ok2 := tmp[0].(string); ok2 {
-                                       filter.Attr = attr
-                               } else {
-                                       return resp, fmt.Errorf("filter unparsable: attribute must be string: %T, %+v, filter: %T, %+v\n", tmp[0], tmp[0], f, f)
-                               }
-                               if operator, ok2 := tmp[1].(string); ok2 {
-                                       filter.Operator = operator
-                               } else {
-                                       return resp, fmt.Errorf("filter unparsable: operator must be string: %T, %+v, filter: %T, %+v\n", tmp[1], tmp[1], f, f)
-                               }
-                               filter.Operand = tmp[2]
-                               options.Filters = append(options.Filters, filter)
-                       }
-               } else {
-                       return resp, fmt.Errorf("filter unparsable: not an array\n")
-               }
-               // Use the generic /groups/contents endpoint for filter groups
-               options.UUID = ""
-       }
-
-       return conn.railsProxy.GroupContents(ctx, options)
-}
-
 func httpErrorf(code int, format string, args ...interface{}) error {
        return httpserver.ErrorWithStatus(fmt.Errorf(format, args...), code)
 }
diff --git a/lib/controller/localdb/container_request.go b/lib/controller/localdb/container_request.go
index 5b2ce95da99c2c8195ce2ceac6aaa925a75d2e0f..49e21840ea206f69684738e2f9aec98f0f6c2fd3 100644
@@ -13,6 +13,7 @@ import (
 // ContainerRequestCreate defers to railsProxy for everything except
 // vocabulary checking.
 func (conn *Conn) ContainerRequestCreate(ctx context.Context, opts arvados.CreateOptions) (arvados.ContainerRequest, error) {
+       conn.logActivity(ctx)
        err := conn.checkProperties(ctx, opts.Attrs["properties"])
        if err != nil {
                return arvados.ContainerRequest{}, err
@@ -27,6 +28,7 @@ func (conn *Conn) ContainerRequestCreate(ctx context.Context, opts arvados.Creat
 // ContainerRequestUpdate defers to railsProxy for everything except
 // vocabulary checking.
 func (conn *Conn) ContainerRequestUpdate(ctx context.Context, opts arvados.UpdateOptions) (arvados.ContainerRequest, error) {
+       conn.logActivity(ctx)
        err := conn.checkProperties(ctx, opts.Attrs["properties"])
        if err != nil {
                return arvados.ContainerRequest{}, err
@@ -37,3 +39,18 @@ func (conn *Conn) ContainerRequestUpdate(ctx context.Context, opts arvados.Updat
        }
        return resp, nil
 }
+
+func (conn *Conn) ContainerRequestGet(ctx context.Context, opts arvados.GetOptions) (arvados.ContainerRequest, error) {
+       conn.logActivity(ctx)
+       return conn.railsProxy.ContainerRequestGet(ctx, opts)
+}
+
+func (conn *Conn) ContainerRequestList(ctx context.Context, opts arvados.ListOptions) (arvados.ContainerRequestList, error) {
+       conn.logActivity(ctx)
+       return conn.railsProxy.ContainerRequestList(ctx, opts)
+}
+
+func (conn *Conn) ContainerRequestDelete(ctx context.Context, opts arvados.DeleteOptions) (arvados.ContainerRequest, error) {
+       conn.logActivity(ctx)
+       return conn.railsProxy.ContainerRequestDelete(ctx, opts)
+}
diff --git a/lib/controller/localdb/group.go b/lib/controller/localdb/group.go
index 0d77bdbd9ce792a28ba436ae05ba95bc7d1ce0f0..418fd6b8b7ec5bcc955c62c0d1207b66284d587b 100644
@@ -6,6 +6,8 @@ package localdb
 
 import (
        "context"
+       "fmt"
+       "strings"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
 )
@@ -13,6 +15,7 @@ import (
 // GroupCreate defers to railsProxy for everything except vocabulary
 // checking.
 func (conn *Conn) GroupCreate(ctx context.Context, opts arvados.CreateOptions) (arvados.Group, error) {
+       conn.logActivity(ctx)
        err := conn.checkProperties(ctx, opts.Attrs["properties"])
        if err != nil {
                return arvados.Group{}, err
@@ -24,9 +27,15 @@ func (conn *Conn) GroupCreate(ctx context.Context, opts arvados.CreateOptions) (
        return resp, nil
 }
 
+func (conn *Conn) GroupGet(ctx context.Context, opts arvados.GetOptions) (arvados.Group, error) {
+       conn.logActivity(ctx)
+       return conn.railsProxy.GroupGet(ctx, opts)
+}
+
 // GroupUpdate defers to railsProxy for everything except vocabulary
 // checking.
 func (conn *Conn) GroupUpdate(ctx context.Context, opts arvados.UpdateOptions) (arvados.Group, error) {
+       conn.logActivity(ctx)
        err := conn.checkProperties(ctx, opts.Attrs["properties"])
        if err != nil {
                return arvados.Group{}, err
@@ -37,3 +46,63 @@ func (conn *Conn) GroupUpdate(ctx context.Context, opts arvados.UpdateOptions) (
        }
        return resp, nil
 }
+
+func (conn *Conn) GroupList(ctx context.Context, opts arvados.ListOptions) (arvados.GroupList, error) {
+       conn.logActivity(ctx)
+       return conn.railsProxy.GroupList(ctx, opts)
+}
+
+func (conn *Conn) GroupDelete(ctx context.Context, opts arvados.DeleteOptions) (arvados.Group, error) {
+       conn.logActivity(ctx)
+       return conn.railsProxy.GroupDelete(ctx, opts)
+}
+
+func (conn *Conn) GroupContents(ctx context.Context, options arvados.GroupContentsOptions) (arvados.ObjectList, error) {
+       conn.logActivity(ctx)
+
+       // The requested UUID can be a user (virtual home project), which we just pass on to
+       // the API server.
+       if strings.Index(options.UUID, "-j7d0g-") != 5 {
+               return conn.railsProxy.GroupContents(ctx, options)
+       }
+
+       var resp arvados.ObjectList
+
+       // Get the group object
+       respGroup, err := conn.GroupGet(ctx, arvados.GetOptions{UUID: options.UUID})
+       if err != nil {
+               return resp, err
+       }
+
+       // If the group has groupClass 'filter', apply the filters before getting the contents.
+       if respGroup.GroupClass == "filter" {
+               if filters, ok := respGroup.Properties["filters"].([]interface{}); ok {
+                       for _, f := range filters {
+                               // f is supposed to be a []string
+                               tmp, ok2 := f.([]interface{})
+                               if !ok2 || len(tmp) < 3 {
+                                       return resp, fmt.Errorf("filter unparsable: %T, %+v, original field: %T, %+v\n", tmp, tmp, f, f)
+                               }
+                               var filter arvados.Filter
+                               if attr, ok2 := tmp[0].(string); ok2 {
+                                       filter.Attr = attr
+                               } else {
+                                       return resp, fmt.Errorf("filter unparsable: attribute must be string: %T, %+v, filter: %T, %+v\n", tmp[0], tmp[0], f, f)
+                               }
+                               if operator, ok2 := tmp[1].(string); ok2 {
+                                       filter.Operator = operator
+                               } else {
+                                       return resp, fmt.Errorf("filter unparsable: operator must be string: %T, %+v, filter: %T, %+v\n", tmp[1], tmp[1], f, f)
+                               }
+                               filter.Operand = tmp[2]
+                               options.Filters = append(options.Filters, filter)
+                       }
+               } else {
+                       return resp, fmt.Errorf("filter unparsable: not an array\n")
+               }
+               // Use the generic /groups/contents endpoint for filter groups
+               options.UUID = ""
+       }
+
+       return conn.railsProxy.GroupContents(ctx, options)
+}
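For reference, a hypothetical sketch (not part of the change) of the shape the group's "filters" property must decode to in order to pass the checks in GroupContents above:

var filterGroupProps = map[string]interface{}{
	"filters": []interface{}{
		// each entry decodes as [attribute string, operator string, operand]
		[]interface{}{"collection.name", "like", "sample%"},
		[]interface{}{"uuid", "is_a", "arvados#collection"},
	},
}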
diff --git a/lib/controller/localdb/log_activity.go b/lib/controller/localdb/log_activity.go
new file mode 100644
index 0000000..9c9660a
--- /dev/null
@@ -0,0 +1,117 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package localdb
+
+import (
+       "context"
+       "time"
+
+       "git.arvados.org/arvados.git/lib/ctrlctx"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+)
+
+func (conn *Conn) logActivity(ctx context.Context) {
+       p := conn.cluster.Users.ActivityLoggingPeriod.Duration()
+       if p < 1 {
+               ctxlog.FromContext(ctx).Debug("logActivity disabled by config")
+               return
+       }
+       user, _, err := ctrlctx.CurrentAuth(ctx)
+       if err == ctrlctx.ErrUnauthenticated {
+               ctxlog.FromContext(ctx).Debug("logActivity skipped for unauthenticated request")
+               return
+       } else if err != nil {
+               ctxlog.FromContext(ctx).WithError(err).Error("logActivity CurrentAuth failed")
+               return
+       }
+       now := time.Now()
+       conn.activeUsersLock.Lock()
+       if conn.activeUsers == nil || conn.activeUsersReset.IsZero() || conn.activeUsersReset.Before(now) {
+               conn.activeUsersReset = alignedPeriod(now, p)
+               conn.activeUsers = map[string]bool{}
+       }
+       logged := conn.activeUsers[user.UUID]
+       if !logged {
+               // Prevent other concurrent calls from logging about
+               // this user until we finish.
+               conn.activeUsers[user.UUID] = true
+       }
+       conn.activeUsersLock.Unlock()
+       if logged {
+               return
+       }
+       defer func() {
+               // If we return without logging, reset the flag so we
+               // try again on the user's next API call.
+               if !logged {
+                       conn.activeUsersLock.Lock()
+                       conn.activeUsers[user.UUID] = false
+                       conn.activeUsersLock.Unlock()
+               }
+       }()
+
+       tx, err := ctrlctx.NewTx(ctx)
+       if err != nil {
+               ctxlog.FromContext(ctx).WithError(err).Error("logActivity NewTx failed")
+               return
+       }
+       defer tx.Rollback()
+       _, err = tx.ExecContext(ctx, `
+insert into logs
+ (uuid,
+  owner_uuid, modified_by_user_uuid, object_owner_uuid,
+  event_type,
+  summary,
+  object_uuid,
+  properties,
+  event_at, created_at, updated_at, modified_at)
+ values
+ ($1, $2, $2, $2, $3, $4, $5, $6,
+  current_timestamp at time zone 'UTC',
+  current_timestamp at time zone 'UTC',
+  current_timestamp at time zone 'UTC',
+  current_timestamp at time zone 'UTC')
+ returning id`,
+               arvados.RandomUUID(conn.cluster.ClusterID, "57u5n"),
+               conn.cluster.ClusterID+"-tpzed-000000000000000", // owner_uuid, modified_by_user_uuid, and object_owner_uuid (system root user)
+               "activity",
+               "activity of "+user.UUID,
+               user.UUID,
+               "{}")
+       if err != nil {
+               ctxlog.FromContext(ctx).WithError(err).Error("logActivity query failed")
+               return
+       }
+       err = tx.Commit()
+       if err != nil {
+               ctxlog.FromContext(ctx).WithError(err).Error("logActivity commit failed")
+               return
+       }
+       logged = true
+}
+
+// alignedPeriod computes a time interval that includes now and aligns
+// to local clock times that are multiples of p. For example, if local
+// time is UTC-5 and ActivityLoggingPeriod=4h, periodStart and
+// periodEnd will be 0000-0400, 0400-0800, etc., in local time. If p
+// is a multiple of 24h, periods will start and end at midnight.
+//
+// If DST starts or ends during this period, the boundaries will be
+// aligned based on either DST or non-DST time depending on whether
+// now is before or after the DST transition. The effect is presumed
+// to be harmless, e.g., logActivity may at worst log activity more
+// than once in a period that includes a DST transition.
+//
+// In all cases, the period ends in the future.
+//
+// Only the end of the period is returned.
+func alignedPeriod(now time.Time, p time.Duration) time.Time {
+       _, tzsec := now.Zone()
+       tzoff := time.Duration(tzsec) * time.Second
+       periodStart := now.Add(tzoff).Truncate(p).Add(-tzoff)
+       return periodStart.Add(p)
+}
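A standalone, runnable sketch of the alignment logic above (alignedEnd is a hypothetical copy of alignedPeriod, renamed so it can live outside the package):

package main

import (
	"fmt"
	"time"
)

// alignedEnd mirrors alignedPeriod: shift to local clock time,
// truncate to a multiple of p, shift back, then add p.
func alignedEnd(now time.Time, p time.Duration) time.Time {
	_, tzsec := now.Zone()
	tzoff := time.Duration(tzsec) * time.Second
	return now.Add(tzoff).Truncate(p).Add(-tzoff).Add(p)
}

func main() {
	loc, err := time.LoadLocation("America/Toronto")
	if err != nil {
		panic(err)
	}
	now := time.Date(2022, 10, 13, 9, 30, 0, 0, loc) // 09:30 EDT
	fmt.Println(alignedEnd(now, 4*time.Hour))        // 2022-10-13 12:00:00 -0400 EDT
}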
diff --git a/lib/controller/localdb/log_activity_test.go b/lib/controller/localdb/log_activity_test.go
new file mode 100644 (file)
index 0000000..ea7f234
--- /dev/null
@@ -0,0 +1,87 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package localdb
+
+import (
+       "context"
+       "database/sql"
+       "time"
+
+       "git.arvados.org/arvados.git/lib/controller/api"
+       "git.arvados.org/arvados.git/lib/ctrlctx"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadostest"
+       "git.arvados.org/arvados.git/sdk/go/auth"
+       "github.com/jmoiron/sqlx"
+       check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&activityPeriodSuite{})
+
+type activityPeriodSuite struct{}
+
+// The important thing is that, even when daylight savings time is
+// making things difficult, the current period ends in the future.
+func (*activityPeriodSuite) TestPeriod(c *check.C) {
+       toronto, err := time.LoadLocation("America/Toronto")
+       c.Assert(err, check.IsNil)
+
+       format := "2006-01-02 15:04:05 MST"
+       dststartday, err := time.ParseInLocation(format, "2022-03-13 00:00:00 EST", toronto)
+       c.Assert(err, check.IsNil)
+       dstendday, err := time.ParseInLocation(format, "2022-11-06 00:00:00 EDT", toronto)
+       c.Assert(err, check.IsNil)
+
+       for _, period := range []time.Duration{
+               time.Minute * 13,
+               time.Minute * 49,
+               time.Hour,
+               4 * time.Hour,
+               48 * time.Hour,
+       } {
+               for offset := time.Duration(0); offset < 48*time.Hour; offset += 3 * time.Minute {
+                       t := dststartday.Add(offset)
+                       end := alignedPeriod(t, period)
+                       c.Check(end.After(t), check.Equals, true, check.Commentf("period %v offset %v", period, offset))
+
+                       t = dstendday.Add(offset)
+                       end = alignedPeriod(t, period)
+                       c.Check(end.After(t), check.Equals, true, check.Commentf("period %v offset %v", period, offset))
+               }
+       }
+}
+
+func (s *CollectionSuite) TestLogActivity(c *check.C) {
+       starttime := time.Now()
+       s.localdb.activeUsersLock.Lock()
+       s.localdb.activeUsersReset = starttime
+       s.localdb.activeUsersLock.Unlock()
+       db := arvadostest.DB(c, s.cluster)
+       wrap := api.ComposeWrappers(
+               ctrlctx.WrapCallsInTransactions(func(ctx context.Context) (*sqlx.DB, error) { return db, nil }),
+               ctrlctx.WrapCallsWithAuth(s.cluster))
+       collectionCreate := wrap(func(ctx context.Context, opts interface{}) (interface{}, error) {
+               return s.localdb.CollectionCreate(ctx, opts.(arvados.CreateOptions))
+       })
+       ctx := auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{arvadostest.ActiveTokenV2}})
+       for i := 0; i < 2; i++ {
+               logthreshold := time.Now()
+               _, err := collectionCreate(ctx, arvados.CreateOptions{
+                       Attrs: map[string]interface{}{
+                               "name": "test collection",
+                       },
+                       EnsureUniqueName: true,
+               })
+               c.Assert(err, check.IsNil)
+               var uuid string
+               err = db.QueryRowContext(ctx, `select uuid from logs where object_uuid = $1 and event_at > $2`, arvadostest.ActiveUserUUID, logthreshold.UTC()).Scan(&uuid)
+               if i == 0 {
+                       c.Check(err, check.IsNil)
+                       c.Check(uuid, check.HasLen, 27)
+               } else {
+                       c.Check(err, check.Equals, sql.ErrNoRows)
+               }
+       }
+}
diff --git a/lib/ctrlctx/auth.go b/lib/ctrlctx/auth.go
new file mode 100644 (file)
index 0000000..f4c472f
--- /dev/null
@@ -0,0 +1,190 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package ctrlctx
+
+import (
+       "context"
+       "crypto/hmac"
+       "crypto/sha256"
+       "database/sql"
+       "errors"
+       "fmt"
+       "io"
+       "strings"
+       "sync"
+       "time"
+
+       "git.arvados.org/arvados.git/lib/controller/api"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/auth"
+       "github.com/ghodss/yaml"
+)
+
+var (
+       ErrNoAuthContext   = errors.New("bug: there is no authorization in this context")
+       ErrUnauthenticated = errors.New("unauthenticated request")
+)
+
+// WrapCallsWithAuth returns a call wrapper (suitable for assigning to
+// router.Config.WrapCalls) that makes CurrentAuth(ctx) work from
+// inside the wrapped functions.
+//
+// The incoming context must come from WrapCallsInTransactions or
+// NewWithTransaction.
+func WrapCallsWithAuth(cluster *arvados.Cluster) func(api.RoutableFunc) api.RoutableFunc {
+       var authcache authcache
+       return func(origFunc api.RoutableFunc) api.RoutableFunc {
+               return func(ctx context.Context, opts interface{}) (_ interface{}, err error) {
+                       var tokens []string
+                       if creds, ok := auth.FromContext(ctx); ok {
+                               tokens = creds.Tokens
+                       }
+                       return origFunc(context.WithValue(ctx, contextKeyAuth, &authcontext{
+                               authcache: &authcache,
+                               cluster:   cluster,
+                               tokens:    tokens,
+                       }), opts)
+               }
+       }
+}
+
+// CurrentAuth returns the arvados.User whose privileges should be
+// used in the given context, and the arvados.APIClientAuthorization
+// the caller presented in order to authenticate the current request.
+//
+// Returns ErrUnauthenticated if the current request was not
+// authenticated (no token provided, token is expired, etc).
+func CurrentAuth(ctx context.Context) (*arvados.User, *arvados.APIClientAuthorization, error) {
+       ac, ok := ctx.Value(contextKeyAuth).(*authcontext)
+       if !ok {
+               return nil, nil, ErrNoAuthContext
+       }
+       ac.lookupOnce.Do(func() {
+               // We only validate/lookup the token once per API
+               // call, even though authcache should be efficient
+               // enough to do a lookup each time. This guarantees we
+               // always return the same result when called multiple
+               // times in the course of handling a single API call.
+               for _, token := range ac.tokens {
+                       user, aca, err := ac.authcache.lookup(ctx, ac.cluster, token)
+                       if err != nil {
+                               ac.err = err
+                               return
+                       }
+                       if user != nil {
+                               ac.user, ac.apiClientAuthorization = user, aca
+                               return
+                       }
+               }
+               ac.err = ErrUnauthenticated
+       })
+       return ac.user, ac.apiClientAuthorization, ac.err
+}
+
+type contextKeyA string
+
+var contextKeyAuth = contextKeyA("auth")
+
+type authcontext struct {
+       authcache              *authcache
+       cluster                *arvados.Cluster
+       tokens                 []string
+       user                   *arvados.User
+       apiClientAuthorization *arvados.APIClientAuthorization
+       err                    error
+       lookupOnce             sync.Once
+}
+
+var authcacheTTL = time.Minute
+
+type authcacheent struct {
+       expireTime             time.Time
+       apiClientAuthorization arvados.APIClientAuthorization
+       user                   arvados.User
+}
+
+type authcache struct {
+       mtx         sync.Mutex
+       entries     map[string]*authcacheent
+       nextCleanup time.Time
+}
+
+// lookup returns the user and aca info for a given token. Returns nil
+// if the token is not valid. Returns a non-nil error if there was an
+// unexpected error from the database, etc.
+func (ac *authcache) lookup(ctx context.Context, cluster *arvados.Cluster, token string) (*arvados.User, *arvados.APIClientAuthorization, error) {
+       ac.mtx.Lock()
+       ent := ac.entries[token]
+       ac.mtx.Unlock()
+       if ent != nil && ent.expireTime.After(time.Now()) {
+               return &ent.user, &ent.apiClientAuthorization, nil
+       }
+       if token == "" {
+               return nil, nil, nil
+       }
+       tx, err := CurrentTx(ctx)
+       if err != nil {
+               return nil, nil, err
+       }
+       var aca arvados.APIClientAuthorization
+       var user arvados.User
+
+       var cond string
+       var args []interface{}
+       if len(token) > 30 && strings.HasPrefix(token, "v2/") && token[30] == '/' {
+               fields := strings.Split(token, "/")
+               cond = `aca.uuid = $1 and aca.api_token = $2`
+               args = []interface{}{fields[1], fields[2]}
+       } else {
+               // Bare token or OIDC access token
+               mac := hmac.New(sha256.New, []byte(cluster.SystemRootToken))
+               io.WriteString(mac, token)
+               hmac := fmt.Sprintf("%x", mac.Sum(nil))
+               cond = `aca.api_token in ($1, $2)`
+               args = []interface{}{token, hmac}
+       }
+       var expiresAt sql.NullTime
+       var scopesYAML []byte
+       err = tx.QueryRowContext(ctx, `
+select aca.uuid, aca.expires_at, aca.api_token, aca.scopes, users.uuid, users.is_active, users.is_admin
+ from api_client_authorizations aca
+ left join users on aca.user_id = users.id
+ where `+cond+`
+ and (expires_at is null or expires_at > current_timestamp at time zone 'UTC')`, args...).Scan(
+               &aca.UUID, &expiresAt, &aca.APIToken, &scopesYAML,
+               &user.UUID, &user.IsActive, &user.IsAdmin)
+       if err == sql.ErrNoRows {
+               return nil, nil, nil
+       } else if err != nil {
+               return nil, nil, err
+       }
+       aca.ExpiresAt = expiresAt.Time
+       if len(scopesYAML) > 0 {
+               err = yaml.Unmarshal(scopesYAML, &aca.Scopes)
+               if err != nil {
+                       return nil, nil, fmt.Errorf("loading scopes for %s: %w", aca.UUID, err)
+               }
+       }
+       ent = &authcacheent{
+               expireTime:             time.Now().Add(authcacheTTL),
+               apiClientAuthorization: aca,
+               user:                   user,
+       }
+       ac.mtx.Lock()
+       defer ac.mtx.Unlock()
+       if ac.entries == nil {
+               ac.entries = map[string]*authcacheent{}
+       }
+       if ac.nextCleanup.IsZero() || ac.nextCleanup.Before(time.Now()) {
+               for token, ent := range ac.entries {
+                       if !ent.expireTime.After(time.Now()) {
+                               delete(ac.entries, token)
+                       }
+               }
+               ac.nextCleanup = time.Now().Add(authcacheTTL)
+       }
+       ac.entries[token] = ent
+       return &ent.user, &ent.apiClientAuthorization, nil
+}
diff --git a/lib/ctrlctx/auth_test.go b/lib/ctrlctx/auth_test.go
new file mode 100644 (file)
index 0000000..e6803e5
--- /dev/null
@@ -0,0 +1,83 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package ctrlctx
+
+import (
+       "context"
+
+       "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/sdk/go/arvadostest"
+       "git.arvados.org/arvados.git/sdk/go/auth"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/jmoiron/sqlx"
+       _ "github.com/lib/pq"
+       check "gopkg.in/check.v1"
+)
+
+func (*DatabaseSuite) TestAuthContext(c *check.C) {
+       cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+       c.Assert(err, check.IsNil)
+       cluster, err := cfg.GetCluster("")
+       c.Assert(err, check.IsNil)
+
+       getter := func(context.Context) (*sqlx.DB, error) {
+               return sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
+       }
+       authwrapper := WrapCallsWithAuth(cluster)
+       dbwrapper := WrapCallsInTransactions(getter)
+
+       // valid tokens
+       for _, token := range []string{
+               arvadostest.ActiveToken,
+               arvadostest.ActiveTokenV2,
+               arvadostest.ActiveTokenV2 + "/asdfasdfasdf",
+               arvadostest.ActiveTokenV2, // cached
+       } {
+               ok, err := dbwrapper(authwrapper(func(ctx context.Context, opts interface{}) (interface{}, error) {
+                       user, aca, err := CurrentAuth(ctx)
+                       if c.Check(err, check.IsNil) {
+                               c.Check(user.UUID, check.Equals, "zzzzz-tpzed-xurymjxw79nv3jz")
+                               c.Check(aca.UUID, check.Equals, "zzzzz-gj3su-077z32aux8dg2s1")
+                               c.Check(aca.Scopes, check.DeepEquals, []string{"all"})
+                       }
+                       return true, nil
+               }))(auth.NewContext(context.Background(), auth.NewCredentials(token)), "blah")
+               c.Check(ok, check.Equals, true)
+               c.Check(err, check.IsNil)
+       }
+
+       // bad tokens
+       for _, token := range []string{
+               arvadostest.ActiveToken + "X",
+               arvadostest.ActiveTokenV2 + "X",
+               arvadostest.ActiveTokenV2[:30], // "v2/{uuid}"
+               arvadostest.ActiveTokenV2[:31], // "v2/{uuid}/"
+               "bogus",
+               "",
+       } {
+               ok, err := dbwrapper(authwrapper(func(ctx context.Context, opts interface{}) (interface{}, error) {
+                       user, aca, err := CurrentAuth(ctx)
+                       c.Check(err, check.Equals, ErrUnauthenticated)
+                       c.Check(user, check.IsNil)
+                       c.Check(aca, check.IsNil)
+                       return true, err
+               }))(auth.NewContext(context.Background(), auth.NewCredentials(token)), "blah")
+               c.Check(ok, check.Equals, true)
+               c.Check(err, check.Equals, ErrUnauthenticated)
+       }
+
+       // no credentials supplied
+       {
+               ok, err := dbwrapper(authwrapper(func(ctx context.Context, opts interface{}) (interface{}, error) {
+                       user, aca, err := CurrentAuth(ctx)
+                       c.Check(err, check.Equals, ErrUnauthenticated)
+                       c.Check(user, check.IsNil)
+                       c.Check(aca, check.IsNil)
+                       return true, err
+               }))(context.Background(), "blah")
+               c.Check(ok, check.Equals, true)
+               c.Check(err, check.Equals, ErrUnauthenticated)
+       }
+}
index 36d79d3d2ef89ac9819d12e3f4e2f175c96426bd..a76420860604b9a6fb9823bdc6b3775c70f85ff4 100644 (file)
@@ -12,6 +12,7 @@ import (
        "git.arvados.org/arvados.git/lib/controller/api"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "github.com/jmoiron/sqlx"
+
        // sqlx needs lib/pq to talk to PostgreSQL
        _ "github.com/lib/pq"
 )
@@ -107,6 +108,26 @@ func New(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) (co
        }
 }
 
+// NewTx starts a new transaction. The caller is responsible for
+// calling Commit or Rollback. This is suitable for database queries
+// that are separate from the API transaction (see CurrentTx), e.g.,
+// ones that will be committed even if the API call fails, or held
+// open after the API call finishes.
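+//
+// A minimal usage sketch (assuming ctx comes from an API call
+// wrapped by WrapCallsInTransactions):
+//
+//	tx, err := NewTx(ctx)
+//	if err != nil {
+//		return err
+//	}
+//	defer tx.Rollback()
+//	// ...use tx, then:
+//	if err = tx.Commit(); err != nil {
+//		return err
+//	}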
+func NewTx(ctx context.Context) (*sqlx.Tx, error) {
+       txn, ok := ctx.Value(contextKeyTransaction).(*transaction)
+       if !ok {
+               return nil, ErrNoTransaction
+       }
+       db, err := txn.getdb(ctx)
+       if err != nil {
+               return nil, err
+       }
+       return db.Beginx()
+}
+
+// CurrentTx returns a transaction that will be committed after the
+// current API call completes, or rolled back if the current API call
+// returns an error.
 func CurrentTx(ctx context.Context) (*sqlx.Tx, error) {
        txn, ok := ctx.Value(contextKeyTransaction).(*transaction)
        if !ok {
index 799abf9da4e278bc7f2f4150e7f284c991c677c4..3a2ebe0c280bf68f6e7e397e65489c70196f91ae 100644 (file)
@@ -16,12 +16,15 @@ import (
        "net"
        "net/http"
        "net/url"
+       "os"
        "strings"
        "time"
 
        "git.arvados.org/arvados.git/lib/cmd"
+       "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "git.arvados.org/arvados.git/sdk/go/health"
        "github.com/sirupsen/logrus"
 )
 
@@ -125,6 +128,30 @@ func (diag *diagnoser) runtests() {
                return
        }
 
+       diag.dotest(5, "running health check (same as `arvados-server check`)", func() error {
+               ldr := config.NewLoader(&bytes.Buffer{}, ctxlog.New(&bytes.Buffer{}, "text", "info"))
+               ldr.SetupFlags(flag.NewFlagSet("diagnostics", flag.ContinueOnError))
+               cfg, err := ldr.Load()
+               if err != nil {
+                       diag.infof("skipping because config could not be loaded: %s", err)
+                       return nil
+               }
+               cluster, err := cfg.GetCluster("")
+               if err != nil {
+                       return err
+               }
+               if cluster.SystemRootToken != os.Getenv("ARVADOS_API_TOKEN") {
+                       diag.infof("skipping because provided token is not SystemRootToken")
+                       return nil
+               }
+               agg := &health.Aggregator{Cluster: cluster}
+               resp := agg.ClusterHealth()
+               for _, e := range resp.Errors {
+                       diag.errorf("health check: %s", e)
+               }
+               diag.infof("health check: reported clock skew %v", resp.ClockSkew)
+               return nil
+       })
+
        var dd arvados.DiscoveryDocument
        ddpath := "discovery/v1/apis/arvados/v1/rest"
        diag.dotest(10, fmt.Sprintf("getting discovery document from https://%s/%s", client.APIHost, ddpath), func() error {
index e2348337e62992eb4463947690e809e1927bb232..d362f66d14b3ee12b9a4fb6b197b9a34747d944c 100644 (file)
@@ -170,6 +170,19 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
        if ctr.State != dispatch.Locked {
                // already started by prior invocation
        } else if _, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
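+               // Check up front whether any configured instance
+               // type can satisfy this container's constraints; if
+               // not, record the error in runtime_status and cancel
+               // the container instead of submitting an LSF job
+               // that could never be scheduled.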
+               if _, err := dispatchcloud.ChooseInstanceType(disp.Cluster, &ctr); errors.As(err, &dispatchcloud.ConstraintsNotSatisfiableError{}) {
+                       err := disp.arvDispatcher.Arv.Update("containers", ctr.UUID, arvadosclient.Dict{
+                               "container": map[string]interface{}{
+                                       "runtime_status": map[string]string{
+                                               "error": err.Error(),
+                                       },
+                               },
+                       }, nil)
+                       if err != nil {
+                               return fmt.Errorf("error setting runtime_status on %s: %s", ctr.UUID, err)
+                       }
+                       return disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
+               }
                disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
                cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
                cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
@@ -184,9 +197,8 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
        defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
 
        go func(uuid string) {
-               cancelled := false
                for ctx.Err() == nil {
-                       qent, ok := disp.lsfqueue.Lookup(uuid)
+                       _, ok := disp.lsfqueue.Lookup(uuid)
                        if !ok {
                                // If the container disappears from
                                // the lsf queue, there is no point in
@@ -196,25 +208,6 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
                                cancel()
                                return
                        }
-                       if !cancelled && qent.Stat == "PEND" && strings.Contains(qent.PendReason, "There are no suitable hosts for the job") {
-                               disp.logger.Printf("container %s: %s", uuid, qent.PendReason)
-                               err := disp.arvDispatcher.Arv.Update("containers", uuid, arvadosclient.Dict{
-                                       "container": map[string]interface{}{
-                                               "runtime_status": map[string]string{
-                                                       "error": qent.PendReason,
-                                               },
-                                       },
-                               }, nil)
-                               if err != nil {
-                                       disp.logger.Printf("error setting runtime_status on %s: %s", uuid, err)
-                                       continue // retry
-                               }
-                               err = disp.arvDispatcher.UpdateState(uuid, dispatch.Cancelled)
-                               if err != nil {
-                                       continue // retry (UpdateState() already logged the error)
-                               }
-                               cancelled = true
-                       }
                }
        }(ctr.UUID)
 
index a99983f34a8ae4163f9a91ba59c43ab9e57c3e00..e51e719066cbdf2b3f71d245eea9a7fc326fcbc3 100644 (file)
@@ -32,6 +32,7 @@ var _ = check.Suite(&suite{})
 type suite struct {
        disp          *dispatcher
        crTooBig      arvados.ContainerRequest
+       crPending     arvados.ContainerRequest
        crCUDARequest arvados.ContainerRequest
 }
 
@@ -46,6 +47,13 @@ func (s *suite) SetUpTest(c *check.C) {
        c.Assert(err, check.IsNil)
        cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second / 4)
        cluster.Containers.MinRetryPeriod = arvados.Duration(time.Second / 4)
+       cluster.InstanceTypes = arvados.InstanceTypeMap{
+               "biggest_available_node": arvados.InstanceType{
+                       RAM:             100 << 30, // 100 GiB
+                       VCPUs:           4,
+                       IncludedScratch: 100 << 30,
+                       Scratch:         100 << 30,
+               }}
        s.disp = newHandler(context.Background(), cluster, arvadostest.Dispatch1Token, prometheus.NewRegistry()).(*dispatcher)
        s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
                return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
@@ -67,6 +75,23 @@ func (s *suite) SetUpTest(c *check.C) {
        })
        c.Assert(err, check.IsNil)
 
+       err = arvados.NewClientFromEnv().RequestAndDecode(&s.crPending, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
+               "container_request": map[string]interface{}{
+                       "runtime_constraints": arvados.RuntimeConstraints{
+                               RAM:   100000000,
+                               VCPUs: 2,
+                       },
+                       "container_image":     arvadostest.DockerImage112PDH,
+                       "command":             []string{"sleep", "1"},
+                       "mounts":              map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
+                       "output_path":         "/mnt/out",
+                       "state":               arvados.ContainerRequestStateCommitted,
+                       "priority":            1,
+                       "container_count_max": 1,
+               },
+       })
+       c.Assert(err, check.IsNil)
+
        err = arvados.NewClientFromEnv().RequestAndDecode(&s.crCUDARequest, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
                "container_request": map[string]interface{}{
                        "runtime_constraints": arvados.RuntimeConstraints{
@@ -150,15 +175,15 @@ func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...
                                fakejobq[nextjobid] = args[1]
                                nextjobid++
                                mtx.Unlock()
-                       case s.crTooBig.ContainerUUID:
+                       case s.crPending.ContainerUUID:
                                c.Check(args, check.DeepEquals, []string{
-                                       "-J", s.crTooBig.ContainerUUID,
-                                       "-n", "1",
-                                       "-D", "954187MB",
-                                       "-R", "rusage[mem=954187MB:tmp=256MB] span[hosts=1]",
-                                       "-R", "select[mem>=954187MB]",
+                                       "-J", s.crPending.ContainerUUID,
+                                       "-n", "2",
+                                       "-D", "608MB",
+                                       "-R", "rusage[mem=608MB:tmp=256MB] span[hosts=1]",
+                                       "-R", "select[mem>=608MB]",
                                        "-R", "select[tmp>=256MB]",
-                                       "-R", "select[ncpus>=1]"})
+                                       "-R", "select[ncpus>=2]"})
                                mtx.Lock()
                                fakejobq[nextjobid] = args[1]
                                nextjobid++
@@ -187,7 +212,7 @@ func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...
                        var records []map[string]interface{}
                        for jobid, uuid := range fakejobq {
                                stat, reason := "RUN", ""
-                               if uuid == s.crTooBig.ContainerUUID {
+                               if uuid == s.crPending.ContainerUUID {
                                        // The real bjobs output includes a trailing ';' here:
                                        stat, reason = "PEND", "There are no suitable hosts for the job;"
                                }
@@ -242,23 +267,28 @@ func (s *suite) TestSubmit(c *check.C) {
                        c.Error("timed out")
                        break
                }
+               // "crTooBig" should never be submitted to lsf because
+               // it is bigger than any configured instance type
+               if ent, ok := s.disp.lsfqueue.Lookup(s.crTooBig.ContainerUUID); ok {
+                       c.Errorf("Lookup(crTooBig) == true, ent = %#v", ent)
+                       break
+               }
                // "queuedcontainer" should be running
                if _, ok := s.disp.lsfqueue.Lookup(arvadostest.QueuedContainerUUID); !ok {
                        c.Log("Lookup(queuedcontainer) == false")
                        continue
                }
+               // "crPending" should be pending
+               if _, ok := s.disp.lsfqueue.Lookup(s.crPending.ContainerUUID); !ok {
+                       c.Log("Lookup(crPending) == false")
+                       continue
+               }
                // "lockedcontainer" should be cancelled because it
                // has priority 0 (no matching container requests)
                if ent, ok := s.disp.lsfqueue.Lookup(arvadostest.LockedContainerUUID); ok {
                        c.Logf("Lookup(lockedcontainer) == true, ent = %#v", ent)
                        continue
                }
-               // "crTooBig" should be cancelled because lsf stub
-               // reports there is no suitable instance type
-               if ent, ok := s.disp.lsfqueue.Lookup(s.crTooBig.ContainerUUID); ok {
-                       c.Logf("Lookup(crTooBig) == true, ent = %#v", ent)
-                       continue
-               }
                var ctr arvados.Container
                if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil {
                        c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err)
@@ -275,7 +305,7 @@ func (s *suite) TestSubmit(c *check.C) {
                        c.Logf("container %s is not in the LSF queue but its arvados record has not been updated to state==Cancelled (state is %q)", s.crTooBig.ContainerUUID, ctr.State)
                        continue
                } else {
-                       c.Check(ctr.RuntimeStatus["error"], check.Equals, "There are no suitable hosts for the job;")
+                       c.Check(ctr.RuntimeStatus["error"], check.Equals, "constraints not satisfiable by any configured instance type")
                }
                c.Log("reached desired state")
                break
index 5c74eb1f9855fa548a4b936b987c85b2ab9461a5..694f77baf246ecb56e6116e99dd5461deb9f6e53 100644 (file)
@@ -542,10 +542,10 @@ The 'jobs' API is no longer supported.
             git_commit = subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
             git_date = subprocess.run(["git", "log", "--format=%cD", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
             git_committer = subprocess.run(["git", "log", "--format=%cn <%ce>", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
-            git_branch = subprocess.run(["git", "branch", "--show-current"], cwd=cwd, capture_output=True, text=True).stdout
+            git_branch = subprocess.run(["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
             git_origin = subprocess.run(["git", "remote", "get-url", "origin"], cwd=cwd, capture_output=True, text=True).stdout
             git_status = subprocess.run(["git", "status", "--untracked-files=no", "--porcelain"], cwd=cwd, capture_output=True, text=True).stdout
-            git_describe = subprocess.run(["git", "describe", "--always"], cwd=cwd, capture_output=True, text=True).stdout
+            git_describe = subprocess.run(["git", "describe", "--always", "--tags"], cwd=cwd, capture_output=True, text=True).stdout
             git_toplevel = subprocess.run(["git", "rev-parse", "--show-toplevel"], cwd=cwd, capture_output=True, text=True).stdout
             git_path = filepath[len(git_toplevel):]
 
index cdc07bb0afd2c80b09985ad28f18c6c0fa1abcde..4dead0ada9143231a1b34c1700174279e64cfe83 100644 (file)
@@ -7,6 +7,7 @@ package arvados
 import (
        "bytes"
        "context"
+       "crypto/rand"
        "crypto/tls"
        "encoding/json"
        "errors"
@@ -15,6 +16,7 @@ import (
        "io/fs"
        "io/ioutil"
        "log"
+       "math/big"
        "net"
        "net/http"
        "net/url"
@@ -599,3 +601,13 @@ func (c *Client) PathForUUID(method, uuid string) (string, error) {
        }
        return path, nil
 }
+
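+// maxUUIDInt is 36^15, one more than the largest value that fits in
+// the 15 base-36 digits at the end of a UUID.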
+var maxUUIDInt = (&big.Int{}).Exp(big.NewInt(36), big.NewInt(15), nil)
+
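+// RandomUUID returns a randomly generated UUID consisting of the
+// given cluster ID, the given object type infix (e.g., "4zz18" for
+// collections), and 15 random base-36 digits.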
+func RandomUUID(clusterID, infix string) string {
+       n, err := rand.Int(rand.Reader, maxUUIDInt)
+       if err != nil {
+               panic(err)
+       }
+       nstr := n.Text(36)
+       // Zero-pad to 15 digits so the returned UUID always has the
+       // canonical 27-character length.
+       for len(nstr) < 15 {
+               nstr = "0" + nstr
+       }
+       return clusterID + "-" + infix + "-" + nstr
+}
index 6d8f39dfb316fbaba1cf64f71ef5f5f778f91e8e..a1fc2e89f44331f005e2b28609a32dffe7c5480a 100644 (file)
@@ -61,13 +61,10 @@ func (sc *Config) GetCluster(clusterID string) (*Cluster, error) {
 }
 
 type WebDAVCacheConfig struct {
-       TTL                  Duration
-       UUIDTTL              Duration
-       MaxBlockEntries      int
-       MaxCollectionEntries int
-       MaxCollectionBytes   int64
-       MaxUUIDEntries       int
-       MaxSessions          int
+       TTL                Duration
+       MaxBlockEntries    int
+       MaxCollectionBytes int64
+       MaxSessions        int
 }
 
 type UploadDownloadPermission struct {
@@ -251,6 +248,7 @@ type Cluster struct {
                PreferDomainForUsername               string
                UserSetupMailText                     string
                RoleGroupsVisibleToAll                bool
+               ActivityLoggingPeriod                 Duration
        }
        StorageClasses map[string]StorageClassConfig
        Volumes        map[string]Volume
index 6da639edaf73c3731501246de556f1c7c1932e16..274d20702287ed464d4ea8f3796e528c3f61b30b 100644 (file)
@@ -641,7 +641,15 @@ func (fs *fileSystem) Rename(oldname, newname string) error {
        }
        locked := map[sync.Locker]bool{}
        for i := len(needLock) - 1; i >= 0; i-- {
-               if n := needLock[i]; !locked[n] {
+               n := needLock[i]
+               if fs, ok := n.(interface{ rootnode() inode }); ok {
+                       // Lock the fs's root dir directly, not
+                       // through the fs. Otherwise our "locked" map
+                       // would not reliably prevent double-locking
+                       // the fs's root dir.
+                       n = fs.rootnode()
+               }
+               if !locked[n] {
                        n.Lock()
                        defer n.Unlock()
                        locked[n] = true
index f09c60a57192a7e654c03a63407045ac302c6d65..a26c876b932304ab6fdfefbafe36145665cbac90 100644 (file)
@@ -290,44 +290,70 @@ func (fs *collectionFileSystem) Truncate(int64) error {
        return ErrInvalidOperation
 }
 
-// Check for and incorporate upstream changes -- unless that has
-// already been done recently, in which case this func is a no-op.
-func (fs *collectionFileSystem) checkChangesOnServer() error {
+// Check for and incorporate upstream changes. If force==false, this
+// is a no-op except once every ttl/100 or so.
+//
+// Return value is true if new content was loaded from upstream and
+// any unsaved local changes have been discarded.
+func (fs *collectionFileSystem) checkChangesOnServer(force bool) (bool, error) {
        if fs.uuid == "" && fs.savedPDH.Load() == "" {
-               return nil
+               return false, nil
        }
 
-       // First try UUID if any, then last known PDH. Stop if all
-       // signatures are new enough.
-       checkingAll := false
-       for _, id := range []string{fs.uuid, fs.savedPDH.Load().(string)} {
-               if id == "" {
-                       continue
-               }
-
-               fs.lockCheckChanges.Lock()
-               if !checkingAll && fs.holdCheckChanges.After(time.Now()) {
-                       fs.lockCheckChanges.Unlock()
-                       return nil
-               }
-               remain, ttl := fs.signatureTimeLeft()
-               if remain > 0.01 && !checkingAll {
-                       fs.holdCheckChanges = time.Now().Add(ttl / 100)
-               }
+       fs.lockCheckChanges.Lock()
+       if !force && fs.holdCheckChanges.After(time.Now()) {
                fs.lockCheckChanges.Unlock()
+               return false, nil
+       }
+       remain, ttl := fs.signatureTimeLeft()
+       if remain > 0.01 {
+               fs.holdCheckChanges = time.Now().Add(ttl / 100)
+       }
+       fs.lockCheckChanges.Unlock()
 
-               if remain >= 0.5 {
-                       break
+       if !force && remain >= 0.5 {
+               // plenty of time left on current signatures
+               return false, nil
+       }
+
+       getparams := map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}}
+       if fs.uuid != "" {
+               var coll Collection
+               err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+fs.uuid, nil, getparams)
+               if err != nil {
+                       return false, err
                }
-               checkingAll = true
+               if coll.PortableDataHash != fs.savedPDH.Load().(string) {
+                       // collection has changed upstream since we
+                       // last loaded or saved. Refresh local data,
+                       // losing any unsaved local changes.
+                       newfs, err := coll.FileSystem(fs.fileSystem.fsBackend, fs.fileSystem.fsBackend)
+                       if err != nil {
+                               return false, err
+                       }
+                       snap, err := Snapshot(newfs, "/")
+                       if err != nil {
+                               return false, err
+                       }
+                       err = Splice(fs, "/", snap)
+                       if err != nil {
+                               return false, err
+                       }
+                       fs.savedPDH.Store(coll.PortableDataHash)
+                       return true, nil
+               }
+               fs.updateSignatures(coll.ManifestText)
+               return false, nil
+       }
+       if pdh := fs.savedPDH.Load().(string); pdh != "" {
                var coll Collection
-               err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}})
+               err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+pdh, nil, getparams)
                if err != nil {
-                       continue
+                       return false, err
                }
                fs.updateSignatures(coll.ManifestText)
        }
-       return nil
+       return false, nil
 }
 
 // Refresh signature on a single locator, if necessary. Assume caller
@@ -339,7 +365,7 @@ func (fs *collectionFileSystem) refreshSignature(locator string) string {
        if err != nil || exp.Sub(time.Now()) > time.Minute {
                // Synchronous update is not needed. Start an
                // asynchronous update if needed.
-               go fs.checkChangesOnServer()
+               go fs.checkChangesOnServer(false)
                return locator
        }
        var manifests string
@@ -368,11 +394,11 @@ func (fs *collectionFileSystem) refreshSignature(locator string) string {
 }
 
 func (fs *collectionFileSystem) Sync() error {
-       err := fs.checkChangesOnServer()
+       refreshed, err := fs.checkChangesOnServer(true)
        if err != nil {
                return err
        }
-       if fs.uuid == "" {
+       if refreshed || fs.uuid == "" {
                return nil
        }
        txt, err := fs.MarshalManifest(".")
@@ -403,7 +429,7 @@ func (fs *collectionFileSystem) Sync() error {
                "select": selectFields,
        })
        if err != nil {
-               return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
+               return fmt.Errorf("sync failed: update %s: %w", fs.uuid, err)
        }
        fs.updateSignatures(coll.ManifestText)
        fs.savedPDH.Store(coll.PortableDataHash)
@@ -579,10 +605,7 @@ func (fn *filenode) MemorySize() (size int64) {
        defer fn.RUnlock()
        size = 64
        for _, seg := range fn.segments {
-               size += 64
-               if seg, ok := seg.(*memSegment); ok {
-                       size += int64(seg.Len())
-               }
+               size += seg.memorySize()
        }
        return
 }
@@ -1629,6 +1652,7 @@ type segment interface {
        // Return a new segment with a subsection of the data from this
        // one. length<0 means length=Len()-off.
        Slice(off int, length int) segment
+       memorySize() int64
 }
 
 type memSegment struct {
@@ -1707,6 +1731,10 @@ func (me *memSegment) ReadAt(p []byte, off int64) (n int, err error) {
        return
 }
 
+func (me *memSegment) memorySize() int64 {
+       return 64 + int64(len(me.buf))
+}
+
 type storedSegment struct {
        kc      fsBackend
        locator string
@@ -1744,6 +1772,10 @@ func (se storedSegment) ReadAt(p []byte, off int64) (n int, err error) {
        return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
 }
 
+func (se storedSegment) memorySize() int64 {
+       return 64 + int64(len(se.locator))
+}
+
 func canonicalName(name string) string {
        name = path.Clean("/" + name)
        if name == "/" || name == "./" {
index c2cac3c6ce2e963b36b7654729e56524ba9bc2db..73689e4eadf620f12babdb3ba61979e05995d387 100644 (file)
@@ -1209,11 +1209,12 @@ func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
        }
 
        nDirs := int64(8)
+       nFiles := int64(67)
        megabyte := make([]byte, 1<<20)
        for i := int64(0); i < nDirs; i++ {
                dir := fmt.Sprintf("dir%d", i)
                fs.Mkdir(dir, 0755)
-               for j := 0; j < 67; j++ {
+               for j := int64(0); j < nFiles; j++ {
                        f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
                        c.Assert(err, check.IsNil)
                        defer f.Close()
@@ -1221,8 +1222,8 @@ func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
                        c.Assert(err, check.IsNil)
                }
        }
-       inodebytes := int64((nDirs*(67*2+1) + 1) * 64)
-       c.Check(fs.MemorySize(), check.Equals, int64(nDirs*67<<20)+inodebytes)
+       inodebytes := int64((nDirs*(nFiles+1) + 1) * 64)
+       c.Check(fs.MemorySize(), check.Equals, nDirs*nFiles*(1<<20+64)+inodebytes)
        c.Check(flushed, check.Equals, int64(0))
 
        waitForFlush := func(expectUnflushed, expectFlushed int64) {
@@ -1233,27 +1234,29 @@ func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
        }
 
        // Nothing flushed yet
-       waitForFlush((nDirs*67)<<20+inodebytes, 0)
+       waitForFlush(nDirs*nFiles*(1<<20+64)+inodebytes, 0)
 
        // Flushing a non-empty dir "/" is non-recursive and there are
        // no top-level files, so this has no effect
        fs.Flush("/", false)
-       waitForFlush((nDirs*67)<<20+inodebytes, 0)
+       waitForFlush(nDirs*nFiles*(1<<20+64)+inodebytes, 0)
 
        // Flush the full block in dir0
        fs.Flush("dir0", false)
-       waitForFlush((nDirs*67-64)<<20+inodebytes, 64<<20)
+       bigloclen := int64(32 + 9 + 51 + 64) // 32 (md5 hex) + 9 ("+67108864") + 51 ("+A<sig>@<expiry>") + 64 (see storedSegment.memorySize)
+       waitForFlush((nDirs*nFiles-64)*(1<<20+64)+inodebytes+bigloclen*64, 64<<20)
 
        err = fs.Flush("dir-does-not-exist", false)
        c.Check(err, check.NotNil)
 
        // Flush full blocks in all dirs
        fs.Flush("", false)
-       waitForFlush(nDirs*3<<20+inodebytes, nDirs*64<<20)
+       waitForFlush(nDirs*3*(1<<20+64)+inodebytes+bigloclen*64*nDirs, nDirs*64<<20)
 
        // Flush non-full blocks, too
        fs.Flush("", true)
-       waitForFlush(inodebytes, nDirs*67<<20)
+       smallloclen := int64(32 + 8 + 51 + 64) // 32 (md5 hex) + 8 ("+3145728") + 51 ("+A<sig>@<expiry>") + 64 (see storedSegment.memorySize)
+       waitForFlush(inodebytes+bigloclen*64*nDirs+smallloclen*3*nDirs, nDirs*67<<20)
 }
 
 // Even when writing lots of files/dirs from different goroutines, as
index 1dfa2df6e4005f0b6d93f497a657e81e583bad14..e85446098f8c9b68120bacd42e5ece6c2a1f08b0 100644 (file)
@@ -5,45 +5,10 @@
 package arvados
 
 import (
-       "log"
        "os"
        "sync"
-       "time"
 )
 
-func deferredCollectionFS(fs FileSystem, parent inode, coll Collection) inode {
-       modTime := coll.ModifiedAt
-       if modTime.IsZero() {
-               modTime = time.Now()
-       }
-       placeholder := &treenode{
-               fs:     fs,
-               parent: parent,
-               inodes: nil,
-               fileinfo: fileinfo{
-                       name:    coll.Name,
-                       modTime: modTime,
-                       mode:    0755 | os.ModeDir,
-                       sys:     func() interface{} { return &coll },
-               },
-       }
-       return &deferrednode{wrapped: placeholder, create: func() inode {
-               err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+coll.UUID, nil, nil)
-               if err != nil {
-                       log.Printf("BUG: unhandled error: %s", err)
-                       return placeholder
-               }
-               newfs, err := coll.FileSystem(fs, fs)
-               if err != nil {
-                       log.Printf("BUG: unhandled error: %s", err)
-                       return placeholder
-               }
-               cfs := newfs.(*collectionFileSystem)
-               cfs.SetParent(parent, coll.Name)
-               return cfs
-       }}
-}
-
 // A deferrednode wraps an inode that's expensive to build. Initially,
 // it responds to basic directory functions by proxying to the given
 // placeholder. If a caller uses a read/write/lock operation,
index faab6e4f04d36b7caffdc9ce39abf946e9e6c2a4..a68e83945e348a0f3e740b7c0a313155917b795a 100644 (file)
@@ -6,7 +6,9 @@ package arvados
 
 import (
        "log"
+       "os"
        "strings"
+       "time"
 )
 
 func (fs *customFileSystem) defaultUUID(uuid string) (string, error) {
@@ -64,9 +66,18 @@ func (fs *customFileSystem) projectsLoadOne(parent inode, uuid, name string) (in
        if strings.Contains(coll.UUID, "-j7d0g-") {
-               // Group item was loaded into a Collection var -- but
-               // we only need the Name and UUID anyway, so it's OK.
+               // Group item was loaded into a Collection var -- but
+               // we only need the Name, UUID, ModifiedAt, and
+               // Properties fields anyway, so it's OK.
-               return fs.newProjectNode(parent, coll.Name, coll.UUID, nil), nil
+               return &hardlink{
+                       inode: fs.projectSingleton(coll.UUID, &Group{
+                               UUID:       coll.UUID,
+                               Name:       coll.Name,
+                               ModifiedAt: coll.ModifiedAt,
+                               Properties: coll.Properties,
+                       }),
+                       parent: parent,
+                       name:   coll.Name,
+               }, nil
        } else if strings.Contains(coll.UUID, "-4zz18-") {
-               return deferredCollectionFS(fs, parent, coll), nil
+               return fs.newDeferredCollectionDir(parent, name, coll.UUID, coll.ModifiedAt, coll.Properties), nil
        } else {
                log.Printf("group contents: unrecognized UUID in response: %q", coll.UUID)
                return nil, ErrInvalidArgument
@@ -105,10 +116,14 @@ func (fs *customFileSystem) projectsLoadAll(parent inode, uuid string) ([]inode,
                }
 
                for {
-                       // The groups content endpoint returns Collection and Group (project)
-                       // objects. This function only accesses the UUID and Name field. Both
-                       // collections and groups have those fields, so it is easier to just treat
-                       // the ObjectList that comes back as a CollectionList.
+                       // The groups contents endpoint returns
+                       // Collection and Group (project)
+                       // objects. This function only accesses the
+                       // UUID, Name, ModifiedAt, and Properties
+                       // fields. Both collections and groups have
+                       // those fields, so it is easier to just treat
+                       // the ObjectList that comes back as a
+                       // CollectionList.
                        var resp CollectionList
                        err = fs.RequestAndDecode(&resp, "GET", "arvados/v1/groups/"+uuid+"/contents", nil, params)
                        if err != nil {
@@ -125,14 +140,14 @@ func (fs *customFileSystem) projectsLoadAll(parent inode, uuid string) ([]inode,
                                        continue
                                }
                                if strings.Contains(i.UUID, "-j7d0g-") {
-                                       inodes = append(inodes, fs.newProjectNode(parent, i.Name, i.UUID, &Group{
+                                       inodes = append(inodes, fs.newProjectDir(parent, i.Name, i.UUID, &Group{
                                                UUID:       i.UUID,
                                                Name:       i.Name,
                                                ModifiedAt: i.ModifiedAt,
                                                Properties: i.Properties,
                                        }))
                                } else if strings.Contains(i.UUID, "-4zz18-") {
-                                       inodes = append(inodes, deferredCollectionFS(fs, parent, i))
+                                       inodes = append(inodes, fs.newDeferredCollectionDir(parent, i.Name, i.UUID, i.ModifiedAt, i.Properties))
                                } else {
                                        log.Printf("group contents: unrecognized UUID in response: %q", i.UUID)
                                        return nil, ErrInvalidArgument
@@ -143,3 +158,32 @@ func (fs *customFileSystem) projectsLoadAll(parent inode, uuid string) ([]inode,
        }
        return inodes, nil
 }
+
+func (fs *customFileSystem) newProjectDir(parent inode, name, uuid string, proj *Group) inode {
+       return &hardlink{inode: fs.projectSingleton(uuid, proj), parent: parent, name: name}
+}
+
+func (fs *customFileSystem) newDeferredCollectionDir(parent inode, name, uuid string, modTime time.Time, props map[string]interface{}) inode {
+       if modTime.IsZero() {
+               modTime = time.Now()
+       }
+       placeholder := &treenode{
+               fs:     fs,
+               parent: parent,
+               inodes: nil,
+               fileinfo: fileinfo{
+                       name:    name,
+                       modTime: modTime,
+                       mode:    0755 | os.ModeDir,
+                       sys:     func() interface{} { return &Collection{UUID: uuid, Name: name, ModifiedAt: modTime, Properties: props} },
+               },
+       }
+       return &deferrednode{wrapped: placeholder, create: func() inode {
+               node, err := fs.collectionSingleton(uuid)
+               if err != nil {
+                       log.Printf("BUG: unhandled error: %s", err)
+                       return placeholder
+               }
+               return &hardlink{inode: node, parent: parent, name: name}
+       }}
+}
index 8e7f58815629478f243396472fb92b258fe3ac98..d3dac7a14f7424539c29c89811ee900eb47f603d 100644 (file)
@@ -10,7 +10,6 @@ import (
        "errors"
        "io"
        "os"
-       "path/filepath"
        "strings"
 
        check "gopkg.in/check.v1"
@@ -102,14 +101,16 @@ func (s *SiteFSSuite) TestFilterGroup(c *check.C) {
 
 func (s *SiteFSSuite) TestCurrentUserHome(c *check.C) {
        s.fs.MountProject("home", "")
-       s.testHomeProject(c, "/home")
+       s.testHomeProject(c, "/home", "home")
 }
 
 func (s *SiteFSSuite) TestUsersDir(c *check.C) {
-       s.testHomeProject(c, "/users/active")
+       // /users/active is a hardlink to a dir whose name is the UUID
+       // of the active user
+       s.testHomeProject(c, "/users/active", fixtureActiveUserUUID)
 }
 
-func (s *SiteFSSuite) testHomeProject(c *check.C, path string) {
+func (s *SiteFSSuite) testHomeProject(c *check.C, path, expectRealName string) {
        f, err := s.fs.Open(path)
        c.Assert(err, check.IsNil)
        fis, err := f.Readdir(-1)
@@ -130,8 +131,7 @@ func (s *SiteFSSuite) testHomeProject(c *check.C, path string) {
        fi, err := f.Stat()
        c.Assert(err, check.IsNil)
        c.Check(fi.IsDir(), check.Equals, true)
-       _, basename := filepath.Split(path)
-       c.Check(fi.Name(), check.Equals, basename)
+       c.Check(fi.Name(), check.Equals, expectRealName)
 
        f, err = s.fs.Open(path + "/A Project/A Subproject")
        c.Assert(err, check.IsNil)
@@ -263,14 +263,10 @@ func (s *SiteFSSuite) TestProjectUpdatedByOther(c *check.C) {
 
        err = project.Sync()
        c.Check(err, check.IsNil)
-       _, err = s.fs.Open("/home/A Project/oob/test.txt")
-       c.Check(err, check.IsNil)
-
-       // Sync again to mark the project dir as stale, so the
-       // collection gets reloaded from the controller on next
-       // lookup.
-       err = project.Sync()
-       c.Check(err, check.IsNil)
+       f, err = s.fs.Open("/home/A Project/oob/test.txt")
+       if c.Check(err, check.IsNil) {
+               f.Close()
+       }
 
        // Ensure collection was flushed by Sync
        var latest Collection
@@ -288,10 +284,17 @@ func (s *SiteFSSuite) TestProjectUpdatedByOther(c *check.C) {
        })
        c.Assert(err, check.IsNil)
 
+       // Sync again to reload collection.
+       err = project.Sync()
+       c.Check(err, check.IsNil)
+
+       // Check test.txt deletion is reflected in fs.
        _, err = s.fs.Open("/home/A Project/oob/test.txt")
        c.Check(err, check.NotNil)
-       _, err = s.fs.Open("/home/A Project/oob")
-       c.Check(err, check.IsNil)
+       f, err = s.fs.Open("/home/A Project/oob")
+       if c.Check(err, check.IsNil) {
+               f.Close()
+       }
 
        err = s.client.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+oob.UUID, nil, nil)
        c.Assert(err, check.IsNil)
index bb2eee77925fd2c682c7d42e1e8e175c4f2f1489..a4a18837e00e7074521ce3e562fb30c21b84c1eb 100644 (file)
@@ -5,6 +5,7 @@
 package arvados
 
 import (
+       "net/http"
        "os"
        "strings"
        "sync"
@@ -28,6 +29,10 @@ type customFileSystem struct {
        staleLock      sync.Mutex
 
        forwardSlashNameSubstitution string
+
+       byID     map[string]inode
+       byIDLock sync.Mutex
+       byIDRoot *treenode
 }
 
 func (c *Client) CustomFileSystem(kc keepClient) CustomFileSystem {
@@ -50,6 +55,17 @@ func (c *Client) CustomFileSystem(kc keepClient) CustomFileSystem {
                },
                inodes: make(map[string]inode),
        }
+       fs.byID = map[string]inode{}
+       fs.byIDRoot = &treenode{
+               fs:     fs,
+               parent: root,
+               inodes: make(map[string]inode),
+               fileinfo: fileinfo{
+                       name:    "_internal_by_id",
+                       modTime: time.Now(),
+                       mode:    0755 | os.ModeDir,
+               },
+       }
        return fs
 }
 
@@ -68,7 +84,7 @@ func (fs *customFileSystem) MountByID(mount string) {
                                        mode:    0755 | os.ModeDir,
                                },
                        },
-                       create: fs.mountByID,
+                       create: fs.newCollectionOrProjectHardlink,
                }, nil
        })
 }
@@ -77,7 +93,7 @@ func (fs *customFileSystem) MountProject(mount, uuid string) {
        fs.root.treenode.Lock()
        defer fs.root.treenode.Unlock()
        fs.root.treenode.Child(mount, func(inode) (inode, error) {
-               return fs.newProjectNode(fs.root, mount, uuid, nil), nil
+               return fs.newProjectDir(fs.root, mount, uuid, nil), nil
        })
 }
 
@@ -121,7 +137,7 @@ func (c *Client) SiteFileSystem(kc keepClient) CustomFileSystem {
 }
 
 func (fs *customFileSystem) Sync() error {
-       return fs.root.Sync()
+       return fs.byIDRoot.Sync()
 }
 
 // Stale returns true if information obtained at time t should be
@@ -136,40 +152,58 @@ func (fs *customFileSystem) newNode(name string, perm os.FileMode, modTime time.
        return nil, ErrInvalidOperation
 }
 
-func (fs *customFileSystem) mountByID(parent inode, id string) inode {
+func (fs *customFileSystem) newCollectionOrProjectHardlink(parent inode, id string) (inode, error) {
        if strings.Contains(id, "-4zz18-") || pdhRegexp.MatchString(id) {
-               return fs.mountCollection(parent, id)
-       } else if strings.Contains(id, "-j7d0g-") {
-               return fs.newProjectNode(fs.root, id, id, nil)
+               node, err := fs.collectionSingleton(id)
+               if os.IsNotExist(err) {
+                       return nil, nil
+               } else if err != nil {
+                       return nil, err
+               }
+               return &hardlink{inode: node, parent: parent, name: id}, nil
+       } else if strings.Contains(id, "-j7d0g-") || strings.Contains(id, "-tpzed-") {
+               fs.byIDLock.Lock()
+               node := fs.byID[id]
+               fs.byIDLock.Unlock()
+               if node == nil {
+                       // Look up the project synchronously before
+                       // calling projectSingleton (otherwise we
+                       // wouldn't detect a nonexistent project until
+                       // it's too late to return ErrNotExist).
+                       proj, err := fs.getProject(id)
+                       if os.IsNotExist(err) {
+                               return nil, nil
+                       } else if err != nil {
+                               return nil, err
+                       }
+                       node = fs.projectSingleton(id, proj)
+               }
+               return &hardlink{inode: node, parent: parent, name: id}, nil
        } else {
-               return nil
+               return nil, nil
        }
 }
 
-func (fs *customFileSystem) mountCollection(parent inode, id string) inode {
-       var coll Collection
-       err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, nil)
-       if err != nil {
-               return nil
+func (fs *customFileSystem) projectSingleton(uuid string, proj *Group) inode {
+       fs.byIDLock.Lock()
+       defer fs.byIDLock.Unlock()
+       if n := fs.byID[uuid]; n != nil {
+               return n
        }
-       newfs, err := coll.FileSystem(fs, fs)
-       if err != nil {
-               return nil
+       name := uuid
+       if name == "" {
+               // special case uuid=="" implements the "home project"
+               // (owner_uuid == current user uuid)
+               name = "home"
        }
-       cfs := newfs.(*collectionFileSystem)
-       cfs.SetParent(parent, id)
-       return cfs
-}
-
-func (fs *customFileSystem) newProjectNode(root inode, name, uuid string, proj *Group) inode {
        var projLoading sync.Mutex
-       return &lookupnode{
+       n := &lookupnode{
                stale:   fs.Stale,
                loadOne: func(parent inode, name string) (inode, error) { return fs.projectsLoadOne(parent, uuid, name) },
                loadAll: func(parent inode) ([]inode, error) { return fs.projectsLoadAll(parent, uuid) },
                treenode: treenode{
                        fs:     fs,
-                       parent: root,
+                       parent: fs.byIDRoot,
                        inodes: make(map[string]inode),
                        fileinfo: fileinfo{
                                name:    name,
@@ -181,17 +215,90 @@ func (fs *customFileSystem) newProjectNode(root inode, name, uuid string, proj *
                                        if proj != nil {
                                                return proj
                                        }
-                                       var g Group
-                                       err := fs.RequestAndDecode(&g, "GET", "arvados/v1/groups/"+uuid, nil, nil)
+                                       g, err := fs.getProject(uuid)
                                        if err != nil {
                                                return err
                                        }
-                                       proj = &g
+                                       proj = g
                                        return proj
                                },
                        },
                },
        }
+       fs.byID[uuid] = n
+       return n
+}
+
+func (fs *customFileSystem) getProject(uuid string) (*Group, error) {
+       var g Group
+       err := fs.RequestAndDecode(&g, "GET", "arvados/v1/groups/"+uuid, nil, nil)
+       if statusErr, ok := err.(interface{ HTTPStatus() int }); ok && statusErr.HTTPStatus() == http.StatusNotFound {
+               return nil, os.ErrNotExist
+       } else if err != nil {
+               return nil, err
+       }
+       return &g, nil
+}
+
+func (fs *customFileSystem) collectionSingleton(id string) (inode, error) {
+       // Return existing singleton, if we have it
+       fs.byIDLock.Lock()
+       existing := fs.byID[id]
+       fs.byIDLock.Unlock()
+       if existing != nil {
+               return existing, nil
+       }
+
+       coll, err := fs.getCollection(id)
+       if err != nil {
+               return nil, err
+       }
+       newfs, err := coll.FileSystem(fs, fs)
+       if err != nil {
+               return nil, err
+       }
+       cfs := newfs.(*collectionFileSystem)
+       cfs.SetParent(fs.byIDRoot, id)
+
+       // Check again in case another goroutine has added a node to
+       // fs.byID since we checked above.
+       fs.byIDLock.Lock()
+       defer fs.byIDLock.Unlock()
+       if existing = fs.byID[id]; existing != nil {
+               // Other goroutine won the race. Discard the node we
+               // just made, and return the race winner.
+               return existing, nil
+       }
+       // We won the race. Save the new node in fs.byID and
+       // fs.byIDRoot.
+       fs.byID[id] = cfs
+       fs.byIDRoot.Lock()
+       defer fs.byIDRoot.Unlock()
+       fs.byIDRoot.Child(id, func(inode) (inode, error) { return cfs, nil })
+       return cfs, nil
+}
+
+func (fs *customFileSystem) getCollection(id string) (*Collection, error) {
+       var coll Collection
+       err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, nil)
+       if statusErr, ok := err.(interface{ HTTPStatus() int }); ok && statusErr.HTTPStatus() == http.StatusNotFound {
+               return nil, os.ErrNotExist
+       } else if err != nil {
+               return nil, err
+       }
+       if len(id) != 27 {
+               // This means id is a PDH, and controller/railsapi
+               // returned one of (possibly) many collections with
+               // that PDH. Even if controller returns more fields
+               // besides PDH and manifest text (which are equal for
+               // all matching collections), we don't want to expose
+               // them (e.g., through Sys()).
+               coll = Collection{
+                       PortableDataHash: coll.PortableDataHash,
+                       ManifestText:     coll.ManifestText,
+               }
+       }
+       return &coll, nil
 }
 
 // vdirnode wraps an inode by rejecting (with ErrInvalidOperation)
@@ -202,15 +309,19 @@ func (fs *customFileSystem) newProjectNode(root inode, name, uuid string, proj *
 // treenode, or nil for ENOENT.
 type vdirnode struct {
        treenode
-       create func(parent inode, name string) inode
+       create func(parent inode, name string) (inode, error)
 }
 
 func (vn *vdirnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
        return vn.treenode.Child(name, func(existing inode) (inode, error) {
                if existing == nil && vn.create != nil {
-                       existing = vn.create(vn, name)
-                       if existing != nil {
-                               existing.SetParent(vn, name)
+                       newnode, err := vn.create(vn, name)
+                       if err != nil {
+                               return nil, err
+                       }
+                       if newnode != nil {
+                               newnode.SetParent(vn, name)
+                               existing = newnode
                                vn.treenode.fileinfo.modTime = time.Now()
                        }
                }
@@ -225,3 +336,53 @@ func (vn *vdirnode) Child(name string, replace func(inode) (inode, error)) (inod
                }
        })
 }
+
+// A hardlink can be used to mount an existing node at an additional
+// point in the same filesystem.
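+// For example, the same collection filesystem can appear both at
+// by_id/{uuid} and under a project directory, sharing one set of
+// inodes (see TestSameCollectionDifferentPaths).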
+type hardlink struct {
+       inode
+       parent inode
+       name   string
+}
+
+// If the wrapped inode is a filesystem, rootnode returns the wrapped
+// fs's rootnode, otherwise inode itself. This allows
+// (*fileSystem)Rename() to lock the root node of a hardlink-wrapped
+// filesystem.
+func (hl *hardlink) rootnode() inode {
+       if node, ok := hl.inode.(interface{ rootnode() inode }); ok {
+               return node.rootnode()
+       } else {
+               return hl.inode
+       }
+}
+
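+// Sync is passed through to the wrapped node if it implements Sync;
+// otherwise it returns ErrInvalidOperation.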
+func (hl *hardlink) Sync() error {
+       if node, ok := hl.inode.(syncer); ok {
+               return node.Sync()
+       } else {
+               return ErrInvalidOperation
+       }
+}
+
+func (hl *hardlink) SetParent(parent inode, name string) {
+       hl.Lock()
+       defer hl.Unlock()
+       hl.parent = parent
+       hl.name = name
+}
+
+func (hl *hardlink) Parent() inode {
+       hl.RLock()
+       defer hl.RUnlock()
+       return hl.parent
+}
+
+func (hl *hardlink) FileInfo() os.FileInfo {
+       fi := hl.inode.FileInfo()
+       if fi, ok := fi.(fileinfo); ok {
+               fi.name = hl.name
+               return fi
+       }
+       return fi
+}
index 3abe2b457f702b510a46f97ccf528f35f337eb92..c7d6b2a4646a33db1f0510b00d59bec812850fe6 100644 (file)
@@ -22,6 +22,7 @@ const (
        // Importing arvadostest would be an import cycle, so these
        // fixtures are duplicated here [until fs moves to a separate
        // package].
+       fixtureActiveUserUUID               = "zzzzz-tpzed-xurymjxw79nv3jz"
        fixtureActiveToken                  = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
        fixtureAProjectUUID                 = "zzzzz-j7d0g-v955i6s2oi1cbso"
        fixtureThisFilterGroupUUID          = "zzzzz-j7d0g-thisfiltergroup"
@@ -97,6 +98,55 @@ func (s *SiteFSSuite) TestUpdateStorageClasses(c *check.C) {
        c.Assert(err, check.ErrorMatches, `.*stub does not write storage class "archive"`)
 }
 
+func (s *SiteFSSuite) TestSameCollectionDifferentPaths(c *check.C) {
+       s.fs.MountProject("home", "")
+       var coll Collection
+       err := s.client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
+               "collection": map[string]interface{}{
+                       "owner_uuid": fixtureAProjectUUID,
+                       "name":       fmt.Sprintf("test collection %d", time.Now().UnixNano()),
+               },
+       })
+       c.Assert(err, check.IsNil)
+
+       viaProjID := "by_id/" + fixtureAProjectUUID + "/" + coll.Name
+       viaProjName := "home/A Project/" + coll.Name
+       viaCollID := "by_id/" + coll.UUID
+       for n, dirs := range [][]string{
+               {viaCollID, viaProjID, viaProjName},
+               {viaCollID, viaProjName, viaProjID},
+               {viaProjID, viaProjName, viaCollID},
+               {viaProjID, viaCollID, viaProjName},
+               {viaProjName, viaCollID, viaProjID},
+               {viaProjName, viaProjID, viaCollID},
+       } {
+               filename := fmt.Sprintf("file %d", n)
+               f := make([]File, 3)
+               for i, dir := range dirs {
+                       path := dir + "/" + filename
+                       mode := os.O_RDWR
+                       if i == 0 {
+                               mode |= os.O_CREATE
+                               c.Logf("create %s", path)
+                       } else {
+                               c.Logf("open %s", path)
+                       }
+                       f[i], err = s.fs.OpenFile(path, mode, 0777)
+                       c.Assert(err, check.IsNil, check.Commentf("n=%d i=%d path=%s", n, i, path))
+                       defer f[i].Close()
+               }
+               _, err = io.WriteString(f[0], filename)
+               c.Assert(err, check.IsNil)
+               _, err = f[1].Seek(0, io.SeekEnd)
+               c.Assert(err, check.IsNil)
+               _, err = io.WriteString(f[1], filename)
+               c.Assert(err, check.IsNil)
+               buf, err := io.ReadAll(f[2])
+               c.Assert(err, check.IsNil)
+               c.Check(string(buf), check.Equals, filename+filename)
+       }
+}
+
 func (s *SiteFSSuite) TestByUUIDAndPDH(c *check.C) {
        f, err := s.fs.Open("/by_id")
        c.Assert(err, check.IsNil)
index ae47414b7abe80b9c0e2a2ff0a5e7c36d7320dd2..5f9edb40fd308e2f06de5ccbb188757f12fd98ac 100644 (file)
@@ -20,7 +20,7 @@ func (fs *customFileSystem) usersLoadOne(parent inode, name string) (inode, erro
                return nil, os.ErrNotExist
        }
        user := resp.Items[0]
-       return fs.newProjectNode(parent, user.Username, user.UUID, nil), nil
+       return fs.newProjectDir(parent, user.Username, user.UUID, nil), nil
 }
 
 func (fs *customFileSystem) usersLoadAll(parent inode) ([]inode, error) {
@@ -41,7 +41,7 @@ func (fs *customFileSystem) usersLoadAll(parent inode) ([]inode, error) {
                        if user.Username == "" {
                                continue
                        }
-                       inodes = append(inodes, fs.newProjectNode(parent, user.Username, user.UUID, nil))
+                       inodes = append(inodes, fs.newProjectDir(parent, user.Username, user.UUID, nil))
                }
                params.Filters = []Filter{{"uuid", ">", resp.Items[len(resp.Items)-1].UUID}}
        }
index c20f61db26301be6be323d2097be5c55f3d17037..d39f3c6fcbfab8093649ab817612de6bee55b8f7 100644 (file)
@@ -5,11 +5,9 @@
 package arvadostest
 
 import (
-       "context"
-
-       "git.arvados.org/arvados.git/lib/ctrlctx"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/jmoiron/sqlx"
+
        // sqlx needs lib/pq to talk to PostgreSQL
        _ "github.com/lib/pq"
        "gopkg.in/check.v1"
@@ -21,14 +19,3 @@ func DB(c *check.C, cluster *arvados.Cluster) *sqlx.DB {
        c.Assert(err, check.IsNil)
        return db
 }
-
-// TransactionContext returns a context suitable for running a test
-// case in a new transaction, and a rollback func which the caller
-// should call after the test.
-func TransactionContext(c *check.C, db *sqlx.DB) (ctx context.Context, rollback func()) {
-       tx, err := db.Beginx()
-       c.Assert(err, check.IsNil)
-       return ctrlctx.NewWithTransaction(context.Background(), tx), func() {
-               c.Check(tx.Rollback(), check.IsNil)
-       }
-}
index ec55725412c381714014a09931441855f9393466..ac12f7ae13e93405b37a6814ed4e16bbda2b911e 100644 (file)
@@ -32,6 +32,7 @@ const (
        HelloWorldPdh           = "55713e6a34081eb03609e7ad5fcad129+62"
 
        MultilevelCollection1                        = "zzzzz-4zz18-pyw8yp9g3pr7irn"
+       MultilevelCollection1PDH                     = "f9ddda46bb293b6847da984e3aa735db+290"
        StorageClassesDesiredDefaultConfirmedDefault = "zzzzz-4zz18-3t236wr12769tga"
        StorageClassesDesiredArchiveConfirmedDefault = "zzzzz-4zz18-3t236wr12769qqa"
        EmptyCollectionUUID                          = "zzzzz-4zz18-gs9ooj1h9sd5mde"
@@ -83,8 +84,11 @@ const (
        Repository2UUID = "zzzzz-s0uqq-382brsig8rp3667"
        Repository2Name = "active/foo2"
 
-       FooCollectionSharingTokenUUID = "zzzzz-gj3su-gf02tdm4g1z3e3u"
-       FooCollectionSharingToken     = "iknqgmunrhgsyfok8uzjlwun9iscwm3xacmzmg65fa1j1lpdss"
+       FooFileCollectionUUID             = "zzzzz-4zz18-znfnqtbbv4spc3w"
+       FooFileCollectionSharingTokenUUID = "zzzzz-gj3su-gf02tdm4g1z3e3u"
+       FooFileCollectionSharingToken     = "iknqgmunrhgsyfok8uzjlwun9iscwm3xacmzmg65fa1j1lpdss"
+       BarFileCollectionUUID             = "zzzzz-4zz18-ehbhgtheo8909or"
+       BarFileCollectionPDH              = "fa7aeb5140e2848d39b416daeef4ffc5+45"
 
        WorkflowWithDefinitionYAMLUUID = "zzzzz-7fd4e-validworkfloyml"
 
index b5301dffe006ec280f379d56ca9330818eb029b6..caf99108a632ac44163a4e669fce9bb00366c078 100644 (file)
@@ -223,7 +223,8 @@ func (agg *Aggregator) ClusterHealth() ClusterHealthResponse {
        for svcName, sh := range resp.Services {
                switch svcName {
                case arvados.ServiceNameDispatchCloud,
-                       arvados.ServiceNameDispatchLSF:
+                       arvados.ServiceNameDispatchLSF,
+                       arvados.ServiceNameDispatchSLURM:
                        // ok to not run any given dispatcher
                case arvados.ServiceNameHealth,
                        arvados.ServiceNameWorkbench1,
index bac4a24fd5a037d9cdcafb663612677304184c97..89eecc6e276f843d2085b727193c0324ef8d4d03 100644 (file)
@@ -5,6 +5,7 @@
 package keepclient
 
 import (
+       "fmt"
        "io"
        "sort"
        "strconv"
@@ -93,8 +94,11 @@ func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
                                data = make([]byte, size, bufsize)
                                _, err = io.ReadFull(rdr, data)
                                err2 := rdr.Close()
-                               if err == nil {
-                                       err = err2
+                               if err == nil && err2 != nil {
+                                       err = fmt.Errorf("close(): %w", err2)
+                               }
+                               if err != nil {
+                                       err = fmt.Errorf("Get %s: %w", locator, err)
                                }
                        }
                        c.mtx.Lock()
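
Since both failure paths wrap with %w, callers can still test for the underlying cause after the locator context is added; a sketch (bc, kc, and locator assumed in scope):

    data, err := bc.Get(kc, locator)
    if errors.Is(err, io.ErrUnexpectedEOF) {
    	// a short read surfaces through both layers of wrapping
    	log.Printf("retryable read error: %v", err)
    }
    _ = data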
index 1f2eab73afedd748070086e8083f8bccc2256af8..a5c3e63dde65ffcd935b016b7f255d6effffbe0e 100644 (file)
@@ -987,8 +987,8 @@ collection_with_list_prop_odd:
     listprop: [elem1, elem3, 5]
 
 collection_with_list_prop_even:
-  uuid: zzzzz-4zz18-listpropertyeven
-  current_version_uuid: zzzzz-4zz18-listpropertyeven
+  uuid: zzzzz-4zz18-listpropertyevn
+  current_version_uuid: zzzzz-4zz18-listpropertyevn
   portable_data_hash: fa7aeb5140e2848d39b416daeef4ffc5+45
   owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
   created_at: 2015-02-13T17:22:54Z
@@ -1002,8 +1002,8 @@ collection_with_list_prop_even:
     listprop: [elem2, 4, elem6, ELEM8]
 
 collection_with_listprop_elem1:
-  uuid: zzzzz-4zz18-listpropelem1
-  current_version_uuid: zzzzz-4zz18-listpropelem1
+  uuid: zzzzz-4zz18-listpropelemen1
+  current_version_uuid: zzzzz-4zz18-listpropelemen1
   portable_data_hash: fa7aeb5140e2848d39b416daeef4ffc5+45
   owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
   created_at: 2015-02-13T17:22:54Z
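
These renames bring the fixture IDs to the canonical 5-5-15 Arvados UUID shape ("listpropertyeven" had a 16-character suffix; "listpropelem1" had 13). A sketch of that shape check (the pattern is an assumption inferred from the fixtures, not code from this patch):

    var arvUUID = regexp.MustCompile(`^[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}$`)
    fmt.Println(arvUUID.MatchString("zzzzz-4zz18-listpropertyevn"))  // true
    fmt.Println(arvUUID.MatchString("zzzzz-4zz18-listpropertyeven")) // false: suffix too long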
index 7ec8639abaa2cf1f99b22dd8d37cbb787475bcfe..db06d635092a3ee36daed2d3b72fe1b1b738a909 100644 (file)
@@ -5,6 +5,7 @@
 package keepweb
 
 import (
+       "net/http"
        "sync"
        "sync/atomic"
        "time"
@@ -20,74 +21,32 @@ import (
 const metricsUpdateInterval = time.Second / 10
 
 type cache struct {
-       cluster     *arvados.Cluster
-       logger      logrus.FieldLogger
-       registry    *prometheus.Registry
-       metrics     cacheMetrics
-       pdhs        *lru.TwoQueueCache
-       collections *lru.TwoQueueCache
-       sessions    *lru.TwoQueueCache
-       setupOnce   sync.Once
+       cluster   *arvados.Cluster
+       logger    logrus.FieldLogger
+       registry  *prometheus.Registry
+       metrics   cacheMetrics
+       sessions  *lru.TwoQueueCache
+       setupOnce sync.Once
 
-       chPruneSessions    chan struct{}
-       chPruneCollections chan struct{}
+       chPruneSessions chan struct{}
 }
 
 type cacheMetrics struct {
-       requests          prometheus.Counter
-       collectionBytes   prometheus.Gauge
-       collectionEntries prometheus.Gauge
-       sessionEntries    prometheus.Gauge
-       collectionHits    prometheus.Counter
-       pdhHits           prometheus.Counter
-       sessionHits       prometheus.Counter
-       sessionMisses     prometheus.Counter
-       apiCalls          prometheus.Counter
+       requests        prometheus.Counter
+       collectionBytes prometheus.Gauge
+       sessionEntries  prometheus.Gauge
+       sessionHits     prometheus.Counter
+       sessionMisses   prometheus.Counter
 }
 
 func (m *cacheMetrics) setup(reg *prometheus.Registry) {
-       m.requests = prometheus.NewCounter(prometheus.CounterOpts{
-               Namespace: "arvados",
-               Subsystem: "keepweb_collectioncache",
-               Name:      "requests",
-               Help:      "Number of targetID-to-manifest lookups handled.",
-       })
-       reg.MustRegister(m.requests)
-       m.collectionHits = prometheus.NewCounter(prometheus.CounterOpts{
-               Namespace: "arvados",
-               Subsystem: "keepweb_collectioncache",
-               Name:      "hits",
-               Help:      "Number of pdh-to-manifest cache hits.",
-       })
-       reg.MustRegister(m.collectionHits)
-       m.pdhHits = prometheus.NewCounter(prometheus.CounterOpts{
-               Namespace: "arvados",
-               Subsystem: "keepweb_collectioncache",
-               Name:      "pdh_hits",
-               Help:      "Number of uuid-to-pdh cache hits.",
-       })
-       reg.MustRegister(m.pdhHits)
-       m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
-               Namespace: "arvados",
-               Subsystem: "keepweb_collectioncache",
-               Name:      "api_calls",
-               Help:      "Number of outgoing API calls made by cache.",
-       })
-       reg.MustRegister(m.apiCalls)
        m.collectionBytes = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "keepweb_sessions",
-               Name:      "cached_collection_bytes",
-               Help:      "Total size of all cached manifests and sessions.",
+               Name:      "cached_session_bytes",
+               Help:      "Total size of all cached sessions.",
        })
        reg.MustRegister(m.collectionBytes)
-       m.collectionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
-               Namespace: "arvados",
-               Subsystem: "keepweb_collectioncache",
-               Name:      "cached_manifests",
-               Help:      "Number of manifests in cache.",
-       })
-       reg.MustRegister(m.collectionEntries)
        m.sessionEntries = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "keepweb_sessions",
@@ -111,21 +70,6 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) {
        reg.MustRegister(m.sessionMisses)
 }
 
-type cachedPDH struct {
-       expire  time.Time
-       refresh time.Time
-       pdh     string
-}
-
-type cachedCollection struct {
-       expire     time.Time
-       collection *arvados.Collection
-}
-
-type cachedPermission struct {
-       expire time.Time
-}
-
 type cachedSession struct {
        expire        time.Time
        fs            atomic.Value
@@ -137,14 +81,6 @@ type cachedSession struct {
 
 func (c *cache) setup() {
        var err error
-       c.pdhs, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxUUIDEntries)
-       if err != nil {
-               panic(err)
-       }
-       c.collections, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxCollectionEntries)
-       if err != nil {
-               panic(err)
-       }
        c.sessions, err = lru.New2Q(c.cluster.Collections.WebDAVCache.MaxSessions)
        if err != nil {
                panic(err)
@@ -160,12 +96,6 @@ func (c *cache) setup() {
                        c.updateGauges()
                }
        }()
-       c.chPruneCollections = make(chan struct{}, 1)
-       go func() {
-               for range c.chPruneCollections {
-                       c.pruneCollections()
-               }
-       }()
        c.chPruneSessions = make(chan struct{}, 1)
        go func() {
                for range c.chPruneSessions {
@@ -176,7 +106,6 @@ func (c *cache) setup() {
 
 func (c *cache) updateGauges() {
        c.metrics.collectionBytes.Set(float64(c.collectionBytes()))
-       c.metrics.collectionEntries.Set(float64(c.collections.Len()))
        c.metrics.sessionEntries.Set(float64(c.sessions.Len()))
 }
 
@@ -184,39 +113,6 @@ var selectPDH = map[string]interface{}{
        "select": []string{"portable_data_hash"},
 }
 
-// Update saves a modified version (fs) to an existing collection
-// (coll) and, if successful, updates the relevant cache entries so
-// subsequent calls to Get() reflect the modifications.
-func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
-       c.setupOnce.Do(c.setup)
-
-       m, err := fs.MarshalManifest(".")
-       if err != nil || m == coll.ManifestText {
-               return err
-       }
-       coll.ManifestText = m
-       var updated arvados.Collection
-       err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
-               "collection": map[string]string{
-                       "manifest_text": coll.ManifestText,
-               },
-       })
-       if err != nil {
-               c.pdhs.Remove(coll.UUID)
-               return err
-       }
-       c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
-               expire:     time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)),
-               collection: &updated,
-       })
-       c.pdhs.Add(coll.UUID, &cachedPDH{
-               expire:  time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL)),
-               refresh: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.UUIDTTL)),
-               pdh:     updated.PortableDataHash,
-       })
-       return nil
-}
-
 // ResetSession unloads any potentially stale state. Should be called
 // after write operations, so subsequent reads don't return stale
 // data.
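
A sketch of the intended call pattern (writingDir and token assumed in scope, mirroring how the handler saves a modified collection):

    if err := writingDir.Sync(); err == nil {
    	// drop the cached sitefs so the next read request
    	// rebuilds it and sees the saved changes
    	h.Cache.ResetSession(token)
    }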
@@ -227,7 +123,7 @@ func (c *cache) ResetSession(token string) {
 
 // Get a long-lived CustomFileSystem suitable for doing a read operation
 // with the given token.
-func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, error) {
+func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, *arvados.User, error) {
        c.setupOnce.Do(c.setup)
        now := time.Now()
        ent, _ := c.sessions.Get(token)
@@ -241,12 +137,12 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi
                var err error
                sess.client, err = arvados.NewClientFromConfig(c.cluster)
                if err != nil {
-                       return nil, nil, err
+                       return nil, nil, nil, err
                }
                sess.client.AuthToken = token
                sess.arvadosclient, err = arvadosclient.New(sess.client)
                if err != nil {
-                       return nil, nil, err
+                       return nil, nil, nil, err
                }
                sess.keepclient = keepclient.New(sess.arvadosclient)
                c.sessions.Add(token, sess)
@@ -260,14 +156,29 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi
        case c.chPruneSessions <- struct{}{}:
        default:
        }
+
        fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
-       if fs != nil && !expired {
-               return fs, sess, nil
+       if fs == nil || expired {
+               fs = sess.client.SiteFileSystem(sess.keepclient)
+               fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
+               sess.fs.Store(fs)
        }
-       fs = sess.client.SiteFileSystem(sess.keepclient)
-       fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
-       sess.fs.Store(fs)
-       return fs, sess, nil
+
+       user, _ := sess.user.Load().(*arvados.User)
+       if user == nil || expired {
+               user = new(arvados.User)
+               err := sess.client.RequestAndDecode(user, "GET", "/arvados/v1/users/current", nil, nil)
+               if statusErr, ok := err.(interface{ HTTPStatus() int }); ok && statusErr.HTTPStatus() == http.StatusForbidden {
+                       // token is OK, but the users/current API is
+                       // out of scope for this token -- cache an
+                       // empty user record, signifying unknown user
+               } else if err != nil {
+                       return nil, nil, nil, err
+               }
+               sess.user.Store(user)
+       }
+
+       return fs, sess, user, nil
 }
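
A usage sketch for the widened return signature (collectionID and token are assumed inputs; the returned user may be an empty record when users/current is out of the token's scope):

    func statCollection(c *cache, token, collectionID string) (os.FileInfo, error) {
    	fs, _, _, err := c.GetSession(token)
    	if err != nil {
    		return nil, err
    	}
    	return fs.Stat("by_id/" + collectionID)
    }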
 
 // Remove all expired session cache entries, then remove more entries
@@ -294,7 +205,7 @@ func (c *cache) pruneSessions() {
        }
        // Remove tokens until reaching size limit, starting with the
        // least frequently used entries (which Keys() returns last).
-       for i := len(keys) - 1; i >= 0 && size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2; i-- {
+       for i := len(keys) - 1; i >= 0 && size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes; i-- {
                if sizes[i] > 0 {
                        c.sessions.Remove(keys[i])
                        size -= sizes[i]
@@ -302,147 +213,10 @@ func (c *cache) pruneSessions() {
        }
 }
 
-func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
-       c.setupOnce.Do(c.setup)
-       c.metrics.requests.Inc()
-
-       var pdhRefresh bool
-       var pdh string
-       if arvadosclient.PDHMatch(targetID) {
-               pdh = targetID
-       } else if ent, cached := c.pdhs.Get(targetID); cached {
-               ent := ent.(*cachedPDH)
-               if ent.expire.Before(time.Now()) {
-                       c.pdhs.Remove(targetID)
-               } else {
-                       pdh = ent.pdh
-                       pdhRefresh = forceReload || time.Now().After(ent.refresh)
-                       c.metrics.pdhHits.Inc()
-               }
-       }
-
-       if pdh == "" {
-               // UUID->PDH mapping is not cached, might as well get
-               // the whole collection record and be done (below).
-               c.logger.Debugf("cache(%s): have no pdh", targetID)
-       } else if cached := c.lookupCollection(arv.ApiToken + "\000" + pdh); cached == nil {
-               // PDH->manifest is not cached, might as well get the
-               // whole collection record (below).
-               c.logger.Debugf("cache(%s): have pdh %s but manifest is not cached", targetID, pdh)
-       } else if !pdhRefresh {
-               // We looked up UUID->PDH very recently, and we still
-               // have the manifest for that PDH.
-               c.logger.Debugf("cache(%s): have pdh %s and refresh not needed", targetID, pdh)
-               return cached, nil
-       } else {
-               // Get current PDH for this UUID (and confirm we still
-               // have read permission).  Most likely, the cached PDH
-               // is still correct, in which case we can use our
-               // cached manifest.
-               c.metrics.apiCalls.Inc()
-               var current arvados.Collection
-               err := arv.Get("collections", targetID, selectPDH, &current)
-               if err != nil {
-                       return nil, err
-               }
-               if current.PortableDataHash == pdh {
-                       // PDH has not changed, cached manifest is
-                       // correct.
-                       c.logger.Debugf("cache(%s): verified cached pdh %s is still correct", targetID, pdh)
-                       return cached, nil
-               }
-               if cached := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); cached != nil {
-                       // PDH changed, and we already have the
-                       // manifest for that new PDH.
-                       c.logger.Debugf("cache(%s): cached pdh %s was stale, new pdh is %s and manifest is already in cache", targetID, pdh, current.PortableDataHash)
-                       return cached, nil
-               }
-       }
-
-       // Either UUID->PDH is not cached, or PDH->manifest is not
-       // cached.
-       var retrieved arvados.Collection
-       c.metrics.apiCalls.Inc()
-       err := arv.Get("collections", targetID, nil, &retrieved)
-       if err != nil {
-               return nil, err
-       }
-       c.logger.Debugf("cache(%s): retrieved manifest, caching with pdh %s", targetID, retrieved.PortableDataHash)
-       exp := time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.TTL))
-       if targetID != retrieved.PortableDataHash {
-               c.pdhs.Add(targetID, &cachedPDH{
-                       expire:  exp,
-                       refresh: time.Now().Add(time.Duration(c.cluster.Collections.WebDAVCache.UUIDTTL)),
-                       pdh:     retrieved.PortableDataHash,
-               })
-       }
-       c.collections.Add(arv.ApiToken+"\000"+retrieved.PortableDataHash, &cachedCollection{
-               expire:     exp,
-               collection: &retrieved,
-       })
-       if int64(len(retrieved.ManifestText)) > c.cluster.Collections.WebDAVCache.MaxCollectionBytes/int64(c.cluster.Collections.WebDAVCache.MaxCollectionEntries) {
-               select {
-               case c.chPruneCollections <- struct{}{}:
-               default:
-               }
-       }
-       return &retrieved, nil
-}
-
-// pruneCollections checks the total bytes occupied by manifest_text
-// in the collection cache and removes old entries as needed to bring
-// the total size down to CollectionBytes. It also deletes all expired
-// entries.
-//
-// pruneCollections does not aim to be perfectly correct when there is
-// concurrent cache activity.
-func (c *cache) pruneCollections() {
-       var size int64
-       now := time.Now()
-       keys := c.collections.Keys()
-       entsize := make([]int, len(keys))
-       expired := make([]bool, len(keys))
-       for i, k := range keys {
-               v, ok := c.collections.Peek(k)
-               if !ok {
-                       continue
-               }
-               ent := v.(*cachedCollection)
-               n := len(ent.collection.ManifestText)
-               size += int64(n)
-               entsize[i] = n
-               expired[i] = ent.expire.Before(now)
-       }
-       for i, k := range keys {
-               if expired[i] {
-                       c.collections.Remove(k)
-                       size -= int64(entsize[i])
-               }
-       }
-       for i, k := range keys {
-               if size <= c.cluster.Collections.WebDAVCache.MaxCollectionBytes/2 {
-                       break
-               }
-               if expired[i] {
-                       // already removed this entry in the previous loop
-                       continue
-               }
-               c.collections.Remove(k)
-               size -= int64(entsize[i])
-       }
-}
-
-// collectionBytes returns the approximate combined memory size of the
-// collection cache and session filesystem cache.
+// collectionBytes returns the approximate memory size of the session
+// filesystem cache.
 func (c *cache) collectionBytes() uint64 {
        var size uint64
-       for _, k := range c.collections.Keys() {
-               v, ok := c.collections.Peek(k)
-               if !ok {
-                       continue
-               }
-               size += uint64(len(v.(*cachedCollection).collection.ManifestText))
-       }
        for _, token := range c.sessions.Keys() {
                ent, ok := c.sessions.Peek(token)
                if !ok {
@@ -454,49 +228,3 @@ func (c *cache) collectionBytes() uint64 {
        }
        return size
 }
-
-func (c *cache) lookupCollection(key string) *arvados.Collection {
-       e, cached := c.collections.Get(key)
-       if !cached {
-               return nil
-       }
-       ent := e.(*cachedCollection)
-       if ent.expire.Before(time.Now()) {
-               c.collections.Remove(key)
-               return nil
-       }
-       c.metrics.collectionHits.Inc()
-       return ent.collection
-}
-
-func (c *cache) GetTokenUser(token string) (*arvados.User, error) {
-       // Get and cache user record associated with this
-       // token.  We need to know their UUID for logging, and
-       // whether they are an admin or not for certain
-       // permission checks.
-
-       // Get/create session entry
-       _, sess, err := c.GetSession(token)
-       if err != nil {
-               return nil, err
-       }
-
-       // See if the user is already set, and if so, return it
-       user, _ := sess.user.Load().(*arvados.User)
-       if user != nil {
-               return user, nil
-       }
-
-       // Fetch the user record
-       c.metrics.apiCalls.Inc()
-       var current arvados.User
-
-       err = sess.client.RequestAndDecode(&current, "GET", "arvados/v1/users/current", nil, nil)
-       if err != nil {
-               return nil, err
-       }
-
-       // Stash the user record for next time
-       sess.user.Store(&current)
-       return &current, nil
-}
index 6b8f427171ef7813f8453c0b23ed968229295fb4..010e29a0b876105d49756d736adefd316878a12e 100644 (file)
@@ -6,17 +6,21 @@ package keepweb
 
 import (
        "bytes"
+       "net/http"
+       "net/http/httptest"
+       "regexp"
+       "strings"
+       "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
-       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
-       "git.arvados.org/arvados.git/sdk/go/ctxlog"
-       "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/common/expfmt"
        "gopkg.in/check.v1"
 )
 
-func (s *UnitSuite) checkCacheMetrics(c *check.C, reg *prometheus.Registry, regs ...string) {
+func (s *IntegrationSuite) checkCacheMetrics(c *check.C, regs ...string) {
+       s.handler.Cache.updateGauges()
+       reg := s.handler.Cache.registry
        mfs, err := reg.Gather()
        c.Check(err, check.IsNil)
        buf := &bytes.Buffer{}
@@ -25,131 +29,139 @@ func (s *UnitSuite) checkCacheMetrics(c *check.C, reg *prometheus.Registry, regs
                c.Check(enc.Encode(mf), check.IsNil)
        }
        mm := buf.String()
+       // Remove comments to make the "value vs. regexp" failure
+       // output easier to read.
+       mm = regexp.MustCompile(`(?m)^#.*\n`).ReplaceAllString(mm, "")
        for _, reg := range regs {
-               c.Check(mm, check.Matches, `(?ms).*collectioncache_`+reg+`\n.*`)
+               c.Check(mm, check.Matches, `(?ms).*keepweb_sessions_`+reg+`\n.*`)
        }
 }
 
-func (s *UnitSuite) TestCache(c *check.C) {
-       arv, err := arvadosclient.MakeArvadosClient()
-       c.Assert(err, check.Equals, nil)
-
-       cache := &cache{
-               cluster:  s.cluster,
-               logger:   ctxlog.TestLogger(c),
-               registry: prometheus.NewRegistry(),
-       }
-
+func (s *IntegrationSuite) TestCache(c *check.C) {
        // Hit the same collection 5 times using the same token. Only
        // the first req should cause an API call; the next 4 should
        // hit all caches.
-       arv.ApiToken = arvadostest.AdminToken
-       var coll *arvados.Collection
+       u := mustParseURL("http://" + arvadostest.FooCollection + ".keep-web.example/foo")
+       req := &http.Request{
+               Method:     "GET",
+               Host:       u.Host,
+               URL:        u,
+               RequestURI: u.RequestURI(),
+               Header: http.Header{
+                       "Authorization": {"Bearer " + arvadostest.ActiveToken},
+               },
+       }
        for i := 0; i < 5; i++ {
-               coll, err = cache.Get(arv, arvadostest.FooCollection, false)
-               c.Check(err, check.Equals, nil)
-               c.Assert(coll, check.NotNil)
-               c.Check(coll.PortableDataHash, check.Equals, arvadostest.FooCollectionPDH)
-               c.Check(coll.ManifestText[:2], check.Equals, ". ")
+               resp := httptest.NewRecorder()
+               s.handler.ServeHTTP(resp, req)
+               c.Check(resp.Code, check.Equals, http.StatusOK)
        }
-       s.checkCacheMetrics(c, cache.registry,
-               "requests 5",
+       s.checkCacheMetrics(c,
                "hits 4",
-               "pdh_hits 4",
-               "api_calls 1")
-
-       // Hit the same collection 2 more times, this time requesting
-       // it by PDH and using a different token. The first req should
-       // miss the permission cache and fetch the new manifest; the
-       // second should hit the Collection cache and skip the API
-       // lookup.
-       arv.ApiToken = arvadostest.ActiveToken
-
-       coll2, err := cache.Get(arv, arvadostest.FooCollectionPDH, false)
-       c.Check(err, check.Equals, nil)
-       c.Assert(coll2, check.NotNil)
-       c.Check(coll2.PortableDataHash, check.Equals, arvadostest.FooCollectionPDH)
-       c.Check(coll2.ManifestText[:2], check.Equals, ". ")
-       c.Check(coll2.ManifestText, check.Not(check.Equals), coll.ManifestText)
-
-       s.checkCacheMetrics(c, cache.registry,
-               "requests 6",
-               "hits 4",
-               "pdh_hits 4",
-               "api_calls 2")
-
-       coll2, err = cache.Get(arv, arvadostest.FooCollectionPDH, false)
-       c.Check(err, check.Equals, nil)
-       c.Assert(coll2, check.NotNil)
-       c.Check(coll2.PortableDataHash, check.Equals, arvadostest.FooCollectionPDH)
-       c.Check(coll2.ManifestText[:2], check.Equals, ". ")
-
-       s.checkCacheMetrics(c, cache.registry,
-               "requests 7",
-               "hits 5",
-               "pdh_hits 4",
-               "api_calls 2")
-
-       // Alternating between two collections N times should produce
-       // only 2 more API calls.
-       arv.ApiToken = arvadostest.AdminToken
-       for i := 0; i < 20; i++ {
-               var target string
-               if i%2 == 0 {
-                       target = arvadostest.HelloWorldCollection
-               } else {
-                       target = arvadostest.FooBarDirCollection
-               }
-               _, err := cache.Get(arv, target, false)
-               c.Check(err, check.Equals, nil)
+               "misses 1",
+               "active 1")
+
+       // Hit a shared collection 3 times via its PDH, using a
+       // different token.
+       u2 := mustParseURL("http://" + strings.Replace(arvadostest.BarFileCollectionPDH, "+", "-", 1) + ".keep-web.example/bar")
+       req2 := &http.Request{
+               Method:     "GET",
+               Host:       u2.Host,
+               URL:        u2,
+               RequestURI: u2.RequestURI(),
+               Header: http.Header{
+                       "Authorization": {"Bearer " + arvadostest.SpectatorToken},
+               },
        }
-       s.checkCacheMetrics(c, cache.registry,
-               "requests 27",
-               "hits 23",
-               "pdh_hits 22",
-               "api_calls 4")
-}
-
-func (s *UnitSuite) TestCacheForceReloadByPDH(c *check.C) {
-       arv, err := arvadosclient.MakeArvadosClient()
-       c.Assert(err, check.Equals, nil)
-
-       cache := &cache{
-               cluster:  s.cluster,
-               logger:   ctxlog.TestLogger(c),
-               registry: prometheus.NewRegistry(),
+       for i := 0; i < 3; i++ {
+               resp2 := httptest.NewRecorder()
+               s.handler.ServeHTTP(resp2, req2)
+               c.Check(resp2.Code, check.Equals, http.StatusOK)
        }
-
-       for _, forceReload := range []bool{false, true, false, true} {
-               _, err := cache.Get(arv, arvadostest.FooCollectionPDH, forceReload)
-               c.Check(err, check.Equals, nil)
+       s.checkCacheMetrics(c,
+               "hits 6",
+               "misses 2",
+               "active 2")
+
+       // Alternating between two collections/tokens N times should
+       // use the existing sessions.
+       for i := 0; i < 7; i++ {
+               resp := httptest.NewRecorder()
+               s.handler.ServeHTTP(resp, req)
+               c.Check(resp.Code, check.Equals, http.StatusOK)
+
+               resp2 := httptest.NewRecorder()
+               s.handler.ServeHTTP(resp2, req2)
+               c.Check(resp2.Code, check.Equals, http.StatusOK)
        }
-
-       s.checkCacheMetrics(c, cache.registry,
-               "requests 4",
-               "hits 3",
-               "pdh_hits 0",
-               "api_calls 1")
+       s.checkCacheMetrics(c,
+               "hits 20",
+               "misses 2",
+               "active 2")
 }
 
-func (s *UnitSuite) TestCacheForceReloadByUUID(c *check.C) {
-       arv, err := arvadosclient.MakeArvadosClient()
-       c.Assert(err, check.Equals, nil)
-
-       cache := &cache{
-               cluster:  s.cluster,
-               logger:   ctxlog.TestLogger(c),
-               registry: prometheus.NewRegistry(),
-       }
-
-       for _, forceReload := range []bool{false, true, false, true} {
-               _, err := cache.Get(arv, arvadostest.FooCollection, forceReload)
-               c.Check(err, check.Equals, nil)
-       }
+func (s *IntegrationSuite) TestForceReloadPDH(c *check.C) {
+       filename := strings.Replace(time.Now().Format(time.RFC3339Nano), ":", ".", -1)
+       manifest := ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:" + filename + "\n"
+       pdh := arvados.PortableDataHash(manifest)
+       client := arvados.NewClientFromEnv()
+       client.AuthToken = arvadostest.ActiveToken
+
+       _, resp := s.do("GET", "http://"+strings.Replace(pdh, "+", "-", 1)+".keep-web.example/"+filename, arvadostest.ActiveToken, nil)
+       c.Check(resp.Code, check.Equals, http.StatusNotFound)
+
+       var coll arvados.Collection
+       err := client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
+               "collection": map[string]string{
+                       "manifest_text": manifest,
+               },
+       })
+       c.Assert(err, check.IsNil)
+       defer client.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
+       c.Assert(coll.PortableDataHash, check.Equals, pdh)
+
+       _, resp = s.do("GET", "http://"+strings.Replace(pdh, "+", "-", 1)+".keep-web.example/"+filename, "", http.Header{
+               "Authorization": {"Bearer " + arvadostest.ActiveToken},
+               "Cache-Control": {"must-revalidate"},
+       })
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+
+       _, resp = s.do("GET", "http://"+strings.Replace(pdh, "+", "-", 1)+".keep-web.example/missingfile", "", http.Header{
+               "Authorization": {"Bearer " + arvadostest.ActiveToken},
+               "Cache-Control": {"must-revalidate"},
+       })
+       c.Check(resp.Code, check.Equals, http.StatusNotFound)
+}
 
-       s.checkCacheMetrics(c, cache.registry,
-               "requests 4",
-               "hits 3",
-               "pdh_hits 3",
-               "api_calls 3")
+func (s *IntegrationSuite) TestForceReloadUUID(c *check.C) {
+       client := arvados.NewClientFromEnv()
+       client.AuthToken = arvadostest.ActiveToken
+       var coll arvados.Collection
+       err := client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
+               "collection": map[string]string{
+                       "manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:oldfile\n",
+               },
+       })
+       c.Assert(err, check.IsNil)
+       defer client.RequestAndDecode(nil, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
+
+       _, resp := s.do("GET", "http://"+coll.UUID+".keep-web.example/newfile", arvadostest.ActiveToken, nil)
+       c.Check(resp.Code, check.Equals, http.StatusNotFound)
+       _, resp = s.do("GET", "http://"+coll.UUID+".keep-web.example/oldfile", arvadostest.ActiveToken, nil)
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       _, resp = s.do("GET", "http://"+coll.UUID+".keep-web.example/newfile", arvadostest.ActiveToken, nil)
+       c.Check(resp.Code, check.Equals, http.StatusNotFound)
+       err = client.RequestAndDecode(&coll, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
+               "collection": map[string]string{
+                       "manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:oldfile 0:0:newfile\n",
+               },
+       })
+       c.Assert(err, check.IsNil)
+       _, resp = s.do("GET", "http://"+coll.UUID+".keep-web.example/newfile", arvadostest.ActiveToken, nil)
+       c.Check(resp.Code, check.Equals, http.StatusNotFound)
+       _, resp = s.do("GET", "http://"+coll.UUID+".keep-web.example/newfile", "", http.Header{
+               "Authorization": {"Bearer " + arvadostest.ActiveToken},
+               "Cache-Control": {"must-revalidate"},
+       })
+       c.Check(resp.Code, check.Equals, http.StatusOK)
 }
index 0c75ac56cfc4d69d5845ec332196942d5998cdf7..0e964e463248ff73ab5aaec0ec53d43581236a77 100644 (file)
@@ -14,12 +14,12 @@ import (
        "net/http"
        "net/url"
        "os"
-       "path/filepath"
        "sort"
        "strconv"
        "strings"
        "sync"
 
+       "git.arvados.org/arvados.git/lib/cmd"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/auth"
@@ -31,17 +31,16 @@ import (
 )
 
 type handler struct {
-       Cache      cache
-       Cluster    *arvados.Cluster
-       clientPool *arvadosclient.ClientPool
-       setupOnce  sync.Once
-       webdavLS   webdav.LockSystem
+       Cache     cache
+       Cluster   *arvados.Cluster
+       setupOnce sync.Once
+       webdavLS  webdav.LockSystem
 }
 
 var urlPDHDecoder = strings.NewReplacer(" ", "+", "-", "+")
 
-var notFoundMessage = "404 Not found\r\n\r\nThe requested path was not found, or you do not have permission to access it.\r"
-var unauthorizedMessage = "401 Unauthorized\r\n\r\nA valid Arvados token must be provided to access this resource.\r"
+var notFoundMessage = "Not Found"
+var unauthorizedMessage = "401 Unauthorized\r\n\r\nA valid Arvados token must be provided to access this resource.\r\n"
 
 // parseCollectionIDFromURL returns a UUID or PDH if s is a UUID or a
 // PDH (even if it is a PDH with "+" replaced by " " or "-");
@@ -57,10 +56,6 @@ func parseCollectionIDFromURL(s string) string {
 }
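
For context, the decoder above maps hostname-safe PDH spellings back to canonical form; a sketch of the expected behavior (using a PDH that appears in the fixtures):

    id := parseCollectionIDFromURL("fa7aeb5140e2848d39b416daeef4ffc5-45")
    // id == "fa7aeb5140e2848d39b416daeef4ffc5+45"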
 
 func (h *handler) setup() {
-       // Errors will be handled at the client pool.
-       arv, _ := arvados.NewClientFromConfig(h.Cluster)
-       h.clientPool = arvadosclient.MakeClientPoolWith(arv)
-
        keepclient.DefaultBlockCache.MaxBlocks = h.Cluster.Collections.WebDAVCache.MaxBlockEntries
 
        // Even though we don't accept LOCK requests, every webdav
@@ -69,14 +64,15 @@ func (h *handler) setup() {
 }
 
 func (h *handler) serveStatus(w http.ResponseWriter, r *http.Request) {
-       json.NewEncoder(w).Encode(struct{ Version string }{version})
+       json.NewEncoder(w).Encode(struct{ Version string }{cmd.Version.String()})
 }
 
 // updateOnSuccess wraps httpserver.ResponseWriter. If the handler
 // sends an HTTP header indicating success, updateOnSuccess first
-// calls the provided update func. If the update func fails, a 500
-// response is sent, and the status code and body sent by the handler
-// are ignored (all response writes return the update error).
+// calls the provided update func. If the update func fails, an error
+// response is sent (using the error's HTTP status or 500 if none),
+// and the status code and body sent by the handler are ignored (all
+// response writes return the update error).
 type updateOnSuccess struct {
        httpserver.ResponseWriter
        logger     logrus.FieldLogger
@@ -101,10 +97,11 @@ func (uos *updateOnSuccess) WriteHeader(code int) {
                if code >= 200 && code < 400 {
                        if uos.err = uos.update(); uos.err != nil {
                                code := http.StatusInternalServerError
-                               if err, ok := uos.err.(*arvados.TransactionError); ok {
-                                       code = err.StatusCode
+                               var he interface{ HTTPStatus() int }
+                               if errors.As(uos.err, &he) {
+                                       code = he.HTTPStatus()
                                }
-                               uos.logger.WithError(uos.err).Errorf("update() returned error type %T, changing response to HTTP %d", uos.err, code)
+                               uos.logger.WithError(uos.err).Errorf("update() returned %T error, changing response to HTTP %d", uos.err, code)
                                http.Error(uos.ResponseWriter, uos.err.Error(), code)
                                return
                        }
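
A condensed sketch of how the wrapper is wired up (writingDir and innerHandler are placeholders; the real handler builds its update func further down):

    w = &updateOnSuccess{
    	ResponseWriter: w,
    	logger:         ctxlog.FromContext(r.Context()),
    	update: func() error {
    		return writingDir.Sync() // persist the modified collection
    	},
    }
    innerHandler.ServeHTTP(w, r)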
@@ -272,11 +269,6 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                }
        }
 
-       if collectionID == "" && !useSiteFS {
-               http.Error(w, notFoundMessage, http.StatusNotFound)
-               return
-       }
-
        forceReload := false
        if cc := r.Header.Get("Cache-Control"); strings.Contains(cc, "no-cache") || strings.Contains(cc, "must-revalidate") {
                forceReload = true
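
On the client side, either conventional header value forces keep-web to sync the collection before serving; a sketch (the URL and token are placeholders):

    req, _ := http.NewRequest("GET", "https://collections.example/c=zzzzz-4zz18-zzzzzzzzzzzzzzz/foo", nil)
    req.Header.Set("Authorization", "Bearer "+token)
    req.Header.Set("Cache-Control", "must-revalidate")
    resp, err := http.DefaultClient.Do(req)
    if err == nil {
    	defer resp.Body.Close()
    }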
@@ -317,11 +309,6 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                return
        }
 
-       if useSiteFS {
-               h.serveSiteFS(w, r, reqTokens, credentialsOK, attachment)
-               return
-       }
-
        targetPath := pathParts[stripParts:]
        if tokens == nil && len(targetPath) > 0 && strings.HasPrefix(targetPath[0], "t=") {
                // http://ID.example/t=TOKEN/PATH...
@@ -336,6 +323,25 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                stripParts++
        }
 
+       fsprefix := ""
+       if useSiteFS {
+               if writeMethod[r.Method] {
+                       http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
+                       return
+               }
+               if len(reqTokens) == 0 {
+                       w.Header().Add("WWW-Authenticate", "Basic realm=\"collections\"")
+                       http.Error(w, unauthorizedMessage, http.StatusUnauthorized)
+                       return
+               }
+               tokens = reqTokens
+       } else if collectionID == "" {
+               http.Error(w, notFoundMessage, http.StatusNotFound)
+               return
+       } else {
+               fsprefix = "by_id/" + collectionID + "/"
+       }
+
        if tokens == nil {
                tokens = reqTokens
                if h.Cluster.Users.AnonymousUserToken != "" {
@@ -363,37 +369,60 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                stripParts++
        }
 
-       arv := h.clientPool.Get()
-       if arv == nil {
-               http.Error(w, "client pool error: "+h.clientPool.Err().Error(), http.StatusInternalServerError)
-               return
+       dirOpenMode := os.O_RDONLY
+       if writeMethod[r.Method] {
+               dirOpenMode = os.O_RDWR
        }
-       defer h.clientPool.Put(arv)
 
-       var collection *arvados.Collection
+       validToken := make(map[string]bool)
+       var token string
        var tokenUser *arvados.User
-       tokenResult := make(map[string]int)
-       for _, arv.ApiToken = range tokens {
-               var err error
-               collection, err = h.Cache.Get(arv, collectionID, forceReload)
-               if err == nil {
-                       // Success
-                       break
+       var sessionFS arvados.CustomFileSystem
+       var session *cachedSession
+       var collectionDir arvados.File
+       for _, token = range tokens {
+               var statusErr interface{ HTTPStatus() int }
+               fs, sess, user, err := h.Cache.GetSession(token)
+               if errors.As(err, &statusErr) && statusErr.HTTPStatus() == http.StatusUnauthorized {
+                       // bad token
+                       continue
+               } else if err != nil {
+                       http.Error(w, "cache error: "+err.Error(), http.StatusInternalServerError)
+                       return
+               }
+               f, err := fs.OpenFile(fsprefix, dirOpenMode, 0)
+               if errors.As(err, &statusErr) && statusErr.HTTPStatus() == http.StatusForbidden {
+                       // collection id is outside token scope
+                       validToken[token] = true
+                       continue
+               }
+               validToken[token] = true
+               if os.IsNotExist(err) {
+                       // collection does not exist or is not
+                       // readable using this token
+                       continue
+               } else if err != nil {
+                       http.Error(w, err.Error(), http.StatusInternalServerError)
+                       return
                }
-               if srvErr, ok := err.(arvadosclient.APIServerError); ok {
-                       switch srvErr.HttpStatusCode {
-                       case 404, 401:
-                               // Token broken or insufficient to
-                               // retrieve collection
-                               tokenResult[arv.ApiToken] = srvErr.HttpStatusCode
-                               continue
+               defer f.Close()
+
+               collectionDir, sessionFS, session, tokenUser = f, fs, sess, user
+               break
+       }
+       if forceReload && collectionDir != nil {
+               err := collectionDir.Sync()
+               if err != nil {
+                       var statusErr interface{ HTTPStatus() int }
+                       if errors.As(err, &statusErr) {
+                               http.Error(w, err.Error(), statusErr.HTTPStatus())
+                       } else {
+                               http.Error(w, err.Error(), http.StatusInternalServerError)
                        }
+                       return
                }
-               // Something more serious is wrong
-               http.Error(w, "cache error: "+err.Error(), http.StatusInternalServerError)
-               return
        }
-       if collection == nil {
+       if session == nil {
                if pathToken || !credentialsOK {
                        // Either the URL is a "secret sharing link"
                        // that didn't work out (and asking the client
@@ -404,9 +433,9 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                        return
                }
                for _, t := range reqTokens {
-                       if tokenResult[t] == 404 {
-                               // The client provided valid token(s), but the
-                               // collection was not found.
+                       if validToken[t] {
+                               // The client provided valid token(s),
+                               // but the collection was not found.
                                http.Error(w, notFoundMessage, http.StatusNotFound)
                                return
                        }
@@ -453,215 +482,108 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                return
        }
 
-       kc, err := keepclient.MakeKeepClient(arv)
-       if err != nil {
-               http.Error(w, "error setting up keep client: "+err.Error(), http.StatusInternalServerError)
-               return
+       if r.Method == http.MethodGet || r.Method == http.MethodHead {
+               targetfnm := fsprefix + strings.Join(pathParts[stripParts:], "/")
+               if fi, err := sessionFS.Stat(targetfnm); err == nil && fi.IsDir() {
+                       if !strings.HasSuffix(r.URL.Path, "/") {
+                               h.seeOtherWithCookie(w, r, r.URL.Path+"/", credentialsOK)
+                       } else {
+                               h.serveDirectory(w, r, fi.Name(), sessionFS, targetfnm, !useSiteFS)
+                       }
+                       return
+               }
        }
-       kc.RequestID = r.Header.Get("X-Request-Id")
 
        var basename string
        if len(targetPath) > 0 {
                basename = targetPath[len(targetPath)-1]
        }
-       applyContentDispositionHdr(w, r, basename, attachment)
-
-       client := (&arvados.Client{
-               APIHost:   arv.ApiServer,
-               AuthToken: arv.ApiToken,
-               Insecure:  arv.ApiInsecure,
-       }).WithRequestID(r.Header.Get("X-Request-Id"))
-
-       fs, err := collection.FileSystem(client, kc)
-       if err != nil {
-               http.Error(w, "error creating collection filesystem: "+err.Error(), http.StatusInternalServerError)
-               return
-       }
-
-       writefs, writeOK := fs.(arvados.CollectionFileSystem)
-       targetIsPDH := arvadosclient.PDHMatch(collectionID)
-       if (targetIsPDH || !writeOK) && writeMethod[r.Method] {
+       if arvadosclient.PDHMatch(collectionID) && writeMethod[r.Method] {
                http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
                return
        }
-
-       // Check configured permission
-       _, sess, err := h.Cache.GetSession(arv.ApiToken)
-       if err != nil {
-               http.Error(w, "session cache: "+err.Error(), http.StatusInternalServerError)
-       }
-       tokenUser, err = h.Cache.GetTokenUser(arv.ApiToken)
-       if e := (interface{ HTTPStatus() int })(nil); errors.As(err, &e) && e.HTTPStatus() == http.StatusForbidden {
-               // Ignore expected error looking up user record when
-               // using a scoped token that allows getting
-               // collections/X but not users/current
-       } else if err != nil {
-               http.Error(w, "user lookup: "+err.Error(), http.StatusInternalServerError)
+       if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
+               http.Error(w, "Not permitted", http.StatusForbidden)
+               return
        }
+       h.logUploadOrDownload(r, session.arvadosclient, sessionFS, fsprefix+strings.Join(targetPath, "/"), nil, tokenUser)
 
-       if webdavMethod[r.Method] {
-               if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
-                       http.Error(w, "Not permitted", http.StatusForbidden)
+       if writeMethod[r.Method] {
+               // Save the collection only if/when all
+               // webdav->filesystem operations succeed --
+               // and send a 500 error if the modified
+               // collection can't be saved.
+               //
+               // Perform the write in a separate sitefs, so
+               // concurrent read operations on the same
+               // collection see the previous saved
+               // state. After the write succeeds and the
+               // collection record is updated, we reset the
+               // session so the updates are visible in
+               // subsequent read requests.
+               client := session.client.WithRequestID(r.Header.Get("X-Request-Id"))
+               sessionFS = client.SiteFileSystem(session.keepclient)
+               writingDir, err := sessionFS.OpenFile(fsprefix, os.O_RDONLY, 0)
+               if err != nil {
+                       http.Error(w, err.Error(), http.StatusInternalServerError)
                        return
                }
-               h.logUploadOrDownload(r, sess.arvadosclient, nil, strings.Join(targetPath, "/"), collection, tokenUser)
-
-               if writeMethod[r.Method] {
-                       // Save the collection only if/when all
-                       // webdav->filesystem operations succeed --
-                       // and send a 500 error if the modified
-                       // collection can't be saved.
-                       w = &updateOnSuccess{
-                               ResponseWriter: w,
-                               logger:         ctxlog.FromContext(r.Context()),
-                               update: func() error {
-                                       return h.Cache.Update(client, *collection, writefs)
-                               }}
-               }
-               h := webdav.Handler{
-                       Prefix: "/" + strings.Join(pathParts[:stripParts], "/"),
-                       FileSystem: &webdavFS{
-                               collfs:        fs,
-                               writing:       writeMethod[r.Method],
-                               alwaysReadEOF: r.Method == "PROPFIND",
-                       },
-                       LockSystem: h.webdavLS,
-                       Logger: func(_ *http.Request, err error) {
+               defer writingDir.Close()
+               w = &updateOnSuccess{
+                       ResponseWriter: w,
+                       logger:         ctxlog.FromContext(r.Context()),
+                       update: func() error {
+                               err := writingDir.Sync()
+                               var te arvados.TransactionError
+                               if errors.As(err, &te) {
+                                       err = te
+                               }
                                if err != nil {
-                                       ctxlog.FromContext(r.Context()).WithError(err).Error("error reported by webdav handler")
+                                       return err
                                }
-                       },
-               }
-               h.ServeHTTP(w, r)
-               return
-       }
-
-       openPath := "/" + strings.Join(targetPath, "/")
-       f, err := fs.Open(openPath)
-       if os.IsNotExist(err) {
-               // Requested non-existent path
-               http.Error(w, notFoundMessage, http.StatusNotFound)
-               return
-       } else if err != nil {
-               // Some other (unexpected) error
-               http.Error(w, "open: "+err.Error(), http.StatusInternalServerError)
-               return
-       }
-       defer f.Close()
-       if stat, err := f.Stat(); err != nil {
-               // Can't get Size/IsDir (shouldn't happen with a collectionFS!)
-               http.Error(w, "stat: "+err.Error(), http.StatusInternalServerError)
-       } else if stat.IsDir() && !strings.HasSuffix(r.URL.Path, "/") {
-               // If client requests ".../dirname", redirect to
-               // ".../dirname/". This way, relative links in the
-               // listing for "dirname" can always be "fnm", never
-               // "dirname/fnm".
-               h.seeOtherWithCookie(w, r, r.URL.Path+"/", credentialsOK)
-       } else if stat.IsDir() {
-               h.serveDirectory(w, r, collection.Name, fs, openPath, true)
-       } else {
-               if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
-                       http.Error(w, "Not permitted", http.StatusForbidden)
-                       return
-               }
-               h.logUploadOrDownload(r, sess.arvadosclient, nil, strings.Join(targetPath, "/"), collection, tokenUser)
-
-               http.ServeContent(w, r, basename, stat.ModTime(), f)
-               if wrote := int64(w.WroteBodyBytes()); wrote != stat.Size() && w.WroteStatus() == http.StatusOK {
-                       // If we wrote fewer bytes than expected, it's
-                       // too late to change the real response code
-                       // or send an error message to the client, but
-                       // at least we can try to put some useful
-                       // debugging info in the logs.
-                       n, err := f.Read(make([]byte, 1024))
-                       ctxlog.FromContext(r.Context()).Errorf("stat.Size()==%d but only wrote %d bytes; read(1024) returns %d, %v", stat.Size(), wrote, n, err)
-               }
-       }
-}
-
-func (h *handler) getClients(reqID, token string) (arv *arvadosclient.ArvadosClient, kc *keepclient.KeepClient, client *arvados.Client, release func(), err error) {
-       arv = h.clientPool.Get()
-       if arv == nil {
-               err = h.clientPool.Err()
-               return
-       }
-       release = func() { h.clientPool.Put(arv) }
-       arv.ApiToken = token
-       kc, err = keepclient.MakeKeepClient(arv)
-       if err != nil {
-               release()
-               return
-       }
-       kc.RequestID = reqID
-       client = (&arvados.Client{
-               APIHost:   arv.ApiServer,
-               AuthToken: arv.ApiToken,
-               Insecure:  arv.ApiInsecure,
-       }).WithRequestID(reqID)
-       return
-}
-
-func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []string, credentialsOK, attachment bool) {
-       if len(tokens) == 0 {
-               w.Header().Add("WWW-Authenticate", "Basic realm=\"collections\"")
-               http.Error(w, unauthorizedMessage, http.StatusUnauthorized)
-               return
-       }
-       if writeMethod[r.Method] {
-               http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
-               return
-       }
-
-       fs, sess, err := h.Cache.GetSession(tokens[0])
-       if err != nil {
-               http.Error(w, err.Error(), http.StatusInternalServerError)
-               return
-       }
-       fs.ForwardSlashNameSubstitution(h.Cluster.Collections.ForwardSlashNameSubstitution)
-       f, err := fs.Open(r.URL.Path)
-       if os.IsNotExist(err) {
-               http.Error(w, err.Error(), http.StatusNotFound)
-               return
-       } else if err != nil {
-               http.Error(w, err.Error(), http.StatusInternalServerError)
-               return
-       }
-       defer f.Close()
-       if fi, err := f.Stat(); err == nil && fi.IsDir() && r.Method == "GET" {
-               if !strings.HasSuffix(r.URL.Path, "/") {
-                       h.seeOtherWithCookie(w, r, r.URL.Path+"/", credentialsOK)
-               } else {
-                       h.serveDirectory(w, r, fi.Name(), fs, r.URL.Path, false)
-               }
-               return
-       }
-
-       tokenUser, err := h.Cache.GetTokenUser(tokens[0])
-       if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
-               http.Error(w, "Not permitted", http.StatusForbidden)
-               return
+                               // Sync the changes to the persistent
+                               // sessionfs for this token.
+                               snap, err := writingDir.Snapshot()
+                               if err != nil {
+                                       return err
+                               }
+                               collectionDir.Splice(snap)
+                               return nil
+                       }}
        }
-       h.logUploadOrDownload(r, sess.arvadosclient, fs, r.URL.Path, nil, tokenUser)
-
-       if r.Method == "GET" {
-               _, basename := filepath.Split(r.URL.Path)
+       if r.Method == http.MethodGet {
                applyContentDispositionHdr(w, r, basename, attachment)
        }
        wh := webdav.Handler{
-               Prefix: "/",
+               Prefix: "/" + strings.Join(pathParts[:stripParts], "/"),
                FileSystem: &webdavFS{
-                       collfs:        fs,
+                       collfs:        sessionFS,
+                       prefix:        fsprefix,
                        writing:       writeMethod[r.Method],
                        alwaysReadEOF: r.Method == "PROPFIND",
                },
                LockSystem: h.webdavLS,
-               Logger: func(_ *http.Request, err error) {
+               Logger: func(r *http.Request, err error) {
                        if err != nil {
                                ctxlog.FromContext(r.Context()).WithError(err).Error("error reported by webdav handler")
                        }
                },
        }
        wh.ServeHTTP(w, r)
+       if r.Method == http.MethodGet && w.WroteStatus() == http.StatusOK {
+               wrote := int64(w.WroteBodyBytes())
+               fnm := strings.Join(pathParts[stripParts:], "/")
+               fi, err := wh.FileSystem.Stat(r.Context(), fnm)
+               if err == nil && fi.Size() != wrote {
+                       var n int
+                       f, err := wh.FileSystem.OpenFile(r.Context(), fnm, os.O_RDONLY, 0)
+                       if err == nil {
+                               n, err = f.Read(make([]byte, 1024))
+                               f.Close()
+                       }
+                       ctxlog.FromContext(r.Context()).Errorf("stat.Size()==%d but only wrote %d bytes; read(1024) returns %d, %v", fi.Size(), wrote, n, err)
+               }
+       }
 }
 
 var dirListingTemplate = `<!DOCTYPE HTML>
@@ -971,9 +893,14 @@ func (h *handler) logUploadOrDownload(
 
 func (h *handler) determineCollection(fs arvados.CustomFileSystem, path string) (*arvados.Collection, string) {
        target := strings.TrimSuffix(path, "/")
-       for {
+       for cut := len(target); cut >= 0; cut = strings.LastIndexByte(target, '/') {
+               target = target[:cut]
                fi, err := fs.Stat(target)
-               if err != nil {
+               if os.IsNotExist(err) {
+                       // Path doesn't exist: either the request is
+                       // creating a new file/dir, or it's a download
+                       // destined to fail.
+                       continue
+               } else if err != nil {
                        return nil, ""
                }
                switch src := fi.Sys().(type) {
@@ -986,11 +913,6 @@ func (h *handler) determineCollection(fs arvados.CustomFileSystem, path string)
                                return nil, ""
                        }
                }
-               // Try parent
-               cut := strings.LastIndexByte(target, '/')
-               if cut < 0 {
-                       return nil, ""
-               }
-               target = target[:cut]
        }
+       return nil, ""
 }
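
The updateOnSuccess hook above implements a save-then-publish sequence: Sync() writes the modified collection to the API server, and only then is the saved tree Snapshot()ted and Splice()d into the long-lived session filesystem, so cached readers never observe a half-written state. A minimal standalone sketch of the same pattern, assuming the Snapshot/Splice file operations this commit adds to the Go SDK (commitChanges, scratchFS and sessionFS are illustrative names, not keep-web code):

package sketch

import (
    "os"

    "git.arvados.org/arvados.git/sdk/go/arvados"
)

// commitChanges saves modifications made in scratchFS, then splices the
// saved tree into sessionFS so concurrent readers of the cached session
// observe the update atomically. Snapshot/Splice signatures are assumed
// from their usage in this commit.
func commitChanges(scratchFS, sessionFS arvados.CustomFileSystem, collUUID string) error {
    src, err := scratchFS.OpenFile("by_id/"+collUUID, os.O_RDWR, 0777)
    if err != nil {
        return err
    }
    defer src.Close()
    if err := src.Sync(); err != nil {
        // save failed: nothing was published, sessionFS is untouched
        return err
    }
    snap, err := src.Snapshot()
    if err != nil {
        return err
    }
    dst, err := sessionFS.OpenFile("by_id/"+collUUID, os.O_RDWR, 0777)
    if err != nil {
        return err
    }
    defer dst.Close()
    return dst.Splice(snap)
}

In the handler above the same sequence runs inside updateOnSuccess, so the HTTP status only reports success after both steps complete.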
index 768013185ae7d50dc450fecd342ebcdc343db059..0f7d5078790deab8eed80f9fadfd03b2d4d7ed2d 100644 (file)
@@ -366,6 +366,24 @@ func (s *IntegrationSuite) TestVhostPortMatch(c *check.C) {
        }
 }
 
+func (s *IntegrationSuite) do(method string, urlstring string, token string, hdr http.Header) (*http.Request, *httptest.ResponseRecorder) {
+       u := mustParseURL(urlstring)
+       if hdr == nil && token != "" {
+               hdr = http.Header{"Authorization": {"Bearer " + token}}
+       } else if hdr == nil {
+               hdr = http.Header{}
+       } else if token != "" {
+               panic("must not pass both token and hdr")
+       }
+       return s.doReq(&http.Request{
+               Method:     method,
+               Host:       u.Host,
+               URL:        u,
+               RequestURI: u.RequestURI(),
+               Header:     hdr,
+       })
+}
+
 func (s *IntegrationSuite) doReq(req *http.Request) (*http.Request, *httptest.ResponseRecorder) {
        resp := httptest.NewRecorder()
        s.handler.ServeHTTP(resp, req)
@@ -409,6 +427,26 @@ func (s *IntegrationSuite) TestSingleOriginSecretLink(c *check.C) {
        )
 }
 
+func (s *IntegrationSuite) TestCollectionSharingToken(c *check.C) {
+       s.testVhostRedirectTokenToCookie(c, "GET",
+               "example.com/c="+arvadostest.FooFileCollectionUUID+"/t="+arvadostest.FooFileCollectionSharingToken+"/foo",
+               "",
+               nil,
+               "",
+               http.StatusOK,
+               "foo",
+       )
+       // Same valid sharing token, but requesting a different collection
+       s.testVhostRedirectTokenToCookie(c, "GET",
+               "example.com/c="+arvadostest.FooCollection+"/t="+arvadostest.FooFileCollectionSharingToken+"/foo",
+               "",
+               nil,
+               "",
+               http.StatusNotFound,
+               notFoundMessage+"\n",
+       )
+}
+
 // Bad token in URL is 404 Not Found because it doesn't make sense to
 // retry the same URL with different authorization.
 func (s *IntegrationSuite) TestSingleOriginSecretLinkBadToken(c *check.C) {
@@ -1245,7 +1283,7 @@ func copyHeader(h http.Header) http.Header {
 }
 
 func (s *IntegrationSuite) checkUploadDownloadRequest(c *check.C, req *http.Request,
-       successCode int, direction string, perm bool, userUuid string, collectionUuid string, filepath string) {
+       successCode int, direction string, perm bool, userUuid, collectionUuid, collectionPDH, filepath string) {
 
        client := arvados.NewClientFromEnv()
        client.AuthToken = arvadostest.AdminToken
@@ -1258,6 +1296,7 @@ func (s *IntegrationSuite) checkUploadDownloadRequest(c *check.C, req *http.Requ
        c.Check(err, check.IsNil)
        c.Check(logentries.Items, check.HasLen, 1)
        lastLogId := logentries.Items[0].ID
+       c.Logf("lastLogId: %d", lastLogId)
 
        var logbuf bytes.Buffer
        logger := logrus.New()
@@ -1274,6 +1313,7 @@ func (s *IntegrationSuite) checkUploadDownloadRequest(c *check.C, req *http.Requ
                deadline := time.Now().Add(time.Second)
                for {
                        c.Assert(time.Now().After(deadline), check.Equals, false, check.Commentf("timed out waiting for log entry"))
+                       logentries = arvados.LogList{}
                        err = client.RequestAndDecode(&logentries, "GET", "arvados/v1/logs", nil,
                                arvados.ResourceListParams{
                                        Filters: []arvados.Filter{
@@ -1288,6 +1328,7 @@ func (s *IntegrationSuite) checkUploadDownloadRequest(c *check.C, req *http.Requ
                                logentries.Items[0].ID > lastLogId &&
                                logentries.Items[0].ObjectUUID == userUuid &&
                                logentries.Items[0].Properties["collection_uuid"] == collectionUuid &&
+                               (collectionPDH == "" || logentries.Items[0].Properties["portable_data_hash"] == collectionPDH) &&
                                logentries.Items[0].Properties["collection_file_path"] == filepath {
                                break
                        }
@@ -1321,7 +1362,7 @@ func (s *IntegrationSuite) TestDownloadLoggingPermission(c *check.C) {
                                },
                        }
                        s.checkUploadDownloadRequest(c, req, http.StatusOK, "download", adminperm,
-                               arvadostest.AdminUserUUID, arvadostest.FooCollection, "foo")
+                               arvadostest.AdminUserUUID, arvadostest.FooCollection, arvadostest.FooCollectionPDH, "foo")
 
                        // Test user permission
                        req = &http.Request{
@@ -1334,7 +1375,7 @@ func (s *IntegrationSuite) TestDownloadLoggingPermission(c *check.C) {
                                },
                        }
                        s.checkUploadDownloadRequest(c, req, http.StatusOK, "download", userperm,
-                               arvadostest.ActiveUserUUID, arvadostest.FooCollection, "foo")
+                               arvadostest.ActiveUserUUID, arvadostest.FooCollection, arvadostest.FooCollectionPDH, "foo")
                }
        }
 
@@ -1354,7 +1395,7 @@ func (s *IntegrationSuite) TestDownloadLoggingPermission(c *check.C) {
                        },
                }
                s.checkUploadDownloadRequest(c, req, http.StatusOK, "download", true,
-                       arvadostest.ActiveUserUUID, arvadostest.MultilevelCollection1, "dir1/subdir/file1")
+                       arvadostest.ActiveUserUUID, arvadostest.MultilevelCollection1, arvadostest.MultilevelCollection1PDH, "dir1/subdir/file1")
        }
 
        u = mustParseURL("http://" + strings.Replace(arvadostest.FooCollectionPDH, "+", "-", 1) + ".keep-web.example/foo")
@@ -1368,7 +1409,7 @@ func (s *IntegrationSuite) TestDownloadLoggingPermission(c *check.C) {
                },
        }
        s.checkUploadDownloadRequest(c, req, http.StatusOK, "download", true,
-               arvadostest.ActiveUserUUID, arvadostest.FooCollection, "foo")
+               arvadostest.ActiveUserUUID, "", arvadostest.FooCollectionPDH, "foo")
 }
 
 func (s *IntegrationSuite) TestUploadLoggingPermission(c *check.C) {
@@ -1408,7 +1449,7 @@ func (s *IntegrationSuite) TestUploadLoggingPermission(c *check.C) {
                                Body: io.NopCloser(bytes.NewReader([]byte("bar"))),
                        }
                        s.checkUploadDownloadRequest(c, req, http.StatusCreated, "upload", adminperm,
-                               arvadostest.AdminUserUUID, coll.UUID, "bar")
+                               arvadostest.AdminUserUUID, coll.UUID, "", "bar")
 
                        // Test user permission
                        req = &http.Request{
@@ -1422,7 +1463,7 @@ func (s *IntegrationSuite) TestUploadLoggingPermission(c *check.C) {
                                Body: io.NopCloser(bytes.NewReader([]byte("bar"))),
                        }
                        s.checkUploadDownloadRequest(c, req, http.StatusCreated, "upload", userperm,
-                               arvadostest.ActiveUserUUID, coll.UUID, "bar")
+                               arvadostest.ActiveUserUUID, coll.UUID, "", "bar")
                }
        }
 }
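
Two details in the test hunks above are worth calling out: log entries are written asynchronously, so checkUploadDownloadRequest polls until the expected entry appears or a deadline passes, and the added logentries = arvados.LogList{} reset guards against data left over in the struct from the previous iteration's decode. The poll-until-deadline idiom in isolation (fetch is a hypothetical callback, not keep-web code):

package main

import (
    "errors"
    "fmt"
    "time"
)

// waitFor polls fetch until it reports success or the timeout expires.
// fetch returns true once the awaited condition (e.g. the expected log
// entry) has been observed.
func waitFor(fetch func() (bool, error), timeout time.Duration) error {
    deadline := time.Now().Add(timeout)
    for {
        ok, err := fetch()
        if err != nil {
            return err
        }
        if ok {
            return nil
        }
        if time.Now().After(deadline) {
            return errors.New("timed out waiting for condition")
        }
        time.Sleep(50 * time.Millisecond)
    }
}

func main() {
    start := time.Now()
    err := waitFor(func() (bool, error) {
        return time.Since(start) > 100*time.Millisecond, nil
    }, time.Second)
    fmt.Println(err) // <nil>
}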
index 7a23cd1fad06a551310fe9efaaa7030cf0d87d91..cd379dc6bd667df887b410edce26abd6eff209e7 100644 (file)
@@ -16,10 +16,6 @@ import (
        "github.com/prometheus/client_golang/prometheus"
 )
 
-var (
-       version = "dev"
-)
-
 var Command = service.Command(arvados.ServiceNameKeepweb, newHandlerOrErrorHandler)
 
 func newHandlerOrErrorHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
index 1f458f8e59ad2e2fa9139d4e388fe8554f70a420..f98efd8fdfcdf39febe29e84610aea474ccda02d 100644 (file)
@@ -27,9 +27,7 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
-       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
-       "git.arvados.org/arvados.git/sdk/go/keepclient"
        "github.com/AdRoll/goamz/s3"
 )
 
@@ -312,33 +310,18 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                return false
        }
 
-       var err error
-       var fs arvados.CustomFileSystem
-       var arvclient *arvadosclient.ArvadosClient
-       if r.Method == http.MethodGet || r.Method == http.MethodHead {
-               // Use a single session (cached FileSystem) across
-               // multiple read requests.
-               var sess *cachedSession
-               fs, sess, err = h.Cache.GetSession(token)
-               if err != nil {
-                       s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
-                       return true
-               }
-               arvclient = sess.arvadosclient
-       } else {
+       fs, sess, tokenUser, err := h.Cache.GetSession(token)
+       if err != nil {
+               s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
+               return true
+       }
+       readfs := fs
+       if writeMethod[r.Method] {
                // Create a FileSystem for this request, to avoid
                // exposing incomplete write operations to concurrent
                // requests.
-               var kc *keepclient.KeepClient
-               var release func()
-               var client *arvados.Client
-               arvclient, kc, client, release, err = h.getClients(r.Header.Get("X-Request-Id"), token)
-               if err != nil {
-                       s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
-                       return true
-               }
-               defer release()
-               fs = client.SiteFileSystem(kc)
+               client := sess.client.WithRequestID(r.Header.Get("X-Request-Id"))
+               fs = client.SiteFileSystem(sess.keepclient)
                fs.ForwardSlashNameSubstitution(h.Cluster.Collections.ForwardSlashNameSubstitution)
        }
 
@@ -418,12 +401,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                        return true
                }
 
-               tokenUser, err := h.Cache.GetTokenUser(token)
                if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
                        http.Error(w, "Not permitted", http.StatusForbidden)
                        return true
                }
-               h.logUploadOrDownload(r, arvclient, fs, fspath, nil, tokenUser)
+               h.logUploadOrDownload(r, sess.arvadosclient, fs, fspath, nil, tokenUser)
 
                // shallow copy r, and change URL path
                r := *r
@@ -514,12 +496,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                        }
                        defer f.Close()
 
-                       tokenUser, err := h.Cache.GetTokenUser(token)
                        if !h.userPermittedToUploadOrDownload(r.Method, tokenUser) {
                                http.Error(w, "Not permitted", http.StatusForbidden)
                                return true
                        }
-                       h.logUploadOrDownload(r, arvclient, fs, fspath, nil, tokenUser)
+                       h.logUploadOrDownload(r, sess.arvadosclient, fs, fspath, nil, tokenUser)
 
                        _, err = io.Copy(f, r.Body)
                        if err != nil {
@@ -534,14 +515,12 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                                return true
                        }
                }
-               err = fs.Sync()
+               err = h.syncCollection(fs, readfs, fspath)
                if err != nil {
                        err = fmt.Errorf("sync failed: %w", err)
                        s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
                        return true
                }
-               // Ensure a subsequent read operation will see the changes.
-               h.Cache.ResetSession(token)
                w.WriteHeader(http.StatusOK)
                return true
        case r.Method == http.MethodDelete:
@@ -588,14 +567,12 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                        s3ErrorResponse(w, InvalidArgument, err.Error(), r.URL.Path, http.StatusBadRequest)
                        return true
                }
-               err = fs.Sync()
+               err = h.syncCollection(fs, readfs, fspath)
                if err != nil {
                        err = fmt.Errorf("sync failed: %w", err)
                        s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
                        return true
                }
-               // Ensure a subsequent read operation will see the changes.
-               h.Cache.ResetSession(token)
                w.WriteHeader(http.StatusNoContent)
                return true
        default:
@@ -604,6 +581,34 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
        }
 }
 
+// Save modifications to the indicated collection in srcfs, then (if
+// successful) ensure they are also reflected in dstfs.
+func (h *handler) syncCollection(srcfs, dstfs arvados.CustomFileSystem, path string) error {
+       coll, _ := h.determineCollection(srcfs, path)
+       if coll == nil || coll.UUID == "" {
+               return errors.New("could not determine collection to sync")
+       }
+       d, err := srcfs.OpenFile("by_id/"+coll.UUID, os.O_RDWR, 0777)
+       if err != nil {
+               return err
+       }
+       defer d.Close()
+       err = d.Sync()
+       if err != nil {
+               return err
+       }
+       snap, err := d.Snapshot()
+       if err != nil {
+               return err
+       }
+       dstd, err := dstfs.OpenFile("by_id/"+coll.UUID, os.O_RDWR, 0777)
+       if err != nil {
+               return err
+       }
+       defer dstd.Close()
+       return dstd.Splice(snap)
+}
+
 func setFileInfoHeaders(header http.Header, fs arvados.CustomFileSystem, path string) error {
        maybeEncode := func(s string) string {
                for _, c := range s {
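
syncCollection (in the s3.go hunk above) depends on determineCollection, rewritten earlier in this commit, to map a request path back to its enclosing collection even when the leaf path does not exist yet, as with an S3 PUT that creates new directories. A runnable sketch of that walk-up strategy, with exists standing in for fs.Stat:

package main

import (
    "fmt"
    "strings"
)

// enclosing strips one path component per iteration until a lookup
// succeeds, skipping components that don't exist yet (a new file, or a
// download destined to fail), mirroring determineCollection's loop.
func enclosing(exists func(string) bool, path string) string {
    target := strings.TrimSuffix(path, "/")
    for cut := len(target); cut >= 0; cut = strings.LastIndexByte(target, '/') {
        target = target[:cut]
        if exists(target) {
            return target
        }
    }
    return ""
}

func main() {
    known := map[string]bool{"/users/active/coll": true}
    fmt.Println(enclosing(func(p string) bool { return known[p] },
        "/users/active/coll/new/file.txt"))
    // Output: /users/active/coll
}

Returning the longest existing ancestor is what lets an upload of a brand-new file still be attributed to its collection.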
index 851bee4b72f0449012c027e96cfefa4503c861b1..aa91d82ae36ab6c01cb50ef7b7dd944af16b4236 100644 (file)
@@ -316,14 +316,14 @@ func (s *IntegrationSuite) TestS3PropertiesAsMetadata(c *check.C) {
 func (s *IntegrationSuite) TestS3CollectionPutObjectSuccess(c *check.C) {
        stage := s.s3setup(c)
        defer stage.teardown(c)
-       s.testS3PutObjectSuccess(c, stage.collbucket, "")
+       s.testS3PutObjectSuccess(c, stage.collbucket, "", stage.coll.UUID)
 }
 func (s *IntegrationSuite) TestS3ProjectPutObjectSuccess(c *check.C) {
        stage := s.s3setup(c)
        defer stage.teardown(c)
-       s.testS3PutObjectSuccess(c, stage.projbucket, stage.coll.Name+"/")
+       s.testS3PutObjectSuccess(c, stage.projbucket, stage.coll.Name+"/", stage.coll.UUID)
 }
-func (s *IntegrationSuite) testS3PutObjectSuccess(c *check.C, bucket *s3.Bucket, prefix string) {
+func (s *IntegrationSuite) testS3PutObjectSuccess(c *check.C, bucket *s3.Bucket, prefix string, collUUID string) {
        for _, trial := range []struct {
                path        string
                size        int
@@ -367,7 +367,7 @@ func (s *IntegrationSuite) testS3PutObjectSuccess(c *check.C, bucket *s3.Bucket,
                if !c.Check(err, check.NotNil) {
                        continue
                }
-               c.Check(err.(*s3.Error).StatusCode, check.Equals, 404)
+               c.Check(err.(*s3.Error).StatusCode, check.Equals, http.StatusNotFound)
                c.Check(err.(*s3.Error).Code, check.Equals, `NoSuchKey`)
                if !c.Check(err, check.ErrorMatches, `The specified key does not exist.`) {
                        continue
@@ -390,6 +390,14 @@ func (s *IntegrationSuite) testS3PutObjectSuccess(c *check.C, bucket *s3.Bucket,
                c.Check(err, check.IsNil)
                c.Check(buf2, check.HasLen, len(buf))
                c.Check(bytes.Equal(buf, buf2), check.Equals, true)
+
+               // Check that the change is immediately visible via a
+               // (non-S3) webdav request.
+               _, resp := s.do("GET", "http://"+collUUID+".keep-web.example/"+trial.path, arvadostest.ActiveTokenV2, nil)
+               c.Check(resp.Code, check.Equals, http.StatusOK)
+               if !strings.HasSuffix(trial.path, "/") {
+                       c.Check(resp.Body.Len(), check.Equals, trial.size)
+               }
        }
 }
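
The added assertions close the loop on the splice change: a write made through the S3 API must be visible to an immediate plain-HTTP/WebDAV read of the same collection, with no cache lag. Outside the test harness, roughly the same check might look like this (the vhost name and token are illustrative, not fixed API values):

package sketch

import (
    "fmt"
    "io"
    "net/http"
)

// checkVisible fetches a just-written file over plain HTTP/WebDAV and
// verifies it is already readable, i.e. read-your-writes across APIs.
// It requires a live keep-web; collUUID, path and token are supplied
// by the caller.
func checkVisible(collUUID, path, token string) error {
    req, err := http.NewRequest("GET", "http://"+collUUID+".keep-web.example/"+path, nil)
    if err != nil {
        return err
    }
    req.Header.Set("Authorization", "Bearer "+token)
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("GET %s: %s", path, resp.Status)
    }
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return err
    }
    fmt.Printf("read back %d bytes\n", len(body))
    return nil
}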
 
index 61c540808b640d6115a76e3efb70e10928b2dba3..2aa11045503c021d397861bf7852f001f32843fe 100644 (file)
@@ -29,6 +29,7 @@ import (
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -49,17 +50,17 @@ func (s *IntegrationSuite) TestNoToken(c *check.C) {
        } {
                hdr, body, _ := s.runCurl(c, token, "collections.example.com", "/collections/"+arvadostest.FooCollection+"/foo")
                c.Check(hdr, check.Matches, `(?s)HTTP/1.1 404 Not Found\r\n.*`)
-               c.Check(body, check.Equals, notFoundMessage+"\n")
+               c.Check(strings.TrimSpace(body), check.Equals, notFoundMessage)
 
                if token != "" {
                        hdr, body, _ = s.runCurl(c, token, "collections.example.com", "/collections/download/"+arvadostest.FooCollection+"/"+token+"/foo")
                        c.Check(hdr, check.Matches, `(?s)HTTP/1.1 404 Not Found\r\n.*`)
-                       c.Check(body, check.Equals, notFoundMessage+"\n")
+                       c.Check(strings.TrimSpace(body), check.Equals, notFoundMessage)
                }
 
                hdr, body, _ = s.runCurl(c, token, "collections.example.com", "/bad-route")
                c.Check(hdr, check.Matches, `(?s)HTTP/1.1 404 Not Found\r\n.*`)
-               c.Check(body, check.Equals, notFoundMessage+"\n")
+               c.Check(strings.TrimSpace(body), check.Equals, notFoundMessage)
        }
 }
 
@@ -92,7 +93,7 @@ func (s *IntegrationSuite) Test404(c *check.C) {
                hdr, body, _ := s.runCurl(c, arvadostest.ActiveToken, "collections.example.com", uri)
                c.Check(hdr, check.Matches, "(?s)HTTP/1.1 404 Not Found\r\n.*")
                if len(body) > 0 {
-                       c.Check(body, check.Equals, notFoundMessage+"\n")
+                       c.Check(strings.TrimSpace(body), check.Equals, notFoundMessage)
                }
        }
 }
@@ -475,15 +476,7 @@ func (s *IntegrationSuite) TestMetrics(c *check.C) {
        c.Check(summaries["request_duration_seconds/get/200"].SampleCount, check.Equals, "3")
        c.Check(summaries["request_duration_seconds/get/404"].SampleCount, check.Equals, "1")
        c.Check(summaries["time_to_status_seconds/get/404"].SampleCount, check.Equals, "1")
-       c.Check(counters["arvados_keepweb_collectioncache_requests//"].Value, check.Equals, int64(2))
-       c.Check(counters["arvados_keepweb_collectioncache_api_calls//"].Value, check.Equals, int64(2))
-       c.Check(counters["arvados_keepweb_collectioncache_hits//"].Value, check.Equals, int64(1))
-       c.Check(counters["arvados_keepweb_collectioncache_pdh_hits//"].Value, check.Equals, int64(1))
-       c.Check(gauges["arvados_keepweb_collectioncache_cached_manifests//"].Value, check.Equals, float64(1))
-       // FooCollection's cached manifest size is 45 ("1f4b0....+45")
-       // plus one 51-byte blob signature; session fs counts 3 inodes
-       // * 64 bytes.
-       c.Check(gauges["arvados_keepweb_sessions_cached_collection_bytes//"].Value, check.Equals, float64(45+51+64*3))
+       c.Check(gauges["arvados_keepweb_sessions_cached_session_bytes//"].Value, check.Equals, float64(469))
 
        // If the Host header indicates a collection, /metrics.json
        // refers to a file in the collection -- the metrics handler
@@ -529,7 +522,7 @@ func (s *IntegrationSuite) SetUpTest(c *check.C) {
 
        ctx := ctxlog.Context(context.Background(), logger)
 
-       s.handler = newHandlerOrErrorHandler(ctx, cluster, cluster.SystemRootToken, nil).(*handler)
+       s.handler = newHandlerOrErrorHandler(ctx, cluster, cluster.SystemRootToken, prometheus.NewRegistry()).(*handler)
        s.testServer = httptest.NewUnstartedServer(
                httpserver.AddRequestIDs(
                        httpserver.LogRequests(
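
SetUpTest now passes a fresh prometheus.NewRegistry() instead of nil, giving each test an isolated metrics registry, and the old collectioncache counters are replaced by a single cached-session-bytes gauge (asserted at 469 above). A small self-contained illustration of the registry-per-test idea, using only standard client_golang calls:

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
)

func main() {
    // A private registry: nothing leaks into (or collides with) the
    // process-wide default registry across test cases.
    reg := prometheus.NewRegistry()
    g := prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "arvados_keepweb_sessions_cached_session_bytes",
        Help: "Total size of cached session data.",
    })
    reg.MustRegister(g)
    g.Set(469) // the value the test above expects

    mfs, err := reg.Gather()
    if err != nil {
        panic(err)
    }
    for _, mf := range mfs {
        fmt.Println(mf.GetName(), mf.GetMetric()[0].GetGauge().GetValue())
    }
}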
index 501c355a7388a53fe3b40fc3f082c63768665620..0039f04eeff369a5fb5de3132389762d16535161 100644 (file)
@@ -36,7 +36,10 @@ var (
 // existence automatically so sequences like "mkcol foo; put foo/bar"
 // work as expected.
 type webdavFS struct {
-       collfs  arvados.FileSystem
+       collfs arvados.FileSystem
+       // prefix works like fs.Sub: Stat(name) calls
+       // Stat(prefix+name) in the wrapped filesystem.
+       prefix  string
        writing bool
        // webdav PROPFIND reads the first few bytes of each file
        // whose filename extension isn't recognized, which is
@@ -56,7 +59,7 @@ func (fs *webdavFS) makeparents(name string) {
        }
        dir = dir[:len(dir)-1]
        fs.makeparents(dir)
-       fs.collfs.Mkdir(dir, 0755)
+       fs.collfs.Mkdir(fs.prefix+dir, 0755)
 }
 
 func (fs *webdavFS) Mkdir(ctx context.Context, name string, perm os.FileMode) error {
@@ -65,7 +68,7 @@ func (fs *webdavFS) Mkdir(ctx context.Context, name string, perm os.FileMode) er
        }
        name = strings.TrimRight(name, "/")
        fs.makeparents(name)
-       return fs.collfs.Mkdir(name, 0755)
+       return fs.collfs.Mkdir(fs.prefix+name, 0755)
 }
 
 func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os.FileMode) (f webdav.File, err error) {
@@ -73,7 +76,7 @@ func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os
        if writing {
                fs.makeparents(name)
        }
-       f, err = fs.collfs.OpenFile(name, flag, perm)
+       f, err = fs.collfs.OpenFile(fs.prefix+name, flag, perm)
        if !fs.writing {
                // webdav module returns 404 on all OpenFile errors,
                // but returns 405 Method Not Allowed if OpenFile()
@@ -93,7 +96,7 @@ func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os
 }
 
 func (fs *webdavFS) RemoveAll(ctx context.Context, name string) error {
-       return fs.collfs.RemoveAll(name)
+       return fs.collfs.RemoveAll(fs.prefix + name)
 }
 
 func (fs *webdavFS) Rename(ctx context.Context, oldName, newName string) error {
@@ -106,14 +109,14 @@ func (fs *webdavFS) Rename(ctx context.Context, oldName, newName string) error {
                newName = strings.TrimSuffix(newName, "/")
        }
        fs.makeparents(newName)
-       return fs.collfs.Rename(oldName, newName)
+       return fs.collfs.Rename(fs.prefix+oldName, fs.prefix+newName)
 }
 
 func (fs *webdavFS) Stat(ctx context.Context, name string) (os.FileInfo, error) {
        if fs.writing {
                fs.makeparents(name)
        }
-       return fs.collfs.Stat(name)
+       return fs.collfs.Stat(fs.prefix + name)
 }
 
 type writeFailer struct {
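
The new prefix field turns webdavFS into a view rooted at a subtree of the session filesystem: every incoming name is rewritten to prefix+name before reaching the wrapped fs. For the read side this is analogous to io/fs.Sub in the standard library (webdavFS goes further by applying the prefix to Mkdir, Rename and other writes, which fs.Sub cannot do). A standalone read-only analogue, with an invented collection UUID:

package main

import (
    "fmt"
    "io/fs"
    "testing/fstest"
)

func main() {
    // The full session fs exposes the collection under a long prefix...
    session := fstest.MapFS{
        "by_id/zzzzz-4zz18-xxxxxxxxxxxxxxx/dir/file.txt": &fstest.MapFile{
            Data: []byte("hello"),
        },
    }
    // ...but the handler serves names relative to the collection root.
    sub, err := fs.Sub(session, "by_id/zzzzz-4zz18-xxxxxxxxxxxxxxx")
    if err != nil {
        panic(err)
    }
    b, err := fs.ReadFile(sub, "dir/file.txt")
    fmt.Println(string(b), err) // hello <nil>
}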
index 2eaea278162c9c4d2fb308b30d27b38e8ea5b639..8c4a649f69d64d4872fb11efb7083403d87094da 100644 (file)
@@ -695,7 +695,7 @@ func (s *ServerRequiredSuite) TestCollectionSharingToken(c *C) {
        defer srv.Close()
        hash, _, err := kc.PutB([]byte("shareddata"))
        c.Check(err, IsNil)
-       kc.Arvados.ApiToken = arvadostest.FooCollectionSharingToken
+       kc.Arvados.ApiToken = arvadostest.FooFileCollectionSharingToken
        rdr, _, _, err := kc.Get(hash)
        c.Assert(err, IsNil)
        data, err := ioutil.ReadAll(rdr)
index 3109e9f6662654f8bc70deb4f686eda43e66ea1f..ded96c3121c0cc8d020b401c4e8b39da791bbdac 100755 (executable)
@@ -95,20 +95,20 @@ def getuserinfocsv(arv, uuid):
 collectionNameCache = {}
 def getCollectionName(arv, uuid, pdh):
     lookupField = uuid
-    filters = [["uuid","=",uuid]]
+    filters = [["uuid", "=", uuid]]
     cached = uuid in collectionNameCache
     # look up by uuid if it is available, fall back to look up by pdh
-    if len(uuid) != 27:
+    if uuid is None or len(uuid) != 27:
         # Look up by pdh. Note that this can be misleading; the download could
         # have happened from a collection with the same pdh but different name.
         # We arbitrarily pick the oldest collection with the pdh to look up the
         # name, if the uuid for the request is not known.
         lookupField = pdh
-        filters = [["portable_data_hash","=",pdh]]
+        filters = [["portable_data_hash", "=", pdh]]
         cached = pdh in collectionNameCache
 
     if not cached:
-        u = arv.collections().list(filters=filters,order="created_at",limit=1).execute().get("items")
+        u = arv.collections().list(filters=filters, order="created_at", limit=1).execute().get("items")
         if len(u) < 1:
             return "(deleted)"
         collectionNameCache[lookupField] = u[0]["name"]
@@ -208,20 +208,19 @@ def main(arguments=None):
                 users[owner].append([loguuid, event_at, "Deleted collection %s" % (getname(e["properties"]["old_attributes"]))])
 
         elif e["event_type"] == "file_download":
-                users.setdefault(e["object_uuid"], [])
-                users[e["object_uuid"]].append([loguuid, event_at, "Downloaded file \"%s\" from \"%s\" (%s) (%s)" % (
-                                                                                       e["properties"].get("collection_file_path") or e["properties"].get("reqPath"),
-                                                                                       getCollectionName(arv, e["properties"].get("collection_uuid"), e["properties"].get("portable_data_hash")),
-                                                                                       e["properties"].get("collection_uuid"),
-                                                                                       e["properties"].get("portable_data_hash"))])
-
+            users.setdefault(e["object_uuid"], [])
+            users[e["object_uuid"]].append([loguuid, event_at, "Downloaded file \"%s\" from \"%s\" (%s) (%s)" % (
+                e["properties"].get("collection_file_path") or e["properties"].get("reqPath"),
+                getCollectionName(arv, e["properties"].get("collection_uuid"), e["properties"].get("portable_data_hash")),
+                e["properties"].get("collection_uuid"),
+                e["properties"].get("portable_data_hash"))])
 
         elif e["event_type"] == "file_upload":
-                users.setdefault(e["object_uuid"], [])
-                users[e["object_uuid"]].append([loguuid, event_at, "Uploaded file \"%s\" to \"%s\" (%s)" % (
-                                                                                    e["properties"].get("collection_file_path") or e["properties"].get("reqPath"),
-                                                                                    getCollectionName(arv, e["properties"].get("collection_uuid"), e["properties"].get("portable_data_hash")),
-                                                                                    e["properties"].get("collection_uuid"))])
+            users.setdefault(e["object_uuid"], [])
+            users[e["object_uuid"]].append([loguuid, event_at, "Uploaded file \"%s\" to \"%s\" (%s)" % (
+                e["properties"].get("collection_file_path") or e["properties"].get("reqPath"),
+                getCollectionName(arv, e["properties"].get("collection_uuid"), e["properties"].get("portable_data_hash")),
+                e["properties"].get("collection_uuid"))])
 
         else:
             users[owner].append([loguuid, event_at, "%s %s %s" % (e["event_type"], e["object_kind"], e["object_uuid"])])