18887: Merge branch 'main' into 18887-wb1-sends-v2-anonymous-token
author Ward Vandewege <ward@curii.com> Mon, 4 Apr 2022 19:33:28 +0000 (15:33 -0400)
committer Ward Vandewege <ward@curii.com> Mon, 4 Apr 2022 19:33:28 +0000 (15:33 -0400)
Arvados-DCO-1.1-Signed-off-by: Ward Vandewege <ward@curii.com>

27 files changed:
apps/workbench/app/controllers/application_controller.rb
build/run-tests.sh
doc/sdk/cli/subcommands.html.textile.liquid
lib/controller/handler_test.go
lib/controller/router/router.go
sdk/go/arvados/client.go
sdk/go/arvados/client_test.go
sdk/go/httpserver/logger.go
sdk/python/arvados/arvfile.py
sdk/python/arvados/collection.py
sdk/python/arvados/commands/get.py
sdk/python/arvados/keep.py
sdk/python/arvados/util.py
sdk/python/tests/test_arvfile.py
sdk/python/tests/test_collections.py
sdk/python/tests/test_util.py
services/api/app/models/arvados_model.rb
services/api/db/migrate/20220401153101_fix_created_at_indexes.rb [new file with mode: 0644]
services/api/db/structure.sql
services/api/test/fixtures/jobs.yml
services/api/test/fixtures/pipeline_instances.yml
services/api/test/functional/arvados/v1/query_test.rb
services/api/test/integration/select_test.rb
services/fuse/arvados_fuse/fusedir.py
services/fuse/tests/test_mount.py
services/keep-web/handler.go
tools/user-activity/arvados_user_activity/main.py

index 5312e733f41eb992131752ccf782e112b8c5af2e..f8c8079a1edf4f5878c456d8f4ea0893e2574a32 100644 (file)
@@ -172,7 +172,7 @@ class ApplicationController < ActionController::Base
 
   def find_objects_for_index
     @objects ||= model_class
-    @objects = @objects.filter(@filters).limit(@limit).offset(@offset)
+    @objects = @objects.filter(@filters).limit(@limit).offset(@offset).order(@order)
     @objects.fetch_multiple_pages(false)
   end
 
index 3592efbdc2bee5f6c4e25251677c581dcf65c088..67c54c98b0244a72842bccfbb964ce3dbd164c0c 100755 (executable)
@@ -556,6 +556,12 @@ setup_ruby_environment() {
             done
             "$bundle" version | tee /dev/stderr | grep -q 'version 2'
         ) || fatal 'install bundler'
+       if test -d /var/lib/arvados-arvbox/ ; then
+           # Inside arvbox, use bundler-installed binstubs.  The
+           # system bundler and Rails' own bin/bundle refuse to work.
+           # I don't know why.
+           bundle=binstubs/bundle
+       fi
     fi
 }
 
index 50d5d89871a612b368c0c46e966ed5717faa7a6a..5dda77ab5ee65cdf3700be3404f53455c0c25f28 100644 (file)
@@ -212,9 +212,10 @@ This is a frontend to @arv-get@.
 <notextile>
 <pre>
 $ <code class="userinput">arv keep get --help</code>
-usage: arv-get [-h] [--retries RETRIES]
+usage: arv-get [-h] [--retries RETRIES] [--version]
                [--progress | --no-progress | --batch-progress]
-               [--hash HASH | --md5sum] [-n] [-r] [-f | --skip-existing]
+               [--hash HASH | --md5sum] [-n] [-r]
+               [-f | -v | --skip-existing | --strip-manifest] [--threads N]
                locator [destination]
 
 Copy data from Keep to a local file or pipe.
@@ -223,19 +224,20 @@ positional arguments:
   locator            Collection locator, optionally with a file path or
                      prefix.
   destination        Local file or directory where the data is to be written.
-                     Default: /dev/stdout.
+                     Default: stdout.
 
 optional arguments:
   -h, --help         show this help message and exit
   --retries RETRIES  Maximum number of times to retry server requests that
-                     encounter temporary failures (e.g., server down). Default
-                     3.
+                     encounter temporary failures (e.g., server down).
+                     Default 3.
+  --version          Print version and exit.
   --progress         Display human-readable progress on stderr (bytes and, if
                      possible, percentage of total data size). This is the
                      default behavior when it is not expected to interfere
                      with the output: specifically, stderr is a tty _and_
-                     either stdout is not a tty, or output is being written to
-                     named files rather than stdout.
+                     either stdout is not a tty, or output is being written
+                     to named files rather than stdout.
   --no-progress      Do not display human-readable progress on stderr.
   --batch-progress   Display machine-readable progress on stderr (bytes and,
                      if known, total data size).
@@ -252,11 +254,19 @@ optional arguments:
   -f                 Overwrite existing files while writing. The default
                      behavior is to refuse to write *anything* if any of the
                      output files already exist. As a special case, -f is not
-                     needed to write to /dev/stdout.
-  --skip-existing    Skip files that already exist. The default behavior is to
-                     refuse to write *anything* if any files exist that would
-                     have to be overwritten. This option causes even devices,
-                     sockets, and fifos to be skipped.
+                     needed to write to stdout.
+  -v                 Once for verbose mode, twice for debug mode.
+  --skip-existing    Skip files that already exist. The default behavior is
+                     to refuse to write *anything* if any files exist that
+                     would have to be overwritten. This option causes even
+                     devices, sockets, and fifos to be skipped.
+  --strip-manifest   When getting a collection manifest, strip its access
+                     tokens before writing it.
+  --threads N        Set the number of download threads to be used. Note that
+                     using many threads will increase the RAM requirements.
+                     Default is to use 4 threads. On high-latency
+                     installations, using a greater number will improve
+                     overall throughput.
 </pre>
 </notextile>
 
index 817cff79609dab91b7743f2869950974c8796082..5e467cb0588607d3deaa06c1d92326ed18f8f09c 100644 (file)
@@ -5,9 +5,11 @@
 package controller
 
 import (
+       "bytes"
        "context"
        "crypto/tls"
        "encoding/json"
+       "io"
        "io/ioutil"
        "net/http"
        "net/http/httptest"
@@ -36,13 +38,15 @@ var _ = check.Suite(&HandlerSuite{})
 type HandlerSuite struct {
        cluster *arvados.Cluster
        handler *Handler
+       logbuf  *bytes.Buffer
        ctx     context.Context
        cancel  context.CancelFunc
 }
 
 func (s *HandlerSuite) SetUpTest(c *check.C) {
+       s.logbuf = &bytes.Buffer{}
        s.ctx, s.cancel = context.WithCancel(context.Background())
-       s.ctx = ctxlog.Context(s.ctx, ctxlog.New(os.Stderr, "json", "debug"))
+       s.ctx = ctxlog.Context(s.ctx, ctxlog.New(io.MultiWriter(os.Stderr, s.logbuf), "json", "debug"))
        s.cluster = &arvados.Cluster{
                ClusterID:  "zzzzz",
                PostgreSQL: integrationTestCluster().PostgreSQL,
@@ -317,6 +321,16 @@ func (s *HandlerSuite) TestValidateRemoteToken(c *check.C) {
        }
 }
 
+func (s *HandlerSuite) TestLogTokenUUID(c *check.C) {
+       req := httptest.NewRequest("GET", "https://0.0.0.0/arvados/v1/users/current", nil)
+       req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveTokenV2)
+       req = req.WithContext(s.ctx)
+       resp := httptest.NewRecorder()
+       httpserver.LogRequests(s.handler).ServeHTTP(resp, req)
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       c.Check(s.logbuf.String(), check.Matches, `(?ms).*"tokenUUIDs":\["`+strings.Split(arvadostest.ActiveTokenV2, "/")[1]+`"\].*`)
+}
+
 func (s *HandlerSuite) TestCreateAPIToken(c *check.C) {
        req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
        auth, err := s.handler.createAPItoken(req, arvadostest.ActiveUserUUID, nil)
index 2cfcc4fc28287c8ee44166277ecaffe23d42e293..05bdb4754f0a860ac552867d42bf6e30af9eb4d6 100644 (file)
@@ -588,6 +588,23 @@ func (rtr *router) addRoute(endpoint arvados.APIEndpoint, defaultOpts func() int
                        "apiOptsType": fmt.Sprintf("%T", opts),
                        "apiOpts":     opts,
                }).Debug("exec")
+               // Extract the token UUIDs (or a placeholder for v1 tokens)
+               var tokenUUIDs []string
+               for _, t := range creds.Tokens {
+                       if strings.HasPrefix(t, "v2/") {
+                               tokenParts := strings.Split(t, "/")
+                               if len(tokenParts) >= 3 {
+                                       tokenUUIDs = append(tokenUUIDs, tokenParts[1])
+                               }
+                       } else {
+                               end := t
+                               if len(t) > 5 {
+                                       end = t[len(t)-5:]
+                               }
+                               tokenUUIDs = append(tokenUUIDs, "v1 token ending in "+end)
+                       }
+               }
+               httpserver.SetResponseLogFields(req.Context(), logrus.Fields{"tokenUUIDs": tokenUUIDs})
                resp, err := exec(ctx, opts)
                if err != nil {
                        logger.WithError(err).Debugf("returning error type %T", err)
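
For context, a minimal Python sketch of the parsing rule the router applies above, assuming only the documented "v2/<uuid>/<secret>" token layout (the function name is illustrative, not part of the patch):

    def token_log_id(token):
        # v2 tokens look like "v2/<api_client_authorization UUID>/<secret>";
        # log the UUID only, never the secret.
        if token.startswith("v2/"):
            parts = token.split("/")
            return parts[1] if len(parts) >= 3 else None
        # v1 tokens are bare secrets; log at most the last 5 characters.
        return "v1 token ending in " + token[-5:]
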
index 5ec828667fc940ace2c3f59b6cdc643139ae3b14..24d5ac3e335c824f5ea4a444c6066ce37f3cc86f 100644 (file)
@@ -12,6 +12,7 @@ import (
        "errors"
        "fmt"
        "io"
+       "io/fs"
        "io/ioutil"
        "log"
        "net/http"
@@ -102,11 +103,60 @@ func NewClientFromConfig(cluster *Cluster) (*Client, error) {
 }
 
 // NewClientFromEnv creates a new Client that uses the default HTTP
-// client with the API endpoint and credentials given by the
-// ARVADOS_API_* environment variables.
+// client, and loads API endpoint and credentials from ARVADOS_*
+// environment variables (if set) and
+// $HOME/.config/arvados/settings.conf (if readable).
+//
+// If a config exists in both locations, the environment variable is
+// used.
+//
+// If there is an error (other than ENOENT) reading settings.conf,
+// NewClientFromEnv logs the error to log.Default(), then proceeds as
+// if settings.conf did not exist.
+//
+// Space characters are trimmed when reading the settings file, so
+// these are equivalent:
+//
+//   ARVADOS_API_HOST=localhost\n
+//   ARVADOS_API_HOST=localhost\r\n
+//   ARVADOS_API_HOST = localhost \n
+//   \tARVADOS_API_HOST = localhost\n
 func NewClientFromEnv() *Client {
+       vars := map[string]string{}
+       home := os.Getenv("HOME")
+       conffile := home + "/.config/arvados/settings.conf"
+       if home == "" {
+               // no $HOME => just use env vars
+       } else if settings, err := os.ReadFile(conffile); errors.Is(err, fs.ErrNotExist) {
+               // no config file => just use env vars
+       } else if err != nil {
+               // config file unreadable => log message, then use env vars
+               log.Printf("continuing without loading %s: %s", conffile, err)
+       } else {
+               for _, line := range bytes.Split(settings, []byte{'\n'}) {
+                       kv := bytes.SplitN(line, []byte{'='}, 2)
+                       k := string(bytes.TrimSpace(kv[0]))
+                       if len(kv) != 2 || !strings.HasPrefix(k, "ARVADOS_") {
+                               // Same behavior as the Python SDK:
+                               // silently skip leading # (comments),
+                               // blank lines, typos, and non-Arvados
+                               // vars.
+                               continue
+                       }
+                       vars[k] = string(bytes.TrimSpace(kv[1]))
+               }
+       }
+       for _, env := range os.Environ() {
+               if !strings.HasPrefix(env, "ARVADOS_") {
+                       continue
+               }
+               kv := strings.SplitN(env, "=", 2)
+               if len(kv) == 2 {
+                       vars[kv[0]] = kv[1]
+               }
+       }
        var svcs []string
-       for _, s := range strings.Split(os.Getenv("ARVADOS_KEEP_SERVICES"), " ") {
+       for _, s := range strings.Split(vars["ARVADOS_KEEP_SERVICES"], " ") {
                if s == "" {
                        continue
                } else if u, err := url.Parse(s); err != nil {
@@ -118,13 +168,13 @@ func NewClientFromEnv() *Client {
                }
        }
        var insecure bool
-       if s := strings.ToLower(os.Getenv("ARVADOS_API_HOST_INSECURE")); s == "1" || s == "yes" || s == "true" {
+       if s := strings.ToLower(vars["ARVADOS_API_HOST_INSECURE"]); s == "1" || s == "yes" || s == "true" {
                insecure = true
        }
        return &Client{
                Scheme:          "https",
-               APIHost:         os.Getenv("ARVADOS_API_HOST"),
-               AuthToken:       os.Getenv("ARVADOS_API_TOKEN"),
+               APIHost:         vars["ARVADOS_API_HOST"],
+               AuthToken:       vars["ARVADOS_API_TOKEN"],
                Insecure:        insecure,
                KeepServiceURIs: svcs,
                Timeout:         5 * time.Minute,
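
For illustration, a rough Python equivalent of the loading order implemented above (settings.conf first, then ARVADOS_* environment variables on top, with whitespace trimmed around keys and values). This is a sketch of the precedence rule, not the Python SDK's actual loader:

    import os

    def load_arvados_vars():
        vars = {}
        home = os.environ.get("HOME", "")
        conffile = os.path.join(home, ".config", "arvados", "settings.conf")
        if home:
            try:
                with open(conffile) as f:
                    for line in f:
                        k, sep, v = line.partition("=")
                        k = k.strip()
                        # Silently skip comments, blank lines, and
                        # non-ARVADOS_* keys, as the Go code above does.
                        if sep and k.startswith("ARVADOS_"):
                            vars[k] = v.strip()
            except FileNotFoundError:
                pass  # no settings file: use env vars only
            # (The Go code also logs and continues on other read errors.)
        for k, v in os.environ.items():
            if k.startswith("ARVADOS_"):
                vars[k] = v  # environment overrides settings.conf
        return vars
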
index df938008d49756b850ca6e5ce5abee8a0510e2a3..2363803cab1de157f4074d3a2770f2cc0c9201ca 100644 (file)
@@ -10,9 +10,12 @@ import (
        "io/ioutil"
        "net/http"
        "net/url"
+       "os"
+       "strings"
        "sync"
-       "testing"
        "testing/iotest"
+
+       check "gopkg.in/check.v1"
 )
 
 type stubTransport struct {
@@ -68,43 +71,36 @@ func (stub *timeoutTransport) RoundTrip(req *http.Request) (*http.Response, erro
        }, nil
 }
 
-func TestCurrentUser(t *testing.T) {
-       t.Parallel()
+var _ = check.Suite(&clientSuite{})
+
+type clientSuite struct{}
+
+func (*clientSuite) TestCurrentUser(c *check.C) {
        stub := &stubTransport{
                Responses: map[string]string{
                        "/arvados/v1/users/current": `{"uuid":"zzzzz-abcde-012340123401234"}`,
                },
        }
-       c := &Client{
+       client := &Client{
                Client: &http.Client{
                        Transport: stub,
                },
                APIHost:   "zzzzz.arvadosapi.com",
                AuthToken: "xyzzy",
        }
-       u, err := c.CurrentUser()
-       if err != nil {
-               t.Fatal(err)
-       }
-       if x := "zzzzz-abcde-012340123401234"; u.UUID != x {
-               t.Errorf("got uuid %q, expected %q", u.UUID, x)
-       }
-       if len(stub.Requests) < 1 {
-               t.Fatal("empty stub.Requests")
-       }
+       u, err := client.CurrentUser()
+       c.Check(err, check.IsNil)
+       c.Check(u.UUID, check.Equals, "zzzzz-abcde-012340123401234")
+       c.Check(stub.Requests, check.Not(check.HasLen), 0)
        hdr := stub.Requests[len(stub.Requests)-1].Header
-       if hdr.Get("Authorization") != "OAuth2 xyzzy" {
-               t.Errorf("got headers %+q, expected Authorization header", hdr)
-       }
+       c.Check(hdr.Get("Authorization"), check.Equals, "OAuth2 xyzzy")
 
-       c.Client.Transport = &errorTransport{}
-       u, err = c.CurrentUser()
-       if err == nil {
-               t.Errorf("got nil error, expected something awful")
-       }
+       client.Client.Transport = &errorTransport{}
+       u, err = client.CurrentUser()
+       c.Check(err, check.NotNil)
 }
 
-func TestAnythingToValues(t *testing.T) {
+func (*clientSuite) TestAnythingToValues(c *check.C) {
        type testCase struct {
                in interface{}
                // ok==nil means anythingToValues should return an
@@ -158,17 +154,66 @@ func TestAnythingToValues(t *testing.T) {
                        ok: nil,
                },
        } {
-               t.Logf("%#v", tc.in)
+               c.Logf("%#v", tc.in)
                out, err := anythingToValues(tc.in)
-               switch {
-               case tc.ok == nil:
-                       if err == nil {
-                               t.Errorf("got %#v, expected error", out)
-                       }
-               case err != nil:
-                       t.Errorf("got err %#v, expected nil", err)
-               case !tc.ok(out):
-                       t.Errorf("got %#v but tc.ok() says that is wrong", out)
+               if tc.ok == nil {
+                       c.Check(err, check.NotNil)
+                       continue
                }
+               c.Check(err, check.IsNil)
+               c.Check(tc.ok(out), check.Equals, true)
        }
 }
+
+func (*clientSuite) TestLoadConfig(c *check.C) {
+       oldenv := os.Environ()
+       defer func() {
+               os.Clearenv()
+               for _, s := range oldenv {
+                       i := strings.IndexRune(s, '=')
+                       os.Setenv(s[:i], s[i+1:])
+               }
+       }()
+
+       tmp := c.MkDir()
+       os.Setenv("HOME", tmp)
+       for _, s := range os.Environ() {
+               if strings.HasPrefix(s, "ARVADOS_") {
+                       i := strings.IndexRune(s, '=')
+                       os.Unsetenv(s[:i])
+               }
+       }
+       os.Mkdir(tmp+"/.config", 0777)
+       os.Mkdir(tmp+"/.config/arvados", 0777)
+
+       // Use $HOME/.config/arvados/settings.conf if no env vars are
+       // set
+       os.WriteFile(tmp+"/.config/arvados/settings.conf", []byte(`
+               ARVADOS_API_HOST = localhost:1
+               ARVADOS_API_TOKEN = token_from_settings_file1
+       `), 0777)
+       client := NewClientFromEnv()
+       c.Check(client.AuthToken, check.Equals, "token_from_settings_file1")
+       c.Check(client.APIHost, check.Equals, "localhost:1")
+       c.Check(client.Insecure, check.Equals, false)
+
+       // ..._INSECURE=true, comments, ignored lines in settings.conf
+       os.WriteFile(tmp+"/.config/arvados/settings.conf", []byte(`
+               (ignored) = (ignored)
+               #ARVADOS_API_HOST = localhost:2
+               ARVADOS_API_TOKEN = token_from_settings_file2
+               ARVADOS_API_HOST_INSECURE = true
+       `), 0777)
+       client = NewClientFromEnv()
+       c.Check(client.AuthToken, check.Equals, "token_from_settings_file2")
+       c.Check(client.APIHost, check.Equals, "")
+       c.Check(client.Insecure, check.Equals, true)
+
+       // Environment variables override settings.conf
+       os.Setenv("ARVADOS_API_HOST", "[::]:3")
+       os.Setenv("ARVADOS_API_HOST_INSECURE", "0")
+       client = NewClientFromEnv()
+       c.Check(client.AuthToken, check.Equals, "token_from_settings_file2")
+       c.Check(client.APIHost, check.Equals, "[::]:3")
+       c.Check(client.Insecure, check.Equals, false)
+}
index 7eb7f0f03d57b571e314f8d87ca6714cf7d6563f..5a46635e9102365bbfd01c9c9c120bd8e23a7026 100644 (file)
@@ -9,6 +9,7 @@ import (
        "context"
        "net"
        "net/http"
+       "sync"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
@@ -21,7 +22,9 @@ type contextKey struct {
 }
 
 var (
-       requestTimeContextKey = contextKey{"requestTime"}
+       requestTimeContextKey       = contextKey{"requestTime"}
+       responseLogFieldsContextKey = contextKey{"responseLogFields"}
+       mutexContextKey             = contextKey{"mutex"}
 )
 
 type hijacker interface {
@@ -64,6 +67,19 @@ func HandlerWithDeadline(timeout time.Duration, next http.Handler) http.Handler
        })
 }
 
+func SetResponseLogFields(ctx context.Context, fields logrus.Fields) {
+       m, _ := ctx.Value(&mutexContextKey).(*sync.Mutex)
+       c, _ := ctx.Value(&responseLogFieldsContextKey).(logrus.Fields)
+       if m == nil || c == nil {
+               return
+       }
+       m.Lock()
+       defer m.Unlock()
+       for k, v := range fields {
+               c[k] = v
+       }
+}
+
 // LogRequests wraps an http.Handler, logging each request and
 // response.
 func LogRequests(h http.Handler) http.Handler {
@@ -81,6 +97,8 @@ func LogRequests(h http.Handler) http.Handler {
                })
                ctx := req.Context()
                ctx = context.WithValue(ctx, &requestTimeContextKey, time.Now())
+               ctx = context.WithValue(ctx, &responseLogFieldsContextKey, logrus.Fields{})
+               ctx = context.WithValue(ctx, &mutexContextKey, &sync.Mutex{})
                ctx = ctxlog.Context(ctx, lgr)
                req = req.WithContext(ctx)
 
@@ -124,6 +142,9 @@ func logResponse(w *responseTimer, req *http.Request, lgr *logrus.Entry) {
                        "timeWriteBody": stats.Duration(tDone.Sub(writeTime)),
                })
        }
+       if responseLogFields, ok := req.Context().Value(&responseLogFieldsContextKey).(logrus.Fields); ok {
+               lgr = lgr.WithFields(responseLogFields)
+       }
        respCode := w.WroteStatus()
        if respCode == 0 {
                respCode = http.StatusOK
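
The pattern above, a mutex-guarded field map stowed in the request context and merged into the final log entry, is language-agnostic; a hypothetical Python rendering of the same idea:

    import threading

    class ResponseLogFields:
        # Created once per request by the logging middleware; any
        # handler on the request path may add fields concurrently.
        def __init__(self):
            self._lock = threading.Lock()
            self._fields = {}

        def set_fields(self, **fields):
            # Analogous to SetResponseLogFields above.
            with self._lock:
                self._fields.update(fields)

        def snapshot(self):
            # Merged into the response log line, as logResponse does.
            with self._lock:
                return dict(self._fields)
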
index 0fcdc1e6334957f27a5ff1f10fbdedcf2716609a..2ce0e46b30bd67ad948f832183ab091865c2ea53 100644 (file)
@@ -481,7 +481,7 @@ class _BlockManager(object):
     DEFAULT_PUT_THREADS = 2
     DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None):
+    def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None, get_threads=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = collections.OrderedDict()
@@ -492,7 +492,7 @@ class _BlockManager(object):
         self.lock = threading.Lock()
         self.prefetch_enabled = True
         self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
-        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+        self.num_get_threads = get_threads or _BlockManager.DEFAULT_GET_THREADS
         self.copies = copies
         self.storage_classes = storage_classes_func or (lambda: [])
         self._pending_write_size = 0
@@ -593,7 +593,7 @@ class _BlockManager(object):
                 b = self._prefetch_queue.get()
                 if b is None:
                     return
-                self._keep.get(b)
+                self._keep.get(b, prefetch=True)
             except Exception:
                 _logger.exception("Exception doing block prefetch")
 
@@ -841,9 +841,6 @@ class _BlockManager(object):
         if not self.prefetch_enabled:
             return
 
-        if self._keep.get_from_cache(locator) is not None:
-            return
-
         with self.lock:
             if locator in self._bufferblocks:
                 return
@@ -1099,7 +1096,7 @@ class ArvadosFile(object):
             if size == 0 or offset >= self.size():
                 return b''
             readsegs = locators_and_ranges(self._segments, offset, size)
-            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
+            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager().num_get_threads, limit=32)
 
         locs = set()
         data = []
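
The readahead change above ties the prefetch window to the download thread count: instead of one 64 MiB block of lookahead, readfrom() now requests one block per get thread (still capped at 32 segments). Quick arithmetic with the new default:

    KEEP_BLOCK_SIZE = 64 * 1024 * 1024   # 64 MiB, as in arvados.config

    num_get_threads = 4                  # default in _BlockManager
    window = KEEP_BLOCK_SIZE * num_get_threads   # 256 MiB of lookahead
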
index a076de6baf622f560f92859db68e7e8cdafc65f9..a44d42b6ac7cd7a4156ab3b8bc4f72f86060e3a0 100644 (file)
@@ -1262,7 +1262,8 @@ class Collection(RichCollectionBase):
                  block_manager=None,
                  replication_desired=None,
                  storage_classes_desired=None,
-                 put_threads=None):
+                 put_threads=None,
+                 get_threads=None):
         """Collection constructor.
 
         :manifest_locator_or_text:
@@ -1311,6 +1312,7 @@ class Collection(RichCollectionBase):
         self.replication_desired = replication_desired
         self._storage_classes_desired = storage_classes_desired
         self.put_threads = put_threads
+        self.get_threads = get_threads
 
         if apiconfig:
             self._config = apiconfig
@@ -1424,7 +1426,12 @@ class Collection(RichCollectionBase):
             copies = (self.replication_desired or
                       self._my_api()._rootDesc.get('defaultCollectionReplication',
                                                    2))
-            self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads, num_retries=self.num_retries, storage_classes_func=self.storage_classes_desired)
+            self._block_manager = _BlockManager(self._my_keep(),
+                                                copies=copies,
+                                                put_threads=self.put_threads,
+                                                num_retries=self.num_retries,
+                                                storage_classes_func=self.storage_classes_desired,
+                                                get_threads=self.get_threads,)
         return self._block_manager
 
     def _remember_api_response(self, response):
index eb682976253b66a3c0c6c0ebd0a9dc5ca306d499..c061c70f0eebbac2ed2025fdecd27865c27139b8 100755 (executable)
@@ -98,6 +98,15 @@ When getting a collection manifest, strip its access tokens before writing
 it.
 """)
 
+parser.add_argument('--threads', type=int, metavar='N', default=4,
+                    help="""
+Set the number of download threads to be used. Note that using many
+threads will increase the RAM requirements. Default is to use 4
+threads. On high-latency installations, using a greater number will
+improve overall throughput.
+""")
+
 def parse_arguments(arguments, stdout, stderr):
     args = parser.parse_args(arguments)
 
@@ -191,7 +200,9 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
 
     try:
         reader = arvados.CollectionReader(
-            col_loc, api_client=api_client, num_retries=args.retries)
+            col_loc, api_client=api_client, num_retries=args.retries,
+            keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024)),
+            get_threads=args.threads)
     except Exception as error:
         logger.error("failed to read collection: {}".format(error))
         return 1
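
The block cache handed to KeepClient above is sized as (threads+1) x 64 MiB. Reading the formula, that is one 64 MiB slot per download thread plus one spare, though the patch itself does not state the rationale:

    def cache_bytes(threads):
        # Sizing used by arv-get above: one block-sized slot per
        # download thread, plus one extra slot.
        return (threads + 1) * 64 * 1024 * 1024

    cache_bytes(4)   # default --threads 4: 320 MiB
    cache_bytes(8)   # --threads 8: 576 MiB
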
index 1a83eae944c59f8dde5e3a7c63de8bbe9c62a9c9..7c05cc0a6a2c72ca818686b6eea5c6f0a4874d3d 100644 (file)
@@ -1036,9 +1036,10 @@ class KeepClient(object):
         else:
             return None
 
-    def get_from_cache(self, loc):
+    def get_from_cache(self, loc_s):
         """Fetch a block only if is in the cache, otherwise return None."""
-        slot = self.block_cache.get(loc)
+        locator = KeepLocator(loc_s)
+        slot = self.block_cache.get(locator.md5sum)
         if slot is not None and slot.ready.is_set():
             return slot.get()
         else:
@@ -1057,7 +1058,7 @@ class KeepClient(object):
     def get(self, loc_s, **kwargs):
         return self._get_or_head(loc_s, method="GET", **kwargs)
 
-    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
+    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -1096,6 +1097,13 @@ class KeepClient(object):
             if method == "GET":
                 slot, first = self.block_cache.reserve_cache(locator.md5sum)
                 if not first:
+                    if prefetch:
+                        # This is a prefetch request; if the block is
+                        # already in flight, return immediately.
+                        # Clear 'slot' to prevent the finally block
+                        # from calling slot.set().
+                        slot = None
+                        return None
                     self.hits_counter.add(1)
                     blob = slot.get()
                     if blob is None:
@@ -1332,6 +1340,3 @@ class KeepClient(object):
             return True
         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
             return True
-
-    def is_cached(self, locator):
-        return self.block_cache.reserve_cache(expect_hash)
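
With the prefetch flag above, a prefetch caller that loses the reserve_cache() race returns None immediately instead of blocking on the in-flight fetch; only real reads wait on the cache slot. A sketch of the caller-side contract, using names from the diff:

    # Warm the cache: returns the data if this call did the fetch,
    # or None if another thread already has the block in flight.
    keep_client.get(locator, prefetch=True)

    # A normal read never passes prefetch, and blocks until the
    # (possibly prefetched) block is available.
    data = keep_client.get(locator)
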
index be8a03fc314d2cf599c16d5f44b1ab61cc9e885d..c383d529e8087da579fcf4ae6814f76a57044e29 100644 (file)
@@ -392,7 +392,8 @@ def keyset_list_all(fn, order_key="created_at", num_retries=0, ascending=True, *
     pagesize = 1000
     kwargs["limit"] = pagesize
     kwargs["count"] = 'none'
-    kwargs["order"] = ["%s %s" % (order_key, "asc" if ascending else "desc"), "uuid asc"]
+    asc = "asc" if ascending else "desc"
+    kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
     other_filters = kwargs.get("filters", [])
 
     if "select" in kwargs and "uuid" not in kwargs["select"]:
@@ -436,7 +437,7 @@ def keyset_list_all(fn, order_key="created_at", num_retries=0, ascending=True, *
         if firstitem[order_key] == lastitem[order_key]:
             # Got a page where every item has the same order key.
             # Switch to using uuid for paging.
-            nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">", lastitem["uuid"]]]
+            nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
             prev_page_all_same_order_key = True
         else:
             # Start from the last order key seen, but skip the last
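
The ordering fix above matters for keyset paging: the uuid tiebreaker must sort in the same direction as the primary key, and the same-order-key fallback must compare uuids in that direction too, or a descending listing would stall or skip items. The fallback filter, illustrated with assumed sample values:

    ascending = False
    order_key = "created_at"
    lastitem = {"created_at": "2022-04-01T00:00:00Z",
                "uuid": "zzzzz-4zz18-000000000000001"}

    # Every item on the page shared the same created_at, so advance
    # by uuid, using "<" when descending to match "uuid desc" order.
    nextpage = [[order_key, "=", lastitem[order_key]],
                ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
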
index 0b8e7c8f8bf2d4615209e0dbdbfd81e6e54b32af..b45a592ecd0fbd1b1d4722bd63f6e1e0b25514dd 100644 (file)
@@ -27,7 +27,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
         def __init__(self, blocks):
             self.blocks = blocks
             self.requests = []
-        def get(self, locator, num_retries=0):
+        def get(self, locator, num_retries=0, prefetch=False):
             self.requests.append(locator)
             return self.blocks.get(locator)
         def get_from_cache(self, locator):
@@ -627,6 +627,7 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
             def __init__(self, blocks, nocache):
                 self.blocks = blocks
                 self.nocache = nocache
+                self.num_get_threads = 1
 
             def block_prefetch(self, loc):
                 pass
index a43e0d40dfe7ed48f5477689d3623afefe952ba3..5cf4993b2f3804d22209ae16db41fc7bc505efd8 100644 (file)
@@ -320,7 +320,7 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
         def __init__(self, content, num_retries=0):
             self.content = content
 
-        def get(self, locator, num_retries=0):
+        def get(self, locator, num_retries=0, prefetch=False):
             return self.content[locator]
 
     def test_stream_reader(self):
index 1c0e437b41196bf1e56e2048e11029725b01da2e..4dba9ce3dc7a5105533a526fe3ee304ed60d784c 100644 (file)
@@ -166,10 +166,10 @@ class KeysetListAllTestCase(unittest.TestCase):
 
     def test_onepage_desc(self):
         ks = KeysetTestHelper([[
-            {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid asc"], "filters": []},
+            {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid desc"], "filters": []},
             {"items": [{"created_at": "2", "uuid": "2"}, {"created_at": "1", "uuid": "1"}]}
         ], [
-            {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid asc"], "filters": [["created_at", "<=", "1"], ["uuid", "!=", "1"]]},
+            {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid desc"], "filters": [["created_at", "<=", "1"], ["uuid", "!=", "1"]]},
             {"items": []}
         ]])
 
index 327bf63b5fa057779d6d03d99331b179077611db..07a31d81a8a129dc67acb4aa1a7fa8f39e253ea1 100644 (file)
@@ -220,7 +220,7 @@ class ArvadosModel < ApplicationRecord
   end
 
   def self.default_orders
-    ["#{table_name}.modified_at desc", "#{table_name}.uuid"]
+    ["#{table_name}.modified_at desc", "#{table_name}.uuid desc"]
   end
 
   def self.unique_columns
diff --git a/services/api/db/migrate/20220401153101_fix_created_at_indexes.rb b/services/api/db/migrate/20220401153101_fix_created_at_indexes.rb
new file mode 100644 (file)
index 0000000..590e841
--- /dev/null
@@ -0,0 +1,28 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+class FixCreatedAtIndexes < ActiveRecord::Migration[5.2]
+  @@idxtables = [:collections, :container_requests, :groups, :links, :repositories, :users, :virtual_machines, :workflows, :logs]
+
+  def up
+    @@idxtables.each do |table|
+      ActiveRecord::Base.connection.execute("DROP INDEX IF EXISTS index_#{table.to_s}_on_created_at")
+      ActiveRecord::Base.connection.execute("DROP INDEX IF EXISTS index_#{table.to_s}_on_created_at_uuid")
+      ActiveRecord::Base.connection.execute("DROP INDEX IF EXISTS index_#{table.to_s}_on_created_at_and_uuid")
+      ActiveRecord::Base.connection.execute("DROP INDEX IF EXISTS index_#{table.to_s}_on_modified_at")
+      ActiveRecord::Base.connection.execute("DROP INDEX IF EXISTS index_#{table.to_s}_on_modified_at_uuid")
+      ActiveRecord::Base.connection.execute("DROP INDEX IF EXISTS index_#{table.to_s}_on_modified_at_and_uuid")
+
+      ActiveRecord::Base.connection.execute("CREATE INDEX IF NOT EXISTS index_#{table.to_s}_on_created_at_and_uuid ON #{table.to_s} USING btree (created_at, uuid)")
+      ActiveRecord::Base.connection.execute("CREATE INDEX IF NOT EXISTS index_#{table.to_s}_on_modified_at_and_uuid ON #{table.to_s} USING btree (modified_at, uuid)")
+    end
+  end
+
+  def down
+    @@idxtables.each do |table|
+      ActiveRecord::Base.connection.execute("DROP INDEX IF EXISTS index_#{table.to_s}_on_modified_at_and_uuid")
+      ActiveRecord::Base.connection.execute("CREATE INDEX IF NOT EXISTS index_#{table.to_s}_on_modified_at_uuid ON #{table.to_s} USING btree (modified_at desc, uuid asc)")
+    end
+  end
+end
index cfe21f7c9ae29307b42fd25236a1f4c195254da0..e6bba676257118d542333120a5815549d2cb1f64 100644 (file)
@@ -1905,10 +1905,10 @@ CREATE UNIQUE INDEX index_authorized_keys_on_uuid ON public.authorized_keys USIN
 
 
 --
--- Name: index_collections_on_created_at; Type: INDEX; Schema: public; Owner: -
+-- Name: index_collections_on_created_at_and_uuid; Type: INDEX; Schema: public; Owner: -
 --
 
-CREATE INDEX index_collections_on_created_at ON public.collections USING btree (created_at);
+CREATE INDEX index_collections_on_created_at_and_uuid ON public.collections USING btree (created_at, uuid);
 
 
 --
@@ -1933,17 +1933,10 @@ CREATE INDEX index_collections_on_is_trashed ON public.collections USING btree (
 
 
 --
--- Name: index_collections_on_modified_at; Type: INDEX; Schema: public; Owner: -
+-- Name: index_collections_on_modified_at_and_uuid; Type: INDEX; Schema: public; Owner: -
 --
 
-CREATE INDEX index_collections_on_modified_at ON public.collections USING btree (modified_at);
-
-
---
--- Name: index_collections_on_modified_at_uuid; Type: INDEX; Schema: public; Owner: -
---
-
-CREATE INDEX index_collections_on_modified_at_uuid ON public.collections USING btree (modified_at DESC, uuid);
+CREATE INDEX index_collections_on_modified_at_and_uuid ON public.collections USING btree (modified_at, uuid);
 
 
 --
@@ -1989,10 +1982,17 @@ CREATE INDEX index_container_requests_on_container_uuid ON public.container_requ
 
 
 --
--- Name: index_container_requests_on_modified_at_uuid; Type: INDEX; Schema: public; Owner: -
+-- Name: index_container_requests_on_created_at_and_uuid; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX index_container_requests_on_created_at_and_uuid ON public.container_requests USING btree (created_at, uuid);
+
+
+--
+-- Name: index_container_requests_on_modified_at_and_uuid; Type: INDEX; Schema: public; Owner: -
 --
 
-CREATE INDEX index_container_requests_on_modified_at_uuid ON public.container_requests USING btree (modified_at DESC, uuid);
+CREATE INDEX index_container_requests_on_modified_at_and_uuid ON public.container_requests USING btree (modified_at, uuid);
 
 
 --
@@ -2094,10 +2094,10 @@ CREATE UNIQUE INDEX index_frozen_groups_on_uuid ON public.frozen_groups USING bt
 
 
 --
--- Name: index_groups_on_created_at; Type: INDEX; Schema: public; Owner: -
+-- Name: index_groups_on_created_at_and_uuid; Type: INDEX; Schema: public; Owner: -
 --
 
-CREATE INDEX index_groups_on_created_at ON public.groups USING btree (created_at);
+CREATE INDEX index_groups_on_created_at_and_uuid ON public.groups USING btree (created_at, uuid);
 
 
 --
@@ -2122,17 +2122,10 @@ CREATE INDEX index_groups_on_is_trashed ON public.groups USING btree (is_trashed
 
 
 --
--- Name: index_groups_on_modified_at; Type: INDEX; Schema: public; Owner: -
+-- Name: index_groups_on_modified_at_and_uuid; Type: INDEX; Schema: public; Owner: -
 --
 
-CREATE INDEX index_groups_on_modified_at ON public.groups USING btree (modified_at);
-
-
---
--- Name: index_groups_on_modified_at_uuid; Type: INDEX; Schema: public; Owner: -
---
-
-CREATE INDEX index_groups_on_modified_at_uuid ON public.groups USING btree (modified_at DESC, uuid);
+CREATE INDEX index_groups_on_modified_at_and_uuid ON public.groups USING btree (modified_at, uuid);
 
 
 --
@@ -2360,10 +2353,10 @@ CREATE UNIQUE INDEX index_keep_services_on_uuid ON public.keep_services USING bt
 
 
 --
--- Name: index_links_on_created_at; Type: INDEX; Schema: public; Owner: -
+-- Name: index_links_on_created_at_and_uuid; Type: INDEX; Schema: public; Owner: -
 --
 
-CREATE INDEX index_links_on_created_at ON public.links USING btree (created_at);
+CREATE INDEX index_links_on_created_at_and_uuid ON public.links USING btree (created_at, uuid);
 
 
 --
@@ -2374,17 +2367,10 @@ CREATE INDEX index_links_on_head_uuid ON public.links USING btree (head_uuid);
 
 
 --
--- Name: index_links_on_modified_at; Type: INDEX; Schema: public; Owner: -
+-- Name: index_links_on_modified_at_and_uuid; Type: INDEX; Schema: public; Owner: -
 --
 
-CREATE INDEX index_links_on_modified_at ON public.links USING btree (modified_at);
-
-
---
--- Name: index_links_on_modified_at_uuid; Type: INDEX; Schema: public; Owner: -
---
-
-CREATE INDEX index_links_on_modified_at_uuid ON public.links USING btree (modified_at DESC, uuid);
+CREATE INDEX index_links_on_modified_at_and_uuid ON public.links USING btree (modified_at, uuid);
 
 
 --
@@ -2423,10 +2409,10 @@ CREATE UNIQUE INDEX index_links_on_uuid ON public.links USING btree (uuid);
 
 
 --
--- Name: index_logs_on_created_at; Type: INDEX; Schema: public; Owner: -
+-- Name: index_logs_on_created_at_and_uuid; Type: INDEX; Schema: public; Owner: -
 --
 
-CREATE INDEX index_logs_on_created_at ON public.logs USING btree (created_at);
+CREATE INDEX index_logs_on_created_at_and_uuid ON public.logs USING btree (created_at, uuid);
 
 
 --
@@ -2444,17 +2430,10 @@ CREATE INDEX index_logs_on_event_type ON public.logs USING btree (event_type);
 
 
 --
--- Name: index_logs_on_modified_at; Type: INDEX; Schema: public; Owner: -
+-- Name: index_logs_on_modified_at_and_uuid; Type: INDEX; Schema: public; Owner: -
 --
 
-CREATE INDEX index_logs_on_modified_at ON public.logs USING btree (modified_at);
-
-
---
--- Name: index_logs_on_modified_at_uuid; Type: INDEX; Schema: public; Owner: -
---
-
-CREATE INDEX index_logs_on_modified_at_uuid ON public.logs USING btree (modified_at DESC, uuid);
+CREATE INDEX index_logs_on_modified_at_and_uuid ON public.logs USING btree (modified_at, uuid);
 
 
 --
@@ -2605,10 +2584,17 @@ CREATE UNIQUE INDEX index_pipeline_templates_on_uuid ON public.pipeline_template
 
 
 --
--- Name: index_repositories_on_modified_at_uuid; Type: INDEX; Schema: public; Owner: -
+-- Name: index_repositories_on_created_at_and_uuid; Type: INDEX; Schema: public; Owner: -
 --
 
-CREATE INDEX index_repositories_on_modified_at_uuid ON public.repositories USING btree (modified_at DESC, uuid);
+CREATE INDEX index_repositories_on_created_at_and_uuid ON public.repositories USING btree (created_at, uuid);
+
+
+--
+-- Name: index_repositories_on_modified_at_and_uuid; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX index_repositories_on_modified_at_and_uuid ON public.repositories USING btree (modified_at, uuid);
 
 
 --
@@ -2689,10 +2675,10 @@ CREATE UNIQUE INDEX index_trashed_groups_on_group_uuid ON public.trashed_groups
 
 
 --
--- Name: index_users_on_created_at; Type: INDEX; Schema: public; Owner: -
+-- Name: index_users_on_created_at_and_uuid; Type: INDEX; Schema: public; Owner: -
 --
 
-CREATE INDEX index_users_on_created_at ON public.users USING btree (created_at);
+CREATE INDEX index_users_on_created_at_and_uuid ON public.users USING btree (created_at, uuid);
 
 
 --
@@ -2703,17 +2689,10 @@ CREATE UNIQUE INDEX index_users_on_identity_url ON public.users USING btree (ide
 
 
 --
--- Name: index_users_on_modified_at; Type: INDEX; Schema: public; Owner: -
---
-
-CREATE INDEX index_users_on_modified_at ON public.users USING btree (modified_at);
-
-
---
--- Name: index_users_on_modified_at_uuid; Type: INDEX; Schema: public; Owner: -
+-- Name: index_users_on_modified_at_and_uuid; Type: INDEX; Schema: public; Owner: -
 --
 
-CREATE INDEX index_users_on_modified_at_uuid ON public.users USING btree (modified_at DESC, uuid);
+CREATE INDEX index_users_on_modified_at_and_uuid ON public.users USING btree (modified_at, uuid);
 
 
 --
@@ -2737,6 +2716,13 @@ CREATE UNIQUE INDEX index_users_on_username ON public.users USING btree (usernam
 CREATE UNIQUE INDEX index_users_on_uuid ON public.users USING btree (uuid);
 
 
+--
+-- Name: index_virtual_machines_on_created_at_and_uuid; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX index_virtual_machines_on_created_at_and_uuid ON public.virtual_machines USING btree (created_at, uuid);
+
+
 --
 -- Name: index_virtual_machines_on_hostname; Type: INDEX; Schema: public; Owner: -
 --
@@ -2745,10 +2731,10 @@ CREATE INDEX index_virtual_machines_on_hostname ON public.virtual_machines USING
 
 
 --
--- Name: index_virtual_machines_on_modified_at_uuid; Type: INDEX; Schema: public; Owner: -
+-- Name: index_virtual_machines_on_modified_at_and_uuid; Type: INDEX; Schema: public; Owner: -
 --
 
-CREATE INDEX index_virtual_machines_on_modified_at_uuid ON public.virtual_machines USING btree (modified_at DESC, uuid);
+CREATE INDEX index_virtual_machines_on_modified_at_and_uuid ON public.virtual_machines USING btree (modified_at, uuid);
 
 
 --
@@ -2766,10 +2752,17 @@ CREATE UNIQUE INDEX index_virtual_machines_on_uuid ON public.virtual_machines US
 
 
 --
--- Name: index_workflows_on_modified_at_uuid; Type: INDEX; Schema: public; Owner: -
+-- Name: index_workflows_on_created_at_and_uuid; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX index_workflows_on_created_at_and_uuid ON public.workflows USING btree (created_at, uuid);
+
+
+--
+-- Name: index_workflows_on_modified_at_and_uuid; Type: INDEX; Schema: public; Owner: -
 --
 
-CREATE INDEX index_workflows_on_modified_at_uuid ON public.workflows USING btree (modified_at DESC, uuid);
+CREATE INDEX index_workflows_on_modified_at_and_uuid ON public.workflows USING btree (modified_at, uuid);
 
 
 --
@@ -3185,6 +3178,7 @@ INSERT INTO "schema_migrations" (version) VALUES
 ('20211027154300'),
 ('20220224203102'),
 ('20220301155729'),
-('20220303204419');
+('20220303204419'),
+('20220401153101');
 
 
index ab76417902214162506707d3e642f93539ffe7ed..9280aeab935e19748f25ea347592d3d704161f02 100644 (file)
@@ -8,8 +8,8 @@ running:
   cancelled_at: ~
   cancelled_by_user_uuid: ~
   cancelled_by_client_uuid: ~
-  created_at: <%= 3.minute.ago.to_s(:db) %>
-  started_at: <%= 3.minute.ago.to_s(:db) %>
+  created_at: <%= 2.7.minute.ago.to_s(:db) %>
+  started_at: <%= 2.7.minute.ago.to_s(:db) %>
   finished_at: ~
   script: hash
   repository: active/foo
index 9621b3effc1c74f0b832c021b3c9d2b99ef11586..a504c9fadd790cb21e2410cb95cefd082c32cfbf 100644 (file)
@@ -97,7 +97,7 @@ has_job:
   state: Ready
   uuid: zzzzz-d1hrv-1yfj6xkidf2muk3
   owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
-  created_at: <%= 3.1.minute.ago.to_s(:db) %>
+  created_at: <%= 2.9.minute.ago.to_s(:db) %>
   components:
    foo:
     script: foo
@@ -112,7 +112,7 @@ components_is_jobspec:
   # Helps test that clients cope with funny-shaped components.
   # For an example, see #3321.
   uuid: zzzzz-d1hrv-1yfj61234abcdk4
-  created_at: <%= 2.minute.ago.to_s(:db) %>
+  created_at: <%= 4.minute.ago.to_s(:db) %>
   owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
   modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
   modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
@@ -132,7 +132,7 @@ pipeline_with_tagged_collection_input:
   state: Ready
   uuid: zzzzz-d1hrv-1yfj61234abcdk3
   owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
-  created_at: <%= 3.1.minute.ago.to_s(:db) %>
+  created_at: <%= 3.2.minute.ago.to_s(:db) %>
   components:
     part-one:
       script_parameters:
@@ -145,7 +145,7 @@ pipeline_to_merge_params:
   uuid: zzzzz-d1hrv-1yfj6dcba4321k3
   pipeline_template_uuid: zzzzz-p5p6p-aox0k0ofxrystgw
   owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
-  created_at: <%= 3.1.minute.ago.to_s(:db) %>
+  created_at: <%= 3.3.minute.ago.to_s(:db) %>
   components:
     part-one:
       script_parameters:
@@ -193,7 +193,7 @@ pipeline_instance_owned_by_fuse:
   uuid: zzzzz-d1hrv-ri9dvgkgqs9y09j
   owner_uuid: zzzzz-tpzed-0fusedrivertest
   pipeline_template_uuid: zzzzz-p5p6p-vq4wuvy84xvaq2r
-  created_at: 2014-09-15 12:00:00
+  created_at: 2014-09-16 12:00:00
   name: "pipeline instance owned by FUSE"
   components:
     foo:
@@ -210,7 +210,7 @@ pipeline_instance_in_fuse_project:
   uuid: zzzzz-d1hrv-scarxiyajtshq3l
   owner_uuid: zzzzz-j7d0g-0000ownedbyfuse
   pipeline_template_uuid: zzzzz-p5p6p-vq4wuvy84xvaq2r
-  created_at: 2014-09-15 12:00:00
+  created_at: 2014-09-17 12:00:00
   name: "pipeline instance in FUSE project"
   components:
     foo:
@@ -227,7 +227,7 @@ pipeline_owned_by_active_in_aproject:
   state: Complete
   uuid: zzzzz-d1hrv-ju5ghi0i9z2kqc6
   owner_uuid: zzzzz-j7d0g-v955i6s2oi1cbso
-  created_at: 2014-09-15 12:00:00
+  created_at: 2014-09-18 12:00:00
   components:
     foo:
       script: foo
@@ -243,7 +243,7 @@ pipeline_owned_by_active_in_home:
   state: Complete
   uuid: zzzzz-d1hrv-lihrbd0i9z2kqc6
   owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
-  created_at: 2014-09-15 12:00:00
+  created_at: 2014-09-19 12:00:00
   components:
     foo:
       script: foo
@@ -287,7 +287,7 @@ pipeline_in_publicly_accessible_project_but_other_objects_elsewhere:
   name: Pipeline in public project with other objects elsewhere
   pipeline_template_uuid: zzzzz-p5p6p-aox0k0ofxrystgw
   state: Complete
-  created_at: 2014-09-15 12:00:00
+  created_at: 2014-09-20 12:00:00
   components:
     foo:
       script: foo
@@ -314,7 +314,7 @@ new_pipeline_in_publicly_accessible_project:
   name: Pipeline in New state in publicly accessible project
   pipeline_template_uuid: zzzzz-p5p6p-tmpltpublicproj
   state: New
-  created_at: 2014-09-15 12:00:00
+  created_at: 2014-09-21 12:00:00
   components:
     foo:
       script: foo
@@ -331,7 +331,7 @@ new_pipeline_in_publicly_accessible_project_but_other_objects_elsewhere:
   name: Pipeline in New state in public project with objects elsewhere
   pipeline_template_uuid: zzzzz-p5p6p-aox0k0ofxrystgw
   state: New
-  created_at: 2014-09-15 12:00:00
+  created_at: 2014-09-22 12:00:00
   components:
     foo:
       script: foo
@@ -348,7 +348,7 @@ new_pipeline_in_publicly_accessible_project_with_dataclass_file_and_other_object
   name: Pipeline in public project in New state with file type data class with objects elsewhere
   pipeline_template_uuid: zzzzz-p5p6p-aox0k0ofxrystgw
   state: New
-  created_at: 2014-09-15 12:00:00
+  created_at: 2014-09-23 12:00:00
   components:
     foo:
       script: foo
@@ -363,8 +363,8 @@ pipeline_in_running_state:
   name: running_with_job
   uuid: zzzzz-d1hrv-runningpipeline
   owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
-  created_at: <%= 3.1.minute.ago.to_s(:db) %>
-  started_at: <%= 3.1.minute.ago.to_s(:db) %>
+  created_at: <%= 2.8.minute.ago.to_s(:db) %>
+  started_at: <%= 2.8.minute.ago.to_s(:db) %>
   state: RunningOnServer
   components:
    foo:
@@ -393,7 +393,7 @@ complete_pipeline_with_two_jobs:
   uuid: zzzzz-d1hrv-twodonepipeline
   owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
   state: Complete
-  created_at: <%= 3.minute.ago.to_s(:db) %>
+  created_at: <%= 2.5.minute.ago.to_s(:db) %>
   started_at: <%= 2.minute.ago.to_s(:db) %>
   finished_at: <%= 1.minute.ago.to_s(:db) %>
   components:
index 9bba418578e89c6dd36009ba2e3278f700d33eb3..fae9dc40c6daf2c9df1e28edae47cd743757db52 100644 (file)
@@ -24,7 +24,7 @@ class Arvados::V1::QueryTest < ActionController::TestCase
       controller: 'logs',
     }
     assert_response :success
-    assert_equal('logs.event_type asc, logs.modified_at desc, logs.uuid',
+    assert_equal('logs.event_type asc, logs.modified_at desc, logs.uuid desc',
                  assigns(:objects).order_values.join(', '))
   end
 
@@ -36,7 +36,7 @@ class Arvados::V1::QueryTest < ActionController::TestCase
       controller: 'logs',
     }
     assert_response :success
-    assert_equal('logs.modified_at asc, logs.uuid',
+    assert_equal('logs.modified_at asc, logs.uuid desc',
                  assigns(:objects).order_values.join(', '))
   end
 
@@ -51,7 +51,7 @@ class Arvados::V1::QueryTest < ActionController::TestCase
       controller: 'logs',
     }
     assert_response :success
-    assert_equal('logs.modified_at asc, logs.event_type desc, logs.uuid',
+    assert_equal('logs.modified_at asc, logs.event_type desc, logs.uuid desc',
                  assigns(:objects).order_values.join(', '))
   end
 
index 2ee3b3cf94cab5c73f3b4e809db8b6afac7aec81..0548a767ba4cbba9b3b94d6303b65db6cb53c7b8 100644 (file)
@@ -62,7 +62,7 @@ class SelectTest < ActionDispatch::IntegrationTest
       headers: auth(:admin)
     assert_response :success
     uuids = json_response['items'].collect { |i| i['uuid'] }
-    assert_equal uuids, uuids.sort
+    assert_equal uuids, uuids.sort.reverse
   end
 
   def assert_link_classes_ascend(current_class, prev_class)
index 7de95a0cb1b0d95bd1d67dcc58b5a3c406a863ff..f3816c0d3e783b6272c5abcc424641a4bb39d6dc 100644 (file)
@@ -525,15 +525,23 @@ class CollectionDirectory(CollectionDirectoryBase):
                         self.collection.update()
                         new_collection_record = self.collection.api_response()
                     else:
+                        # If there are too many prefetch threads and
+                        # the CPU is maxed out, delivering data to the
+                        # FUSE layer actually ends up being slower.
+                        # Experimentally, capping at 7 threads seems
+                        # to be the sweet spot.
+                        get_threads = min(max((self.api.keep.block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
                         # Create a new collection object
                         if uuid_pattern.match(self.collection_locator):
                             coll_reader = arvados.collection.Collection(
                                 self.collection_locator, self.api, self.api.keep,
-                                num_retries=self.num_retries)
+                                num_retries=self.num_retries,
+                                get_threads=get_threads)
                         else:
                             coll_reader = arvados.collection.CollectionReader(
                                 self.collection_locator, self.api, self.api.keep,
-                                num_retries=self.num_retries)
+                                num_retries=self.num_retries,
+                                get_threads=get_threads)
                         new_collection_record = coll_reader.api_response() or {}
                         # If the Collection only exists in Keep, there will be no API
                         # response.  Fill in the fields we need.
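
The get_threads formula above derives the thread count from the block cache size, reserving one slot of headroom and clamping to the experimentally determined cap of 7. Worked examples:

    def fuse_get_threads(cache_max):
        # One thread per 64 MiB cache slot, minus one slot of
        # headroom, clamped to the range [1, 7].
        return min(max((cache_max // (64 * 1024 * 1024)) - 1, 1), 7)

    fuse_get_threads(64 * 1024 * 1024)     # 1 (floor)
    fuse_get_threads(256 * 1024 * 1024)    # 3
    fuse_get_threads(1024 * 1024 * 1024)   # 7 (cap)
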
index ece316193d4ee6a82cf04f6a685f09b0af453cf3..1601db59440be8b57c35b988869a1a56229ef92b 100644 (file)
@@ -1088,6 +1088,7 @@ class FuseFsyncTest(FuseMagicTest):
 class MagicDirApiError(FuseMagicTest):
     def setUp(self):
         api = mock.MagicMock()
+        api.keep.block_cache = mock.MagicMock(cache_max=1)
         super(MagicDirApiError, self).setUp(api=api)
         api.collections().get().execute.side_effect = iter([
             Exception('API fail'),
index 97ec95e3aac3f96111ab49014635ae742073b4e8..ef61b06873c50661bb29f622bfb1b5e9a1097495 100644 (file)
@@ -913,6 +913,14 @@ func (h *handler) logUploadOrDownload(
                        WithField("collection_file_path", filepath)
                props["collection_uuid"] = collection.UUID
                props["collection_file_path"] = filepath
+               // h.determineCollection populates the collection_uuid prop with the PDH, if
+               // this collection is being accessed via PDH. In that case, blank the
+               // collection_uuid field so that consumers of the log entries can rely on it
+               // being a UUID, or blank. The PDH remains available via the
+               // portable_data_hash property.
+               if props["collection_uuid"] == collection.PortableDataHash {
+                       props["collection_uuid"] = ""
+               }
        }
        if r.Method == "PUT" || r.Method == "POST" {
                log.Info("File upload")
index 997da57e052db81a25306507b23b3f60935b129e..26a4f28067663898ac660e647d91ba5fa71dbfb3 100755 (executable)
@@ -10,12 +10,52 @@ import arvados
 import arvados.util
 import datetime
 import ciso8601
+import csv
 
 def parse_arguments(arguments):
     arg_parser = argparse.ArgumentParser()
-    arg_parser.add_argument('--days', type=int, required=True)
+    arg_parser.add_argument('--start', help='Start date for the report in YYYY-MM-DD format (UTC)')
+    arg_parser.add_argument('--end', help='End date for the report in YYYY-MM-DD format (UTC)')
+    arg_parser.add_argument('--days', type=int, help='Number of days before now() to start the report')
+    arg_parser.add_argument('--csv', action='store_true', help='Output in csv format (default: false)')
     args = arg_parser.parse_args(arguments)
-    return args
+
+    if args.days and (args.start or args.end):
+        arg_parser.print_help()
+        print("Error: either specify --days or both --start and --end")
+        exit(1)
+
+    if not args.days and (not args.start or not args.end):
+        arg_parser.print_help()
+        print("\nError: either specify --days or both --start and --end")
+        exit(1)
+
+    if (args.start and not args.end) or (args.end and not args.start):
+        arg_parser.print_help()
+        print("\nError: no start or end date found, either specify --days or both --start and --end")
+        exit(1)
+
+    if args.days:
+        to = datetime.datetime.utcnow()
+        since = to - datetime.timedelta(days=args.days)
+
+    if args.start:
+        try:
+            since = datetime.datetime.strptime(args.start,"%Y-%m-%d")
+        except:
+            arg_parser.print_help()
+            print("\nError: start date must be in YYYY-MM-DD format")
+            exit(1)
+
+    if args.end:
+        try:
+            to = datetime.datetime.strptime(args.end,"%Y-%m-%d")
+        except:
+            arg_parser.print_help()
+            print("\nError: end date must be in YYYY-MM-DD format")
+            exit(1)
+
+    return args, since, to
 
 def getowner(arv, uuid, owners):
     if uuid is None:
@@ -33,20 +73,46 @@ def getowner(arv, uuid, owners):
     return getowner(arv, owners[uuid], owners)
 
 def getuserinfo(arv, uuid):
-    u = arv.users().get(uuid=uuid).execute()
+    try:
+        u = arv.users().get(uuid=uuid).execute()
+    except:
+        return "deleted user (%susers/%s)" % (arv.config()["Services"]["Workbench1"]["ExternalURL"],
+                                                       uuid)
     prof = "\n".join("  %s: \"%s\"" % (k, v) for k, v in u["prefs"].get("profile", {}).items() if v)
     if prof:
         prof = "\n"+prof+"\n"
     return "%s %s <%s> (%susers/%s)%s" % (u["first_name"], u["last_name"], u["email"],
                                                        arv.config()["Services"]["Workbench1"]["ExternalURL"],
                                                        uuid, prof)
+def getuserinfocsv(arv, uuid):
+    try:
+        u = arv.users().get(uuid=uuid).execute()
+    except:
+        return [uuid,"deleted","user",""]
+    return [uuid, u["first_name"], u["last_name"], u["email"]]
+
 
 collectionNameCache = {}
-def getCollectionName(arv, uuid):
-    if uuid not in collectionNameCache:
-        u = arv.collections().get(uuid=uuid).execute()
-        collectionNameCache[uuid] = u["name"]
-    return collectionNameCache[uuid]
+def getCollectionName(arv, uuid, pdh):
+    lookupField = uuid
+    filters = [["uuid","=",uuid]]
+    cached = uuid in collectionNameCache
+    # Look up by uuid if it is available; fall back to looking up by pdh.
+    if uuid is None or len(uuid) != 27:
+        # Look up by pdh. Note that this can be misleading; the download could
+        # have happened from a collection with the same pdh but a different name.
+        # We arbitrarily pick the oldest collection with that pdh to look up
+        # the name when the uuid for the request is not known.
+        lookupField = pdh
+        filters = [["portable_data_hash","=",pdh]]
+        cached = pdh in collectionNameCache
+
+    if not cached:
+        u = arv.collections().list(filters=filters, order="created_at", limit=1).execute().get("items")
+        if len(u) < 1:
+            return "(deleted)"
+        collectionNameCache[lookupField] = u[0]["name"]
+    return collectionNameCache[lookupField]
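
Two quick illustrations of the lookup paths above, using hypothetical identifiers: a 27-character collection uuid goes through the direct lookup, while anything else falls back to the portable data hash (md5 of the manifest plus its size):

    # Direct lookup by a (hypothetical) 27-character collection uuid:
    getCollectionName(arv, "zzzzz-4zz18-0123456789abcde", None)

    # Fallback lookup by pdh, e.g. when only the hash was logged:
    getCollectionName(arv, "d41d8cd98f00b204e9800998ecf8427e+0",
                      "d41d8cd98f00b204e9800998ecf8427e+0")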
 
 def getname(u):
     return "\"%s\" (%s)" % (u["name"], u["uuid"])
@@ -55,17 +121,20 @@ def main(arguments=None):
     if arguments is None:
         arguments = sys.argv[1:]
 
-    args = parse_arguments(arguments)
+    args, since, to = parse_arguments(arguments)
 
     arv = arvados.api()
 
-    since = datetime.datetime.utcnow() - datetime.timedelta(days=args.days)
+    prefix = ''
+    suffix = "\n"
+    if args.csv:
+        prefix = '# '
+        suffix = ''
+    print("%sUser activity on %s between %s and %s%s" % (prefix, arv.config()["ClusterID"],
+                                                       since.isoformat(sep=" ", timespec="minutes"),
+                                                       to.isoformat(sep=" ", timespec="minutes"), suffix))
 
-    print("User activity on %s between %s and %s\n" % (arv.config()["ClusterID"],
-                                                       (datetime.datetime.now() - datetime.timedelta(days=args.days)).isoformat(sep=" ", timespec="minutes"),
-                                                       datetime.datetime.now().isoformat(sep=" ", timespec="minutes")))
-
-    events = arvados.util.keyset_list_all(arv.logs().list, filters=[["created_at", ">=", since.isoformat()]])
+    events = arvados.util.keyset_list_all(arv.logs().list, filters=[["created_at", ">=", since.isoformat()], ["created_at", "<", to.isoformat()]])
 
     users = {}
     owners = {}
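
A minimal sketch of the event fetch in isolation, assuming only the Arvados Python SDK; keyset_list_all pages through the logs table with keyset (rather than offset) pagination, yielding one log record dict at a time:

    import arvados
    import arvados.util

    arv = arvados.api()
    window = [["created_at", ">=", "2022-03-01T00:00:00Z"],
              ["created_at", "<", "2022-04-01T00:00:00Z"]]
    for e in arvados.util.keyset_list_all(arv.logs().list, filters=window):
        print(e["uuid"], e["event_type"], e["object_uuid"])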
@@ -74,99 +143,112 @@ def main(arguments=None):
         owner = getowner(arv, e["object_owner_uuid"], owners)
         users.setdefault(owner, [])
         event_at = ciso8601.parse_datetime(e["event_at"]).astimezone().isoformat(sep=" ", timespec="minutes")
-        # loguuid = e["uuid"]
-        loguuid = ""
+        loguuid = e["uuid"]
 
         if e["event_type"] == "create" and e["object_uuid"][6:11] == "tpzed":
             users.setdefault(e["object_uuid"], [])
-            users[e["object_uuid"]].append("%s User account created" % event_at)
+            users[e["object_uuid"]].append([loguuid, event_at, "User account created"])
 
         elif e["event_type"] == "update" and e["object_uuid"][6:11] == "tpzed":
             pass
 
         elif e["event_type"] == "create" and e["object_uuid"][6:11] == "xvhdp":
             if e["properties"]["new_attributes"]["requesting_container_uuid"] is None:
-                users[owner].append("%s Ran container %s %s" % (event_at, getname(e["properties"]["new_attributes"]), loguuid))
+                users[owner].append([loguuid, event_at, "Ran container %s" % (getname(e["properties"]["new_attributes"]))])
 
         elif e["event_type"] == "update" and e["object_uuid"][6:11] == "xvhdp":
             pass
 
         elif e["event_type"] == "create" and e["object_uuid"][6:11] == "j7d0g":
-            users[owner].append("%s Created project %s" %  (event_at, getname(e["properties"]["new_attributes"])))
+            users[owner].append([loguuid, event_at,"Created project %s" % (getname(e["properties"]["new_attributes"]))])
 
         elif e["event_type"] == "delete" and e["object_uuid"][6:11] == "j7d0g":
-            users[owner].append("%s Deleted project %s" % (event_at, getname(e["properties"]["old_attributes"])))
+            users[owner].append([loguuid, event_at,"Deleted project %s" % (getname(e["properties"]["old_attributes"]))])
 
         elif e["event_type"] == "update" and e["object_uuid"][6:11] == "j7d0g":
-            users[owner].append("%s Updated project %s" % (event_at, getname(e["properties"]["new_attributes"])))
+            users[owner].append([loguuid, event_at,"Updated project %s" % (getname(e["properties"]["new_attributes"]))])
 
         elif e["event_type"] in ("create", "update") and e["object_uuid"][6:11] == "gj3su":
             since_last = None
-            if len(users[owner]) > 0 and users[owner][-1].endswith("activity"):
-                sp = users[owner][-1].split(" ")
-                start = sp[0]+" "+sp[1]
-                since_last = ciso8601.parse_datetime(event_at) - ciso8601.parse_datetime(sp[3]+" "+sp[4])
+            if len(users[owner]) > 0 and users[owner][-1][-1].endswith("activity"):
+                sp = users[owner][-1][-1].split(" ")
+                start = users[owner][-1][1]
+                since_last = ciso8601.parse_datetime(event_at) - ciso8601.parse_datetime(sp[1]+" "+sp[2])
                 span = ciso8601.parse_datetime(event_at) - ciso8601.parse_datetime(start)
 
             if since_last is not None and since_last < datetime.timedelta(minutes=61):
-                users[owner][-1] = "%s to %s (%02d:%02d) Account activity" % (start, event_at, span.days*24 + int(span.seconds/3600), int((span.seconds % 3600)/60))
+                users[owner][-1] = [loguuid, start, "to %s (%02d:%02d) Account activity" % (event_at, span.days*24 + int(span.seconds/3600), int((span.seconds % 3600)/60))]
             else:
-                users[owner].append("%s to %s (0:00) Account activity" % (event_at, event_at))
+                users[owner].append([loguuid, event_at,"to %s (0:00) Account activity" % (event_at)])
 
         elif e["event_type"] == "create" and e["object_uuid"][6:11] == "o0j2j":
             if e["properties"]["new_attributes"]["link_class"] == "tag":
-                users[owner].append("%s Tagged %s" % (event_at, e["properties"]["new_attributes"]["head_uuid"]))
+                users[owner].append([event_at,"Tagged %s" % (e["properties"]["new_attributes"]["head_uuid"])])
             elif e["properties"]["new_attributes"]["link_class"] == "permission":
-                users[owner].append("%s Shared %s with %s" % (event_at, e["properties"]["new_attributes"]["tail_uuid"], e["properties"]["new_attributes"]["head_uuid"]))
+                users[owner].append([loguuid, event_at,"Shared %s with %s" % (e["properties"]["new_attributes"]["tail_uuid"], e["properties"]["new_attributes"]["head_uuid"])])
             else:
-                users[owner].append("%s %s %s %s" % (e["event_type"], e["object_kind"], e["object_uuid"], loguuid))
+                users[owner].append([loguuid, event_at,"%s %s %s" % (e["event_type"], e["object_kind"], e["object_uuid"])])
 
         elif e["event_type"] == "delete" and e["object_uuid"][6:11] == "o0j2j":
             if e["properties"]["old_attributes"]["link_class"] == "tag":
-                users[owner].append("%s Untagged %s" % (event_at, e["properties"]["old_attributes"]["head_uuid"]))
+                users[owner].append([loguuid, event_at,"Untagged %s" % (e["properties"]["old_attributes"]["head_uuid"])])
             elif e["properties"]["old_attributes"]["link_class"] == "permission":
-                users[owner].append("%s Unshared %s with %s" % (event_at, e["properties"]["old_attributes"]["tail_uuid"], e["properties"]["old_attributes"]["head_uuid"]))
+                users[owner].append([loguuid, event_at,"Unshared %s with %s" % (e["properties"]["old_attributes"]["tail_uuid"], e["properties"]["old_attributes"]["head_uuid"])])
             else:
-                users[owner].append("%s %s %s %s" % (e["event_type"], e["object_kind"], e["object_uuid"], loguuid))
+                users[owner].append([loguuid, event_at,"%s %s %s" % (e["event_type"], e["object_kind"], e["object_uuid"])])
 
         elif e["event_type"] == "create" and e["object_uuid"][6:11] == "4zz18":
             if e["properties"]["new_attributes"]["properties"].get("type") in ("log", "output", "intermediate"):
                 pass
             else:
-                users[owner].append("%s Created collection %s %s" % (event_at, getname(e["properties"]["new_attributes"]), loguuid))
+                users[owner].append([loguuid, event_at,"Created collection %s" % (getname(e["properties"]["new_attributes"]))])
 
         elif e["event_type"] == "update" and e["object_uuid"][6:11] == "4zz18":
-            users[owner].append("%s Updated collection %s %s" % (event_at, getname(e["properties"]["new_attributes"]), loguuid))
+            users[owner].append([loguuid, event_at,"Updated collection %s" % (getname(e["properties"]["new_attributes"]))])
 
         elif e["event_type"] == "delete" and e["object_uuid"][6:11] == "4zz18":
             if e["properties"]["old_attributes"]["properties"].get("type") in ("log", "output", "intermediate"):
                 pass
             else:
-                users[owner].append("%s Deleted collection %s %s" % (event_at, getname(e["properties"]["old_attributes"]), loguuid))
+                users[owner].append([loguuid, event_at, "Deleted collection %s" % (getname(e["properties"]["old_attributes"]))])
 
         elif e["event_type"] == "file_download":
-                users[e["object_uuid"]].append("%s Downloaded file \"%s\" from \"%s\" (%s) (%s)" % (event_at,
+                users.setdefault(e["object_uuid"], [])
+                users[e["object_uuid"]].append([loguuid, event_at, "Downloaded file \"%s\" from \"%s\" (%s) (%s)" % (
                                                                                        e["properties"].get("collection_file_path") or e["properties"].get("reqPath"),
-                                                                                       getCollectionName(arv, e["properties"].get("collection_uuid")),
+                                                                                       getCollectionName(arv, e["properties"].get("collection_uuid"), e["properties"].get("portable_data_hash")),
                                                                                        e["properties"].get("collection_uuid"),
-                                                                                       e["properties"].get("portable_data_hash")))
+                                                                                       e["properties"].get("portable_data_hash"))])
 
         elif e["event_type"] == "file_upload":
-                users[e["object_uuid"]].append("%s Uploaded file \"%s\" to \"%s\" (%s)" % (event_at,
+                users.setdefault(e["object_uuid"], [])
+                users[e["object_uuid"]].append([loguuid, event_at, "Uploaded file \"%s\" to \"%s\" (%s)" % (
                                                                                     e["properties"].get("collection_file_path") or e["properties"].get("reqPath"),
-                                                                                    getCollectionName(arv, e["properties"].get("collection_uuid")),
-                                                                                    e["properties"].get("collection_uuid")))
+                                                                                    getCollectionName(arv, e["properties"].get("collection_uuid"), e["properties"].get("portable_data_hash")),
+                                                                                    e["properties"].get("collection_uuid"))])
 
         else:
-            users[owner].append("%s %s %s %s" % (e["event_type"], e["object_kind"], e["object_uuid"], loguuid))
+            users[owner].append([loguuid, event_at, "%s %s %s" % (e["event_type"], e["object_kind"], e["object_uuid"])])
+
+    if args.csv:
+        csvwriter = csv.writer(sys.stdout, dialect='unix')
 
     for k,v in users.items():
         if k is None or k.endswith("-tpzed-000000000000000"):
             continue
-        print(getuserinfo(arv, k))
-        for ev in v:
-            print("  %s" % ev)
-        print("")
+        if not args.csv:
+            print(getuserinfo(arv, k))
+            for ev in v:
+                # Remove the log entry uuid; this report is intended
+                # for human consumption.
+                ev.pop(0)
+                print("  %s" % ' '.join(ev))
+            print("")
+        else:
+            user = getuserinfocsv(arv, k)
+            for ev in v:
+                csvwriter.writerow(user + ev)
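
With --csv, each output row is the user's identity columns followed by one event, i.e. user uuid, first name, last name, email, log uuid, event time, and message; a sketch with hypothetical values:

    import csv
    import sys

    csvwriter = csv.writer(sys.stdout, dialect='unix')
    csvwriter.writerow(["zzzzz-tpzed-0123456789abcde", "Example", "User",
                        "user@example.com", "zzzzz-57u5n-0123456789abcde",
                        "2022-03-15 09:12-04:00",
                        'Created project "demo" (zzzzz-j7d0g-0123456789abcde)'])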
 
 if __name__ == "__main__":
     main()