def find_objects_for_index
@objects ||= model_class
- @objects = @objects.filter(@filters).limit(@limit).offset(@offset)
+ @objects = @objects.filter(@filters).limit(@limit).offset(@offset).order(@order)
@objects.fetch_multiple_pages(false)
end
done
"$bundle" version | tee /dev/stderr | grep -q 'version 2'
) || fatal 'install bundler'
+ if test -d /var/lib/arvados-arvbox/ ; then
+ # Inside arvbox, use bundler-installed binstubs. The
+ # system bundler and rail's own bin/bundle refuse to work.
+ # I don't know why.
+ bundle=binstubs/bundle
+ fi
fi
}
<notextile>
<pre>
$ <code class="userinput">arv keep get --help</code>
-usage: arv-get [-h] [--retries RETRIES]
+usage: arv-get [-h] [--retries RETRIES] [--version]
[--progress | --no-progress | --batch-progress]
- [--hash HASH | --md5sum] [-n] [-r] [-f | --skip-existing]
+ [--hash HASH | --md5sum] [-n] [-r]
+ [-f | -v | --skip-existing | --strip-manifest] [--threads N]
locator [destination]
Copy data from Keep to a local file or pipe.
locator Collection locator, optionally with a file path or
prefix.
destination Local file or directory where the data is to be written.
- Default: /dev/stdout.
+ Default: stdout.
optional arguments:
-h, --help show this help message and exit
--retries RETRIES Maximum number of times to retry server requests that
- encounter temporary failures (e.g., server down). Default
- 3.
+ encounter temporary failures (e.g., server down).
+ Default 3.
+ --version Print version and exit.
--progress Display human-readable progress on stderr (bytes and, if
possible, percentage of total data size). This is the
default behavior when it is not expected to interfere
with the output: specifically, stderr is a tty _and_
- either stdout is not a tty, or output is being written to
- named files rather than stdout.
+ either stdout is not a tty, or output is being written
+ to named files rather than stdout.
--no-progress Do not display human-readable progress on stderr.
--batch-progress Display machine-readable progress on stderr (bytes and,
if known, total data size).
-f Overwrite existing files while writing. The default
behavior is to refuse to write *anything* if any of the
output files already exist. As a special case, -f is not
- needed to write to /dev/stdout.
- --skip-existing Skip files that already exist. The default behavior is to
- refuse to write *anything* if any files exist that would
- have to be overwritten. This option causes even devices,
- sockets, and fifos to be skipped.
+ needed to write to stdout.
+ -v Once for verbose mode, twice for debug mode.
+ --skip-existing Skip files that already exist. The default behavior is
+ to refuse to write *anything* if any files exist that
+ would have to be overwritten. This option causes even
+ devices, sockets, and fifos to be skipped.
+ --strip-manifest When getting a collection manifest, strip its access
+ tokens before writing it.
+ --threads N Set the number of download threads to be used. Take into
+ account that using lots of threads will increase the RAM
+ requirements. Default is to use 4 threads. On high
+ latency installations, using a greater number will
+ improve overall throughput.
</pre>
</notextile>
package controller
import (
+ "bytes"
"context"
"crypto/tls"
"encoding/json"
+ "io"
"io/ioutil"
"net/http"
"net/http/httptest"
+// HandlerSuite holds the per-test state for controller Handler tests.
type HandlerSuite struct {
cluster *arvados.Cluster
handler *Handler
+ // logbuf receives a copy of everything written by the logger
+ // attached to ctx, so tests can assert on log output.
+ logbuf *bytes.Buffer
ctx context.Context
cancel context.CancelFunc
}
func (s *HandlerSuite) SetUpTest(c *check.C) {
+ s.logbuf = &bytes.Buffer{}
s.ctx, s.cancel = context.WithCancel(context.Background())
- s.ctx = ctxlog.Context(s.ctx, ctxlog.New(os.Stderr, "json", "debug"))
+ s.ctx = ctxlog.Context(s.ctx, ctxlog.New(io.MultiWriter(os.Stderr, s.logbuf), "json", "debug"))
s.cluster = &arvados.Cluster{
ClusterID: "zzzzz",
PostgreSQL: integrationTestCluster().PostgreSQL,
}
}
+// TestLogTokenUUID checks that a request authenticated with a v2
+// token produces a response log entry whose tokenUUIDs field lists
+// that token's UUID.
+func (s *HandlerSuite) TestLogTokenUUID(c *check.C) {
+	// A v2 token has the form "v2/<uuid>/<secret>".
+	tokenUUID := strings.Split(arvadostest.ActiveTokenV2, "/")[1]
+
+	req := httptest.NewRequest("GET", "https://0.0.0.0/arvados/v1/users/current", nil)
+	req = req.WithContext(s.ctx)
+	req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveTokenV2)
+
+	rec := httptest.NewRecorder()
+	httpserver.LogRequests(s.handler).ServeHTTP(rec, req)
+
+	c.Check(rec.Code, check.Equals, http.StatusOK)
+	c.Check(s.logbuf.String(), check.Matches, `(?ms).*"tokenUUIDs":\["`+tokenUUID+`"\].*`)
+}
+
func (s *HandlerSuite) TestCreateAPIToken(c *check.C) {
req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
auth, err := s.handler.createAPItoken(req, arvadostest.ActiveUserUUID, nil)
"apiOptsType": fmt.Sprintf("%T", opts),
"apiOpts": opts,
}).Debug("exec")
+ // Extract the token UUIDs (or a placeholder for v1 tokens)
+ var tokenUUIDs []string
+ for _, t := range creds.Tokens {
+ if strings.HasPrefix(t, "v2/") {
+ tokenParts := strings.Split(t, "/")
+ if len(tokenParts) >= 3 {
+ tokenUUIDs = append(tokenUUIDs, tokenParts[1])
+ }
+ } else {
+ end := t
+ if len(t) > 5 {
+ end = t[len(t)-5:]
+ }
+ tokenUUIDs = append(tokenUUIDs, "v1 token ending in "+end)
+ }
+ }
+ httpserver.SetResponseLogFields(req.Context(), logrus.Fields{"tokenUUIDs": tokenUUIDs})
resp, err := exec(ctx, opts)
if err != nil {
logger.WithError(err).Debugf("returning error type %T", err)
"errors"
"fmt"
"io"
+ "io/fs"
"io/ioutil"
"log"
"net/http"
}
// NewClientFromEnv creates a new Client that uses the default HTTP
-// client with the API endpoint and credentials given by the
-// ARVADOS_API_* environment variables.
+// client, and loads API endpoint and credentials from ARVADOS_*
+// environment variables (if set) and
+// $HOME/.config/arvados/settings.conf (if readable).
+//
+// If a variable is set in both locations, the environment variable
+// takes precedence.
+//
+// If there is an error (other than ENOENT) reading settings.conf,
+// NewClientFromEnv logs the error to log.Default(), then proceeds as
+// if settings.conf did not exist.
+//
+// Leading and trailing whitespace (spaces, tabs, carriage returns) is
+// trimmed from each key and value when reading the settings file, so
+// these are equivalent:
+//
+// ARVADOS_API_HOST=localhost\n
+// ARVADOS_API_HOST=localhost\r\n
+// ARVADOS_API_HOST = localhost \n
+// \tARVADOS_API_HOST = localhost\n
func NewClientFromEnv() *Client {
+ vars := map[string]string{}
+ home := os.Getenv("HOME")
+ conffile := home + "/.config/arvados/settings.conf"
+ if home == "" {
+ // no $HOME => just use env vars
+ } else if settings, err := os.ReadFile(conffile); errors.Is(err, fs.ErrNotExist) {
+ // no config file => just use env vars
+ } else if err != nil {
+ // config file unreadable => log message, then use env vars
+ log.Printf("continuing without loading %s: %s", conffile, err)
+ } else {
+ for _, line := range bytes.Split(settings, []byte{'\n'}) {
+ kv := bytes.SplitN(line, []byte{'='}, 2)
+ k := string(bytes.TrimSpace(kv[0]))
+ if len(kv) != 2 || !strings.HasPrefix(k, "ARVADOS_") {
+ // Same behavior as python sdk:
+ // silently skip leading # (comments),
+ // blank lines, typos, and non-Arvados
+ // vars.
+ continue
+ }
+ vars[k] = string(bytes.TrimSpace(kv[1]))
+ }
+ }
+ for _, env := range os.Environ() {
+ if !strings.HasPrefix(env, "ARVADOS_") {
+ continue
+ }
+ kv := strings.SplitN(env, "=", 2)
+ if len(kv) == 2 {
+ vars[kv[0]] = kv[1]
+ }
+ }
var svcs []string
- for _, s := range strings.Split(os.Getenv("ARVADOS_KEEP_SERVICES"), " ") {
+ for _, s := range strings.Split(vars["ARVADOS_KEEP_SERVICES"], " ") {
if s == "" {
continue
} else if u, err := url.Parse(s); err != nil {
}
}
var insecure bool
- if s := strings.ToLower(os.Getenv("ARVADOS_API_HOST_INSECURE")); s == "1" || s == "yes" || s == "true" {
+ if s := strings.ToLower(vars["ARVADOS_API_HOST_INSECURE"]); s == "1" || s == "yes" || s == "true" {
insecure = true
}
return &Client{
Scheme: "https",
- APIHost: os.Getenv("ARVADOS_API_HOST"),
- AuthToken: os.Getenv("ARVADOS_API_TOKEN"),
+ APIHost: vars["ARVADOS_API_HOST"],
+ AuthToken: vars["ARVADOS_API_TOKEN"],
Insecure: insecure,
KeepServiceURIs: svcs,
Timeout: 5 * time.Minute,
"io/ioutil"
"net/http"
"net/url"
+ "os"
+ "strings"
"sync"
- "testing"
"testing/iotest"
+
+ check "gopkg.in/check.v1"
)
type stubTransport struct {
}, nil
}
-func TestCurrentUser(t *testing.T) {
- t.Parallel()
+var _ = check.Suite(&clientSuite{})
+
+// clientSuite holds the gocheck tests for Client.
+type clientSuite struct{}
+
+// TestCurrentUser checks that CurrentUser decodes the stubbed
+// response, sends the OAuth2 Authorization header, and reports
+// transport errors.
+func (*clientSuite) TestCurrentUser(c *check.C) {
stub := &stubTransport{
Responses: map[string]string{
"/arvados/v1/users/current": `{"uuid":"zzzzz-abcde-012340123401234"}`,
},
}
- c := &Client{
+ client := &Client{
Client: &http.Client{
Transport: stub,
},
APIHost: "zzzzz.arvadosapi.com",
AuthToken: "xyzzy",
}
- u, err := c.CurrentUser()
- if err != nil {
- t.Fatal(err)
- }
- if x := "zzzzz-abcde-012340123401234"; u.UUID != x {
- t.Errorf("got uuid %q, expected %q", u.UUID, x)
- }
- if len(stub.Requests) < 1 {
- t.Fatal("empty stub.Requests")
- }
+ u, err := client.CurrentUser()
+ c.Assert(err, check.IsNil)
+ c.Check(u.UUID, check.Equals, "zzzzz-abcde-012340123401234")
+ // Assert, not Check: the next line indexes the last recorded
+ // request and would panic if none was recorded.
+ c.Assert(stub.Requests, check.Not(check.HasLen), 0)
hdr := stub.Requests[len(stub.Requests)-1].Header
- if hdr.Get("Authorization") != "OAuth2 xyzzy" {
- t.Errorf("got headers %+q, expected Authorization header", hdr)
- }
+ c.Check(hdr.Get("Authorization"), check.Equals, "OAuth2 xyzzy")
- c.Client.Transport = &errorTransport{}
- u, err = c.CurrentUser()
- if err == nil {
- t.Errorf("got nil error, expected something awful")
- }
+ client.Client.Transport = &errorTransport{}
+ _, err = client.CurrentUser()
+ c.Check(err, check.NotNil)
}
-func TestAnythingToValues(t *testing.T) {
+func (*clientSuite) TestAnythingToValues(c *check.C) {
type testCase struct {
in interface{}
// ok==nil means anythingToValues should return an
ok: nil,
},
} {
- t.Logf("%#v", tc.in)
+ c.Logf("%#v", tc.in)
out, err := anythingToValues(tc.in)
- switch {
- case tc.ok == nil:
- if err == nil {
- t.Errorf("got %#v, expected error", out)
- }
- case err != nil:
- t.Errorf("got err %#v, expected nil", err)
- case !tc.ok(out):
- t.Errorf("got %#v but tc.ok() says that is wrong", out)
+ if tc.ok == nil {
+ c.Check(err, check.NotNil)
+ continue
}
+ c.Check(err, check.IsNil)
+ c.Check(tc.ok(out), check.Equals, true)
}
}
+
+// TestLoadConfig checks that NewClientFromEnv reads
+// $HOME/.config/arvados/settings.conf, ignores comments and
+// non-ARVADOS_ lines, and lets environment variables override
+// values from the file.
+func (*clientSuite) TestLoadConfig(c *check.C) {
+	// Snapshot the environment and restore it when the test ends,
+	// so the variables set below do not leak into other tests.
+	oldenv := os.Environ()
+	defer func() {
+		os.Clearenv()
+		for _, s := range oldenv {
+			i := strings.IndexRune(s, '=')
+			os.Setenv(s[:i], s[i+1:])
+		}
+	}()
+
+	// Start from a clean slate: $HOME is an empty temp dir and no
+	// ARVADOS_* variables are inherited from the caller.
+	tmp := c.MkDir()
+	os.Setenv("HOME", tmp)
+	for _, s := range os.Environ() {
+		if strings.HasPrefix(s, "ARVADOS_") {
+			i := strings.IndexRune(s, '=')
+			os.Unsetenv(s[:i])
+		}
+	}
+	confdir := tmp + "/.config/arvados"
+	c.Assert(os.MkdirAll(confdir, 0777), check.IsNil)
+
+	// writeSettings replaces settings.conf. A write failure aborts
+	// the test here instead of surfacing later as a confusing
+	// mismatch in the client fields.
+	writeSettings := func(content string) {
+		c.Assert(os.WriteFile(confdir+"/settings.conf", []byte(content), 0600), check.IsNil)
+	}
+
+	// Use $HOME/.config/arvados/settings.conf if no env vars are
+	// set
+	writeSettings(`
+	ARVADOS_API_HOST = localhost:1
+	ARVADOS_API_TOKEN = token_from_settings_file1
+	`)
+	client := NewClientFromEnv()
+	c.Check(client.AuthToken, check.Equals, "token_from_settings_file1")
+	c.Check(client.APIHost, check.Equals, "localhost:1")
+	c.Check(client.Insecure, check.Equals, false)
+
+	// ..._INSECURE=true, comments, ignored lines in settings.conf
+	writeSettings(`
+	(ignored) = (ignored)
+	#ARVADOS_API_HOST = localhost:2
+	ARVADOS_API_TOKEN = token_from_settings_file2
+	ARVADOS_API_HOST_INSECURE = true
+	`)
+	client = NewClientFromEnv()
+	c.Check(client.AuthToken, check.Equals, "token_from_settings_file2")
+	c.Check(client.APIHost, check.Equals, "")
+	c.Check(client.Insecure, check.Equals, true)
+
+	// Environment variables override settings.conf
+	os.Setenv("ARVADOS_API_HOST", "[::]:3")
+	os.Setenv("ARVADOS_API_HOST_INSECURE", "0")
+	client = NewClientFromEnv()
+	c.Check(client.AuthToken, check.Equals, "token_from_settings_file2")
+	c.Check(client.APIHost, check.Equals, "[::]:3")
+	c.Check(client.Insecure, check.Equals, false)
+}
"context"
"net"
"net/http"
+ "sync"
"time"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
}
var (
- requestTimeContextKey = contextKey{"requestTime"}
+ requestTimeContextKey = contextKey{"requestTime"}
+ responseLogFieldsContextKey = contextKey{"responseLogFields"}
+ mutexContextKey = contextKey{"mutex"}
)
type hijacker interface {
})
}
+// SetResponseLogFields adds fields to the set that LogRequests
+// attaches to the response log entry for the request associated
+// with ctx. A field whose key is already present is overwritten.
+//
+// It does nothing if ctx was not derived from a request context
+// prepared by LogRequests (i.e., the per-request field map or mutex
+// is missing). Concurrent calls for the same request are serialized
+// by a per-request mutex.
+//
+// The fields are read when the response is logged, so they should
+// be set before the handler returns.
+func SetResponseLogFields(ctx context.Context, fields logrus.Fields) {
+	m, _ := ctx.Value(&mutexContextKey).(*sync.Mutex)
+	c, _ := ctx.Value(&responseLogFieldsContextKey).(logrus.Fields)
+	if m == nil || c == nil {
+		// Not inside a LogRequests-wrapped request.
+		return
+	}
+	m.Lock()
+	defer m.Unlock()
+	for k, v := range fields {
+		c[k] = v
+	}
+}
+
// LogRequests wraps an http.Handler, logging each request and
// response.
func LogRequests(h http.Handler) http.Handler {
})
ctx := req.Context()
ctx = context.WithValue(ctx, &requestTimeContextKey, time.Now())
+ ctx = context.WithValue(ctx, &responseLogFieldsContextKey, logrus.Fields{})
+ ctx = context.WithValue(ctx, &mutexContextKey, &sync.Mutex{})
ctx = ctxlog.Context(ctx, lgr)
req = req.WithContext(ctx)
"timeWriteBody": stats.Duration(tDone.Sub(writeTime)),
})
}
+ if responseLogFields, ok := req.Context().Value(&responseLogFieldsContextKey).(logrus.Fields); ok {
+ lgr = lgr.WithFields(responseLogFields)
+ }
respCode := w.WroteStatus()
if respCode == 0 {
respCode = http.StatusOK
DEFAULT_PUT_THREADS = 2
DEFAULT_GET_THREADS = 2
- def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None):
+ def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None, get_threads=None):
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = collections.OrderedDict()
self.lock = threading.Lock()
self.prefetch_enabled = True
self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
- self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+ self.num_get_threads = get_threads or _BlockManager.DEFAULT_GET_THREADS
self.copies = copies
self.storage_classes = storage_classes_func or (lambda: [])
self._pending_write_size = 0
b = self._prefetch_queue.get()
if b is None:
return
- self._keep.get(b)
+ self._keep.get(b, prefetch=True)
except Exception:
_logger.exception("Exception doing block prefetch")
if not self.prefetch_enabled:
return
- if self._keep.get_from_cache(locator) is not None:
- return
-
with self.lock:
if locator in self._bufferblocks:
return
if size == 0 or offset >= self.size():
return b''
readsegs = locators_and_ranges(self._segments, offset, size)
- prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
+ prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager().num_get_threads, limit=32)
locs = set()
data = []
block_manager=None,
replication_desired=None,
storage_classes_desired=None,
- put_threads=None):
+ put_threads=None,
+ get_threads=None):
"""Collection constructor.
:manifest_locator_or_text:
self.replication_desired = replication_desired
self._storage_classes_desired = storage_classes_desired
self.put_threads = put_threads
+ self.get_threads = get_threads
if apiconfig:
self._config = apiconfig
copies = (self.replication_desired or
self._my_api()._rootDesc.get('defaultCollectionReplication',
2))
- self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads, num_retries=self.num_retries, storage_classes_func=self.storage_classes_desired)
+ self._block_manager = _BlockManager(self._my_keep(),
+ copies=copies,
+ put_threads=self.put_threads,
+ num_retries=self.num_retries,
+ storage_classes_func=self.storage_classes_desired,
+ get_threads=self.get_threads,)
return self._block_manager
def _remember_api_response(self, response):
it.
""")
+parser.add_argument('--threads', type=int, metavar='N', default=4,
+ help="""
+Set the number of download threads to be used. Take into account that
+using lots of threads will increase the RAM requirements. Default is
+to use 4 threads.
+On high latency installations, using a greater number will improve
+overall throughput.
+""")
+
def parse_arguments(arguments, stdout, stderr):
args = parser.parse_args(arguments)
try:
reader = arvados.CollectionReader(
- col_loc, api_client=api_client, num_retries=args.retries)
+ col_loc, api_client=api_client, num_retries=args.retries,
+ keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024)),
+ get_threads=args.threads)
except Exception as error:
logger.error("failed to read collection: {}".format(error))
return 1
else:
return None
- def get_from_cache(self, loc):
+ def get_from_cache(self, loc_s):
"""Fetch a block only if is in the cache, otherwise return None."""
- slot = self.block_cache.get(loc)
+ locator = KeepLocator(loc_s)
+ slot = self.block_cache.get(locator.md5sum)
if slot is not None and slot.ready.is_set():
return slot.get()
else:
def get(self, loc_s, **kwargs):
return self._get_or_head(loc_s, method="GET", **kwargs)
- def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
+ def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
if method == "GET":
slot, first = self.block_cache.reserve_cache(locator.md5sum)
if not first:
+ if prefetch:
+ # this is request for a prefetch, if it is
+ # already in flight, return immediately.
+ # clear 'slot' to prevent finally block from
+ # calling slot.set()
+ slot = None
+ return None
self.hits_counter.add(1)
blob = slot.get()
if blob is None:
return True
if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
return True
-
- def is_cached(self, locator):
- return self.block_cache.reserve_cache(expect_hash)
pagesize = 1000
kwargs["limit"] = pagesize
kwargs["count"] = 'none'
- kwargs["order"] = ["%s %s" % (order_key, "asc" if ascending else "desc"), "uuid asc"]
+ asc = "asc" if ascending else "desc"
+ kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
other_filters = kwargs.get("filters", [])
if "select" in kwargs and "uuid" not in kwargs["select"]:
if firstitem[order_key] == lastitem[order_key]:
# Got a page where every item has the same order key.
# Switch to using uuid for paging.
- nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">", lastitem["uuid"]]]
+ nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
prev_page_all_same_order_key = True
else:
# Start from the last order key seen, but skip the last
def __init__(self, blocks):
self.blocks = blocks
self.requests = []
- def get(self, locator, num_retries=0):
+ def get(self, locator, num_retries=0, prefetch=False):
self.requests.append(locator)
return self.blocks.get(locator)
def get_from_cache(self, locator):
def __init__(self, blocks, nocache):
self.blocks = blocks
self.nocache = nocache
+ self.num_get_threads = 1
def block_prefetch(self, loc):
pass
def __init__(self, content, num_retries=0):
self.content = content
- def get(self, locator, num_retries=0):
+ def get(self, locator, num_retries=0, prefetch=False):
return self.content[locator]
def test_stream_reader(self):
def test_onepage_desc(self):
ks = KeysetTestHelper([[
- {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid asc"], "filters": []},
+ {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid desc"], "filters": []},
{"items": [{"created_at": "2", "uuid": "2"}, {"created_at": "1", "uuid": "1"}]}
], [
- {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid asc"], "filters": [["created_at", "<=", "1"], ["uuid", "!=", "1"]]},
+ {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid desc"], "filters": [["created_at", "<=", "1"], ["uuid", "!=", "1"]]},
{"items": []}
]])
end
def self.default_orders
- ["#{table_name}.modified_at desc", "#{table_name}.uuid"]
+ # Most recently modified first; uuid (also descending) breaks ties.
+ %w[modified_at uuid].map { |col| "#{table_name}.#{col} desc" }
end
def self.unique_columns
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+class FixCreatedAtIndexes < ActiveRecord::Migration[5.2]
+ @@idxtables = [:collections, :container_requests, :groups, :links, :repositories, :users, :virtual_machines, :workflows, :logs]
+
+ def up
+ @@idxtables.each do |table|
+ ActiveRecord::Base.connection.execute("DROP INDEX IF EXISTS index_#{table.to_s}_on_created_at")
+ ActiveRecord::Base.connection.execute("DROP INDEX IF EXISTS index_#{table.to_s}_on_created_at_uuid")
+ ActiveRecord::Base.connection.execute("DROP INDEX IF EXISTS index_#{table.to_s}_on_created_at_and_uuid")
+ ActiveRecord::Base.connection.execute("DROP INDEX IF EXISTS index_#{table.to_s}_on_modified_at")
+ ActiveRecord::Base.connection.execute("DROP INDEX IF EXISTS index_#{table.to_s}_on_modified_at_uuid")
+ ActiveRecord::Base.connection.execute("DROP INDEX IF EXISTS index_#{table.to_s}_on_modified_at_and_uuid")
+
+ ActiveRecord::Base.connection.execute("CREATE INDEX IF NOT EXISTS index_#{table.to_s}_on_created_at_and_uuid ON #{table.to_s} USING btree (created_at, uuid)")
+ ActiveRecord::Base.connection.execute("CREATE INDEX IF NOT EXISTS index_#{table.to_s}_on_modified_at_and_uuid ON #{table.to_s} USING btree (modified_at, uuid)")
+ end
+ end
+
+ def down
+ @@idxtables.each do |table|
+ ActiveRecord::Base.connection.execute("DROP INDEX IF EXISTS index_#{table.to_s}_on_modified_at_and_uuid")
+ ActiveRecord::Base.connection.execute("CREATE INDEX IF NOT EXISTS index_#{table.to_s}_on_modified_at_uuid ON #{table.to_s} USING btree (modified_at desc, uuid asc)")
+ end
+ end
+end
--
--- Name: index_collections_on_created_at; Type: INDEX; Schema: public; Owner: -
+-- Name: index_collections_on_created_at_and_uuid; Type: INDEX; Schema: public; Owner: -
--
-CREATE INDEX index_collections_on_created_at ON public.collections USING btree (created_at);
+CREATE INDEX index_collections_on_created_at_and_uuid ON public.collections USING btree (created_at, uuid);
--
--
--- Name: index_collections_on_modified_at; Type: INDEX; Schema: public; Owner: -
+-- Name: index_collections_on_modified_at_and_uuid; Type: INDEX; Schema: public; Owner: -
--
-CREATE INDEX index_collections_on_modified_at ON public.collections USING btree (modified_at);
-
-
---
--- Name: index_collections_on_modified_at_uuid; Type: INDEX; Schema: public; Owner: -
---
-
-CREATE INDEX index_collections_on_modified_at_uuid ON public.collections USING btree (modified_at DESC, uuid);
+CREATE INDEX index_collections_on_modified_at_and_uuid ON public.collections USING btree (modified_at, uuid);
--
--
--- Name: index_container_requests_on_modified_at_uuid; Type: INDEX; Schema: public; Owner: -
+-- Name: index_container_requests_on_created_at_and_uuid; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX index_container_requests_on_created_at_and_uuid ON public.container_requests USING btree (created_at, uuid);
+
+
+--
+-- Name: index_container_requests_on_modified_at_and_uuid; Type: INDEX; Schema: public; Owner: -
--
-CREATE INDEX index_container_requests_on_modified_at_uuid ON public.container_requests USING btree (modified_at DESC, uuid);
+CREATE INDEX index_container_requests_on_modified_at_and_uuid ON public.container_requests USING btree (modified_at, uuid);
--
--
--- Name: index_groups_on_created_at; Type: INDEX; Schema: public; Owner: -
+-- Name: index_groups_on_created_at_and_uuid; Type: INDEX; Schema: public; Owner: -
--
-CREATE INDEX index_groups_on_created_at ON public.groups USING btree (created_at);
+CREATE INDEX index_groups_on_created_at_and_uuid ON public.groups USING btree (created_at, uuid);
--
--
--- Name: index_groups_on_modified_at; Type: INDEX; Schema: public; Owner: -
+-- Name: index_groups_on_modified_at_and_uuid; Type: INDEX; Schema: public; Owner: -
--
-CREATE INDEX index_groups_on_modified_at ON public.groups USING btree (modified_at);
-
-
---
--- Name: index_groups_on_modified_at_uuid; Type: INDEX; Schema: public; Owner: -
---
-
-CREATE INDEX index_groups_on_modified_at_uuid ON public.groups USING btree (modified_at DESC, uuid);
+CREATE INDEX index_groups_on_modified_at_and_uuid ON public.groups USING btree (modified_at, uuid);
--
--
--- Name: index_links_on_created_at; Type: INDEX; Schema: public; Owner: -
+-- Name: index_links_on_created_at_and_uuid; Type: INDEX; Schema: public; Owner: -
--
-CREATE INDEX index_links_on_created_at ON public.links USING btree (created_at);
+CREATE INDEX index_links_on_created_at_and_uuid ON public.links USING btree (created_at, uuid);
--
--
--- Name: index_links_on_modified_at; Type: INDEX; Schema: public; Owner: -
+-- Name: index_links_on_modified_at_and_uuid; Type: INDEX; Schema: public; Owner: -
--
-CREATE INDEX index_links_on_modified_at ON public.links USING btree (modified_at);
-
-
---
--- Name: index_links_on_modified_at_uuid; Type: INDEX; Schema: public; Owner: -
---
-
-CREATE INDEX index_links_on_modified_at_uuid ON public.links USING btree (modified_at DESC, uuid);
+CREATE INDEX index_links_on_modified_at_and_uuid ON public.links USING btree (modified_at, uuid);
--
--
--- Name: index_logs_on_created_at; Type: INDEX; Schema: public; Owner: -
+-- Name: index_logs_on_created_at_and_uuid; Type: INDEX; Schema: public; Owner: -
--
-CREATE INDEX index_logs_on_created_at ON public.logs USING btree (created_at);
+CREATE INDEX index_logs_on_created_at_and_uuid ON public.logs USING btree (created_at, uuid);
--
--
--- Name: index_logs_on_modified_at; Type: INDEX; Schema: public; Owner: -
+-- Name: index_logs_on_modified_at_and_uuid; Type: INDEX; Schema: public; Owner: -
--
-CREATE INDEX index_logs_on_modified_at ON public.logs USING btree (modified_at);
-
-
---
--- Name: index_logs_on_modified_at_uuid; Type: INDEX; Schema: public; Owner: -
---
-
-CREATE INDEX index_logs_on_modified_at_uuid ON public.logs USING btree (modified_at DESC, uuid);
+CREATE INDEX index_logs_on_modified_at_and_uuid ON public.logs USING btree (modified_at, uuid);
--
--
--- Name: index_repositories_on_modified_at_uuid; Type: INDEX; Schema: public; Owner: -
+-- Name: index_repositories_on_created_at_and_uuid; Type: INDEX; Schema: public; Owner: -
--
-CREATE INDEX index_repositories_on_modified_at_uuid ON public.repositories USING btree (modified_at DESC, uuid);
+CREATE INDEX index_repositories_on_created_at_and_uuid ON public.repositories USING btree (created_at, uuid);
+
+
+--
+-- Name: index_repositories_on_modified_at_and_uuid; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX index_repositories_on_modified_at_and_uuid ON public.repositories USING btree (modified_at, uuid);
--
--
--- Name: index_users_on_created_at; Type: INDEX; Schema: public; Owner: -
+-- Name: index_users_on_created_at_and_uuid; Type: INDEX; Schema: public; Owner: -
--
-CREATE INDEX index_users_on_created_at ON public.users USING btree (created_at);
+CREATE INDEX index_users_on_created_at_and_uuid ON public.users USING btree (created_at, uuid);
--
--
--- Name: index_users_on_modified_at; Type: INDEX; Schema: public; Owner: -
---
-
-CREATE INDEX index_users_on_modified_at ON public.users USING btree (modified_at);
-
-
---
--- Name: index_users_on_modified_at_uuid; Type: INDEX; Schema: public; Owner: -
+-- Name: index_users_on_modified_at_and_uuid; Type: INDEX; Schema: public; Owner: -
--
-CREATE INDEX index_users_on_modified_at_uuid ON public.users USING btree (modified_at DESC, uuid);
+CREATE INDEX index_users_on_modified_at_and_uuid ON public.users USING btree (modified_at, uuid);
--
CREATE UNIQUE INDEX index_users_on_uuid ON public.users USING btree (uuid);
+--
+-- Name: index_virtual_machines_on_created_at_and_uuid; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX index_virtual_machines_on_created_at_and_uuid ON public.virtual_machines USING btree (created_at, uuid);
+
+
--
-- Name: index_virtual_machines_on_hostname; Type: INDEX; Schema: public; Owner: -
--
--
--- Name: index_virtual_machines_on_modified_at_uuid; Type: INDEX; Schema: public; Owner: -
+-- Name: index_virtual_machines_on_modified_at_and_uuid; Type: INDEX; Schema: public; Owner: -
--
-CREATE INDEX index_virtual_machines_on_modified_at_uuid ON public.virtual_machines USING btree (modified_at DESC, uuid);
+CREATE INDEX index_virtual_machines_on_modified_at_and_uuid ON public.virtual_machines USING btree (modified_at, uuid);
--
--
--- Name: index_workflows_on_modified_at_uuid; Type: INDEX; Schema: public; Owner: -
+-- Name: index_workflows_on_created_at_and_uuid; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX index_workflows_on_created_at_and_uuid ON public.workflows USING btree (created_at, uuid);
+
+
+--
+-- Name: index_workflows_on_modified_at_and_uuid; Type: INDEX; Schema: public; Owner: -
--
-CREATE INDEX index_workflows_on_modified_at_uuid ON public.workflows USING btree (modified_at DESC, uuid);
+CREATE INDEX index_workflows_on_modified_at_and_uuid ON public.workflows USING btree (modified_at, uuid);
--
('20211027154300'),
('20220224203102'),
('20220301155729'),
-('20220303204419');
+('20220303204419'),
+('20220401153101');
cancelled_at: ~
cancelled_by_user_uuid: ~
cancelled_by_client_uuid: ~
- created_at: <%= 3.minute.ago.to_s(:db) %>
- started_at: <%= 3.minute.ago.to_s(:db) %>
+ created_at: <%= 2.7.minute.ago.to_s(:db) %>
+ started_at: <%= 2.7.minute.ago.to_s(:db) %>
finished_at: ~
script: hash
repository: active/foo
state: Ready
uuid: zzzzz-d1hrv-1yfj6xkidf2muk3
owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
- created_at: <%= 3.1.minute.ago.to_s(:db) %>
+ created_at: <%= 2.9.minute.ago.to_s(:db) %>
components:
foo:
script: foo
# Helps test that clients cope with funny-shaped components.
# For an example, see #3321.
uuid: zzzzz-d1hrv-1yfj61234abcdk4
- created_at: <%= 2.minute.ago.to_s(:db) %>
+ created_at: <%= 4.minute.ago.to_s(:db) %>
owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
state: Ready
uuid: zzzzz-d1hrv-1yfj61234abcdk3
owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
- created_at: <%= 3.1.minute.ago.to_s(:db) %>
+ created_at: <%= 3.2.minute.ago.to_s(:db) %>
components:
part-one:
script_parameters:
uuid: zzzzz-d1hrv-1yfj6dcba4321k3
pipeline_template_uuid: zzzzz-p5p6p-aox0k0ofxrystgw
owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
- created_at: <%= 3.1.minute.ago.to_s(:db) %>
+ created_at: <%= 3.3.minute.ago.to_s(:db) %>
components:
part-one:
script_parameters:
uuid: zzzzz-d1hrv-ri9dvgkgqs9y09j
owner_uuid: zzzzz-tpzed-0fusedrivertest
pipeline_template_uuid: zzzzz-p5p6p-vq4wuvy84xvaq2r
- created_at: 2014-09-15 12:00:00
+ created_at: 2014-09-16 12:00:00
name: "pipeline instance owned by FUSE"
components:
foo:
uuid: zzzzz-d1hrv-scarxiyajtshq3l
owner_uuid: zzzzz-j7d0g-0000ownedbyfuse
pipeline_template_uuid: zzzzz-p5p6p-vq4wuvy84xvaq2r
- created_at: 2014-09-15 12:00:00
+ created_at: 2014-09-17 12:00:00
name: "pipeline instance in FUSE project"
components:
foo:
state: Complete
uuid: zzzzz-d1hrv-ju5ghi0i9z2kqc6
owner_uuid: zzzzz-j7d0g-v955i6s2oi1cbso
- created_at: 2014-09-15 12:00:00
+ created_at: 2014-09-18 12:00:00
components:
foo:
script: foo
state: Complete
uuid: zzzzz-d1hrv-lihrbd0i9z2kqc6
owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
- created_at: 2014-09-15 12:00:00
+ created_at: 2014-09-19 12:00:00
components:
foo:
script: foo
name: Pipeline in public project with other objects elsewhere
pipeline_template_uuid: zzzzz-p5p6p-aox0k0ofxrystgw
state: Complete
- created_at: 2014-09-15 12:00:00
+ created_at: 2014-09-20 12:00:00
components:
foo:
script: foo
name: Pipeline in New state in publicly accessible project
pipeline_template_uuid: zzzzz-p5p6p-tmpltpublicproj
state: New
- created_at: 2014-09-15 12:00:00
+ created_at: 2014-09-21 12:00:00
components:
foo:
script: foo
name: Pipeline in New state in public project with objects elsewhere
pipeline_template_uuid: zzzzz-p5p6p-aox0k0ofxrystgw
state: New
- created_at: 2014-09-15 12:00:00
+ created_at: 2014-09-22 12:00:00
components:
foo:
script: foo
name: Pipeline in public project in New state with file type data class with objects elsewhere
pipeline_template_uuid: zzzzz-p5p6p-aox0k0ofxrystgw
state: New
- created_at: 2014-09-15 12:00:00
+ created_at: 2014-09-23 12:00:00
components:
foo:
script: foo
name: running_with_job
uuid: zzzzz-d1hrv-runningpipeline
owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
- created_at: <%= 3.1.minute.ago.to_s(:db) %>
- started_at: <%= 3.1.minute.ago.to_s(:db) %>
+ created_at: <%= 2.8.minute.ago.to_s(:db) %>
+ started_at: <%= 2.8.minute.ago.to_s(:db) %>
state: RunningOnServer
components:
foo:
uuid: zzzzz-d1hrv-twodonepipeline
owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
state: Complete
- created_at: <%= 3.minute.ago.to_s(:db) %>
+ created_at: <%= 2.5.minute.ago.to_s(:db) %>
started_at: <%= 2.minute.ago.to_s(:db) %>
finished_at: <%= 1.minute.ago.to_s(:db) %>
components:
controller: 'logs',
}
assert_response :success
- assert_equal('logs.event_type asc, logs.modified_at desc, logs.uuid',
+ assert_equal('logs.event_type asc, logs.modified_at desc, logs.uuid desc',
assigns(:objects).order_values.join(', '))
end
controller: 'logs',
}
assert_response :success
- assert_equal('logs.modified_at asc, logs.uuid',
+ assert_equal('logs.modified_at asc, logs.uuid desc',
assigns(:objects).order_values.join(', '))
end
controller: 'logs',
}
assert_response :success
- assert_equal('logs.modified_at asc, logs.event_type desc, logs.uuid',
+ assert_equal('logs.modified_at asc, logs.event_type desc, logs.uuid desc',
assigns(:objects).order_values.join(', '))
end
headers: auth(:admin)
assert_response :success
uuids = json_response['items'].collect { |i| i['uuid'] }
- assert_equal uuids, uuids.sort
+ assert_equal uuids, uuids.sort.reverse
end
def assert_link_classes_ascend(current_class, prev_class)
self.collection.update()
new_collection_record = self.collection.api_response()
else:
+ # If there's too many prefetch threads and you
+ # max out the CPU, delivering data to the FUSE
+ # layer actually ends up being slower.
+ # Experimentally, capping 7 threads seems to
+ # be a sweet spot.
+ get_threads = min(max((self.api.keep.block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
# Create a new collection object
if uuid_pattern.match(self.collection_locator):
coll_reader = arvados.collection.Collection(
self.collection_locator, self.api, self.api.keep,
- num_retries=self.num_retries)
+ num_retries=self.num_retries,
+ get_threads=get_threads)
else:
coll_reader = arvados.collection.CollectionReader(
self.collection_locator, self.api, self.api.keep,
- num_retries=self.num_retries)
+ num_retries=self.num_retries,
+ get_threads=get_threads)
new_collection_record = coll_reader.api_response() or {}
# If the Collection only exists in Keep, there will be no API
# response. Fill in the fields we need.
class MagicDirApiError(FuseMagicTest):
def setUp(self):
api = mock.MagicMock()
+ api.keep.block_cache = mock.MagicMock(cache_max=1)
super(MagicDirApiError, self).setUp(api=api)
api.collections().get().execute.side_effect = iter([
Exception('API fail'),
WithField("collection_file_path", filepath)
props["collection_uuid"] = collection.UUID
props["collection_file_path"] = filepath
+ // h.determineCollection populates the collection_uuid prop with the PDH, if
+ // this collection is being accessed via PDH. In that case, blank the
+ // collection_uuid field so that consumers of the log entries can rely on it
+ // being a UUID, or blank. The PDH remains available via the
+ // portable_data_hash property.
+ if props["collection_uuid"] == collection.PortableDataHash {
+ props["collection_uuid"] = ""
+ }
}
if r.Method == "PUT" || r.Method == "POST" {
log.Info("File upload")
import arvados.util
import datetime
import ciso8601
+import csv
def parse_arguments(arguments):
arg_parser = argparse.ArgumentParser()
- arg_parser.add_argument('--days', type=int, required=True)
+ arg_parser.add_argument('--start', help='Start date for the report in YYYY-MM-DD format (UTC)')
+ arg_parser.add_argument('--end', help='End date for the report in YYYY-MM-DD format (UTC)')
+ arg_parser.add_argument('--days', type=int, help='Number of days before now() to start the report')
+ arg_parser.add_argument('--csv', action='store_true', help='Output in csv format (default: false)')
args = arg_parser.parse_args(arguments)
- return args
+
+ if args.days and (args.start or args.end):
+ arg_parser.print_help()
+ print("Error: either specify --days or both --start and --end")
+ exit(1)
+
+ if not args.days and (not args.start or not args.end):
+ arg_parser.print_help()
+ print("\nError: either specify --days or both --start and --end")
+ exit(1)
+
+ if (args.start and not args.end) or (args.end and not args.start):
+ arg_parser.print_help()
+ print("\nError: no start or end date found, either specify --days or both --start and --end")
+ exit(1)
+
+ if args.days:
+ to = datetime.datetime.utcnow()
+ since = to - datetime.timedelta(days=args.days)
+
+ if args.start:
+ try:
+ since = datetime.datetime.strptime(args.start,"%Y-%m-%d")
+ except:
+ arg_parser.print_help()
+ print("\nError: start date must be in YYYY-MM-DD format")
+ exit(1)
+
+ if args.end:
+ try:
+ to = datetime.datetime.strptime(args.end,"%Y-%m-%d")
+ except:
+ arg_parser.print_help()
+ print("\nError: end date must be in YYYY-MM-DD format")
+ exit(1)
+
+ return args, since, to
def getowner(arv, uuid, owners):
if uuid is None:
return getowner(arv, owners[uuid], owners)
def getuserinfo(arv, uuid):
- u = arv.users().get(uuid=uuid).execute()
+ try:
+ u = arv.users().get(uuid=uuid).execute()
+ except:
+ return "deleted user (%susers/%s)" % (arv.config()["Services"]["Workbench1"]["ExternalURL"],
+ uuid)
prof = "\n".join(" %s: \"%s\"" % (k, v) for k, v in u["prefs"].get("profile", {}).items() if v)
if prof:
prof = "\n"+prof+"\n"
return "%s %s <%s> (%susers/%s)%s" % (u["first_name"], u["last_name"], u["email"],
arv.config()["Services"]["Workbench1"]["ExternalURL"],
uuid, prof)
+def getuserinfocsv(arv, uuid):
+ try:
+ u = arv.users().get(uuid=uuid).execute()
+ except:
+ return [uuid,"deleted","user",""]
+ return [uuid, u["first_name"], u["last_name"], u["email"]]
+
collectionNameCache = {}
-def getCollectionName(arv, uuid):
- if uuid not in collectionNameCache:
- u = arv.collections().get(uuid=uuid).execute()
- collectionNameCache[uuid] = u["name"]
- return collectionNameCache[uuid]
+def getCollectionName(arv, uuid, pdh):
+ lookupField = uuid
+ filters = [["uuid","=",uuid]]
+ cached = uuid in collectionNameCache
+ # look up by uuid if it is available, fall back to look up by pdh
+ if len(uuid) != 27:
+ # Look up by pdh. Note that this can be misleading; the download could
+ # have happened from a collection with the same pdh but different name.
+ # We arbitrarily pick the oldest collection with the pdh to lookup the
+ # name, if the uuid for the request is not known.
+ lookupField = pdh
+ filters = [["portable_data_hash","=",pdh]]
+ cached = pdh in collectionNameCache
+
+ if not cached:
+ u = arv.collections().list(filters=filters,order="created_at",limit=1).execute().get("items")
+ if len(u) < 1:
+ return "(deleted)"
+ collectionNameCache[lookupField] = u[0]["name"]
+ return collectionNameCache[lookupField]
def getname(u):
return "\"%s\" (%s)" % (u["name"], u["uuid"])
if arguments is None:
arguments = sys.argv[1:]
- args = parse_arguments(arguments)
+ args, since, to = parse_arguments(arguments)
arv = arvados.api()
- since = datetime.datetime.utcnow() - datetime.timedelta(days=args.days)
+ prefix = ''
+ suffix = "\n"
+ if args.csv:
+ prefix = '# '
+ suffix = ''
+ print("%sUser activity on %s between %s and %s%s" % (prefix, arv.config()["ClusterID"],
+ since.isoformat(sep=" ", timespec="minutes"),
+ to.isoformat(sep=" ", timespec="minutes"), suffix))
- print("User activity on %s between %s and %s\n" % (arv.config()["ClusterID"],
- (datetime.datetime.now() - datetime.timedelta(days=args.days)).isoformat(sep=" ", timespec="minutes"),
- datetime.datetime.now().isoformat(sep=" ", timespec="minutes")))
-
- events = arvados.util.keyset_list_all(arv.logs().list, filters=[["created_at", ">=", since.isoformat()]])
+ events = arvados.util.keyset_list_all(arv.logs().list, filters=[["created_at", ">=", since.isoformat()],["created_at", "<", to.isoformat()]])
users = {}
owners = {}
owner = getowner(arv, e["object_owner_uuid"], owners)
users.setdefault(owner, [])
event_at = ciso8601.parse_datetime(e["event_at"]).astimezone().isoformat(sep=" ", timespec="minutes")
- # loguuid = e["uuid"]
- loguuid = ""
+ loguuid = e["uuid"]
if e["event_type"] == "create" and e["object_uuid"][6:11] == "tpzed":
users.setdefault(e["object_uuid"], [])
- users[e["object_uuid"]].append("%s User account created" % event_at)
+ users[e["object_uuid"]].append([loguuid, event_at, "User account created"])
elif e["event_type"] == "update" and e["object_uuid"][6:11] == "tpzed":
pass
elif e["event_type"] == "create" and e["object_uuid"][6:11] == "xvhdp":
if e["properties"]["new_attributes"]["requesting_container_uuid"] is None:
- users[owner].append("%s Ran container %s %s" % (event_at, getname(e["properties"]["new_attributes"]), loguuid))
+ users[owner].append([loguuid, event_at, "Ran container %s" % (getname(e["properties"]["new_attributes"]))])
elif e["event_type"] == "update" and e["object_uuid"][6:11] == "xvhdp":
pass
elif e["event_type"] == "create" and e["object_uuid"][6:11] == "j7d0g":
- users[owner].append("%s Created project %s" % (event_at, getname(e["properties"]["new_attributes"])))
+ users[owner].append([loguuid, event_at,"Created project %s" % (getname(e["properties"]["new_attributes"]))])
elif e["event_type"] == "delete" and e["object_uuid"][6:11] == "j7d0g":
- users[owner].append("%s Deleted project %s" % (event_at, getname(e["properties"]["old_attributes"])))
+ users[owner].append([loguuid, event_at,"Deleted project %s" % (getname(e["properties"]["old_attributes"]))])
elif e["event_type"] == "update" and e["object_uuid"][6:11] == "j7d0g":
- users[owner].append("%s Updated project %s" % (event_at, getname(e["properties"]["new_attributes"])))
+ users[owner].append([loguuid, event_at,"Updated project %s" % (getname(e["properties"]["new_attributes"]))])
elif e["event_type"] in ("create", "update") and e["object_uuid"][6:11] == "gj3su":
since_last = None
- if len(users[owner]) > 0 and users[owner][-1].endswith("activity"):
- sp = users[owner][-1].split(" ")
- start = sp[0]+" "+sp[1]
- since_last = ciso8601.parse_datetime(event_at) - ciso8601.parse_datetime(sp[3]+" "+sp[4])
+ if len(users[owner]) > 0 and users[owner][-1][-1].endswith("activity"):
+ sp = users[owner][-1][-1].split(" ")
+ start = users[owner][-1][1]
+ since_last = ciso8601.parse_datetime(event_at) - ciso8601.parse_datetime(sp[1]+" "+sp[2])
span = ciso8601.parse_datetime(event_at) - ciso8601.parse_datetime(start)
if since_last is not None and since_last < datetime.timedelta(minutes=61):
- users[owner][-1] = "%s to %s (%02d:%02d) Account activity" % (start, event_at, span.days*24 + int(span.seconds/3600), int((span.seconds % 3600)/60))
+ users[owner][-1] = [loguuid, start,"to %s (%02d:%02d) Account activity" % (event_at, span.days*24 + int(span.seconds/3600), int((span.seconds % 3600)/60))]
else:
- users[owner].append("%s to %s (0:00) Account activity" % (event_at, event_at))
+ users[owner].append([loguuid, event_at,"to %s (0:00) Account activity" % (event_at)])
elif e["event_type"] == "create" and e["object_uuid"][6:11] == "o0j2j":
if e["properties"]["new_attributes"]["link_class"] == "tag":
- users[owner].append("%s Tagged %s" % (event_at, e["properties"]["new_attributes"]["head_uuid"]))
+ users[owner].append([event_at,"Tagged %s" % (e["properties"]["new_attributes"]["head_uuid"])])
elif e["properties"]["new_attributes"]["link_class"] == "permission":
- users[owner].append("%s Shared %s with %s" % (event_at, e["properties"]["new_attributes"]["tail_uuid"], e["properties"]["new_attributes"]["head_uuid"]))
+ users[owner].append([loguuid, event_at,"Shared %s with %s" % (e["properties"]["new_attributes"]["tail_uuid"], e["properties"]["new_attributes"]["head_uuid"])])
else:
- users[owner].append("%s %s %s %s" % (e["event_type"], e["object_kind"], e["object_uuid"], loguuid))
+ users[owner].append([loguuid, event_at,"%s %s %s" % (e["event_type"], e["object_kind"], e["object_uuid"])])
elif e["event_type"] == "delete" and e["object_uuid"][6:11] == "o0j2j":
if e["properties"]["old_attributes"]["link_class"] == "tag":
- users[owner].append("%s Untagged %s" % (event_at, e["properties"]["old_attributes"]["head_uuid"]))
+ users[owner].append([loguuid, event_at,"Untagged %s" % (e["properties"]["old_attributes"]["head_uuid"])])
elif e["properties"]["old_attributes"]["link_class"] == "permission":
- users[owner].append("%s Unshared %s with %s" % (event_at, e["properties"]["old_attributes"]["tail_uuid"], e["properties"]["old_attributes"]["head_uuid"]))
+ users[owner].append([loguuid, event_at,"Unshared %s with %s" % (e["properties"]["old_attributes"]["tail_uuid"], e["properties"]["old_attributes"]["head_uuid"])])
else:
- users[owner].append("%s %s %s %s" % (e["event_type"], e["object_kind"], e["object_uuid"], loguuid))
+ users[owner].append([loguuid, event_at,"%s %s %s" % (e["event_type"], e["object_kind"], e["object_uuid"])])
elif e["event_type"] == "create" and e["object_uuid"][6:11] == "4zz18":
if e["properties"]["new_attributes"]["properties"].get("type") in ("log", "output", "intermediate"):
pass
else:
- users[owner].append("%s Created collection %s %s" % (event_at, getname(e["properties"]["new_attributes"]), loguuid))
+ users[owner].append([loguuid, event_at,"Created collection %s" % (getname(e["properties"]["new_attributes"]))])
elif e["event_type"] == "update" and e["object_uuid"][6:11] == "4zz18":
- users[owner].append("%s Updated collection %s %s" % (event_at, getname(e["properties"]["new_attributes"]), loguuid))
+ users[owner].append([loguuid, event_at,"Updated collection %s" % (getname(e["properties"]["new_attributes"]))])
elif e["event_type"] == "delete" and e["object_uuid"][6:11] == "4zz18":
if e["properties"]["old_attributes"]["properties"].get("type") in ("log", "output", "intermediate"):
pass
else:
- users[owner].append("%s Deleted collection %s %s" % (event_at, getname(e["properties"]["old_attributes"]), loguuid))
+ users[owner].append([loguuid, event_at, "Deleted collection %s" % (getname(e["properties"]["old_attributes"]))])
elif e["event_type"] == "file_download":
- users[e["object_uuid"]].append("%s Downloaded file \"%s\" from \"%s\" (%s) (%s)" % (event_at,
+ users.setdefault(e["object_uuid"], [])
+ users[e["object_uuid"]].append([loguuid, event_at, "Downloaded file \"%s\" from \"%s\" (%s) (%s)" % (
e["properties"].get("collection_file_path") or e["properties"].get("reqPath"),
- getCollectionName(arv, e["properties"].get("collection_uuid")),
+ getCollectionName(arv, e["properties"].get("collection_uuid"), e["properties"].get("portable_data_hash")),
e["properties"].get("collection_uuid"),
- e["properties"].get("portable_data_hash")))
+ e["properties"].get("portable_data_hash"))])
+
elif e["event_type"] == "file_upload":
- users[e["object_uuid"]].append("%s Uploaded file \"%s\" to \"%s\" (%s)" % (event_at,
+ users.setdefault(e["object_uuid"], [])
+ users[e["object_uuid"]].append([loguuid, event_at, "Uploaded file \"%s\" to \"%s\" (%s)" % (
e["properties"].get("collection_file_path") or e["properties"].get("reqPath"),
- getCollectionName(arv, e["properties"].get("collection_uuid")),
- e["properties"].get("collection_uuid")))
+ getCollectionName(arv, e["properties"].get("collection_uuid"), e["properties"].get("portable_data_hash")),
+ e["properties"].get("collection_uuid"))])
else:
- users[owner].append("%s %s %s %s" % (e["event_type"], e["object_kind"], e["object_uuid"], loguuid))
+ users[owner].append([loguuid, event_at, "%s %s %s" % (e["event_type"], e["object_kind"], e["object_uuid"])])
+
+ if args.csv:
+ csvwriter = csv.writer(sys.stdout, dialect='unix')
for k,v in users.items():
if k is None or k.endswith("-tpzed-000000000000000"):
continue
- print(getuserinfo(arv, k))
- for ev in v:
- print(" %s" % ev)
- print("")
+ if not args.csv:
+ print(getuserinfo(arv, k))
+ for ev in v:
+ # Remove the log entry uuid, this report is intended for human consumption
+ ev.pop(0)
+ print(" %s" % ' '.join(ev))
+ print("")
+ else:
+ user = getuserinfocsv(arv, k)
+ for ev in v:
+ ev = user + ev
+ csvwriter.writerow(ev)
if __name__ == "__main__":
main()