Merge branch '19675-instance-types-panel' into main. Closes #19675
author Stephen Smith <stephen@curii.com>
Wed, 20 Dec 2023 23:11:44 +0000 (18:11 -0500)
committer Stephen Smith <stephen@curii.com>
Wed, 20 Dec 2023 23:11:44 +0000 (18:11 -0500)
Arvados-DCO-1.1-Signed-off-by: Stephen Smith <stephen@curii.com>

38 files changed:
doc/admin/upgrading.html.textile.liquid
doc/api/keep-webdav.html.textile.liquid
doc/api/methods/groups.html.textile.liquid
go.mod
go.sum
sdk/go/arvados/fs_base.go
sdk/go/arvados/fs_collection.go
sdk/go/arvados/fs_lookup.go
sdk/go/arvados/fs_project.go
sdk/go/arvados/fs_project_test.go
sdk/go/arvados/fs_site.go
sdk/go/arvados/fs_site_test.go
sdk/go/arvadostest/fixtures.go
sdk/java-v2/src/main/java/org/arvados/client/api/client/CountingFileRequestBody.java
sdk/java-v2/src/main/java/org/arvados/client/api/client/CountingRequestBody.java [new file with mode: 0644]
sdk/java-v2/src/main/java/org/arvados/client/api/client/CountingStreamRequestBody.java [new file with mode: 0644]
sdk/java-v2/src/main/java/org/arvados/client/api/client/KeepServerApiClient.java
sdk/java-v2/src/main/java/org/arvados/client/api/client/KeepWebApiClient.java
sdk/python/arvados/__init__.py
sdk/python/arvados/events.py
sdk/python/setup.py
sdk/python/tests/test_events.py
services/keep-balance/balance_run_test.go
services/keep-balance/server.go
services/keep-web/handler_test.go
services/keep-web/server_test.go
services/workbench2/Makefile
services/workbench2/cypress/integration/process.spec.js
services/workbench2/package.json
services/workbench2/src/store/process-logs-panel/process-logs-panel-actions.ts
services/workbench2/yarn.lock
services/ws/session_v0.go
tools/arvbox/bin/arvbox
tools/arvbox/lib/arvbox/docker/common.sh
tools/arvbox/lib/arvbox/docker/go-setup.sh
tools/arvbox/lib/arvbox/docker/service/nginx/run
tools/arvbox/lib/arvbox/docker/service/sdk/run-service
tools/arvbox/lib/arvbox/docker/service/vm/run-service

index 48f8659dc755f194705083e94d42f8fd09c77b19..81b32c1444f2f57f8a1de4102fb110225e4cfd79 100644 (file)
@@ -30,6 +30,10 @@ TODO: extract this information based on git commit messages and generate changel
 
 h2(#main). development main
 
+"previous: Upgrading to 2.7.1":#v2_7_1
+
+h2(#2_7_1). v2.7.1 (2023-12-12)
+
 "previous: Upgrading to 2.7.0":#v2_7_0
 
 h3. Remove Workbench1 packages after upgrading the salt installer
index f068a49c2c032e92803135637ba0446579655993..e95d523b9dec8e4ac4f88a743c968e7250260a4c 100644 (file)
@@ -35,6 +35,12 @@ The @users@ folder will return a listing of the users for whom the client has pe
 
 In addition to the @/by_id/@ path prefix, the collection or project can be specified using a path prefix of @/c=<uuid or pdh>/@ or (if the cluster is properly configured) as a virtual host.  This is described on "Keep-web URLs":keep-web-urls.html
 
+It is possible for a project or a "filter group":methods/groups.html#filter to appear as its own descendant in the @by_id@ and @users@ tree (a filter group may match itself, its own ancestor, another filter group that matches its ancestor, etc). When this happens, the descendant appears as an empty read-only directory. For example, if filter group @f@ matches its own parent @p@:
+* @/users/example/p/f@ will show the filter group's contents (matched projects and collections).
+* @/users/example/p/f/p@ will appear as an empty directory.
+* @/by_id/uuid_of_f/p@ will show the parent project's contents, including @f@.
+* @/by_id/uuid_of_f/p/f@ will appear as an empty directory.
+
 h3(#auth). Authentication mechanisms
 
 A token can be provided in an Authorization header as a @Bearer@ token:
index 826c7fe43cc819c9af1d07c2ddab6eac52f06492..05d3fb1c7b9c4f83da5d5b4af0221be197375c1b 100644 (file)
@@ -46,7 +46,7 @@ The @frozen_by_uuid@ attribute can be cleared by an admin user. It can also be c
 
 The optional @API.FreezeProjectRequiresDescription@ and @API.FreezeProjectRequiresProperties@ configuration settings can be used to prevent users from freezing projects that have empty @description@ and/or specified @properties@ entries.
 
-h3. Filter groups
+h3(#filter). Filter groups
 
 @filter@ groups are virtual groups; they can not own other objects. Filter groups have a special @properties@ field named @filters@, which must be an array of filter conditions. See "list method filters":{{site.baseurl}}/api/methods.html#filters for details on the syntax of valid filters, but keep in mind that the attributes must include the object type (@collections@, @container_requests@, @groups@, @workflows@), separated with a dot from the field to be filtered on.
 
diff --git a/go.mod b/go.mod
index 218e2ddde8b1251507856740ad531fe5fc498fe3..0011d7970f0c47811cc47b3474feea3ad9c6048b 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -37,7 +37,7 @@ require (
        github.com/prometheus/client_model v0.3.0
        github.com/prometheus/common v0.39.0
        github.com/sirupsen/logrus v1.8.1
-       golang.org/x/crypto v0.16.0
+       golang.org/x/crypto v0.17.0
        golang.org/x/net v0.19.0
        golang.org/x/oauth2 v0.11.0
        golang.org/x/sys v0.15.0
diff --git a/go.sum b/go.sum
index 31ddd88621be84031ed69b5c00c62aabdeeb9e1e..fb2fe5e3f04d56129fd32a7a7fc5240ca2bb22d0 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -287,8 +287,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
 golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
 golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
-golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY=
-golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
+golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
+golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
 golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
index 274d20702287ed464d4ea8f3796e528c3f61b30b..430a0d4c9be4f69e4f86adc3cb93d2549c8fc930 100644 (file)
@@ -13,6 +13,7 @@ import (
        "net/http"
        "os"
        "path"
+       "path/filepath"
        "strings"
        "sync"
        "time"
@@ -387,17 +388,28 @@ func (n *treenode) Size() int64 {
 }
 
 func (n *treenode) FileInfo() os.FileInfo {
-       n.Lock()
-       defer n.Unlock()
-       n.fileinfo.size = int64(len(n.inodes))
-       return n.fileinfo
+       n.RLock()
+       defer n.RUnlock()
+       fi := n.fileinfo
+       fi.size = int64(len(n.inodes))
+       return fi
 }
 
 func (n *treenode) Readdir() (fi []os.FileInfo, err error) {
+       // We need RLock to safely read n.inodes, but we must release
+       // it before calling FileInfo() on the child nodes. Otherwise,
+       // we risk deadlock when filter groups A and B match each
+       // other, concurrent Readdir() calls try to RLock them in
+       // opposite orders, and one cannot be RLocked a second time
+       // because a third caller is waiting for a write lock.
        n.RLock()
-       defer n.RUnlock()
-       fi = make([]os.FileInfo, 0, len(n.inodes))
+       inodes := make([]inode, 0, len(n.inodes))
        for _, inode := range n.inodes {
+               inodes = append(inodes, inode)
+       }
+       n.RUnlock()
+       fi = make([]os.FileInfo, 0, len(inodes))
+       for _, inode := range inodes {
                fi = append(fi, inode.FileInfo())
        }
        return
@@ -468,7 +480,8 @@ func (fs *fileSystem) openFile(name string, flag int, perm os.FileMode) (*fileha
                return nil, ErrSyncNotSupported
        }
        dirname, name := path.Split(name)
-       parent, err := rlookup(fs.root, dirname)
+       ancestors := map[inode]bool{}
+       parent, err := rlookup(fs.root, dirname, ancestors)
        if err != nil {
                return nil, err
        }
@@ -533,6 +546,24 @@ func (fs *fileSystem) openFile(name string, flag int, perm os.FileMode) (*fileha
                        return nil, err
                }
        }
+       // If n and one of its parents/ancestors are [hardlinks to]
+       // the same node (e.g., a filter group that matches itself),
+       // open an "empty directory" node instead, so the inner
+       // hardlink appears empty. This is needed to ensure
+       // Open("a/b/c/x/x").Readdir() appears empty, matching the
+       // behavior of rlookup("a/b/c/x/x/z") => ErrNotExist.
+       if hl, ok := n.(*hardlink); (ok && ancestors[hl.inode]) || ancestors[n] {
+               n = &treenode{
+                       fs:     n.FS(),
+                       parent: parent,
+                       inodes: nil,
+                       fileinfo: fileinfo{
+                               name:    name,
+                               modTime: time.Now(),
+                               mode:    0555 | os.ModeDir,
+                       },
+               }
+       }
        return &filehandle{
                inode:    n,
                append:   flag&os.O_APPEND != 0,
@@ -551,7 +582,7 @@ func (fs *fileSystem) Create(name string) (File, error) {
 
 func (fs *fileSystem) Mkdir(name string, perm os.FileMode) error {
        dirname, name := path.Split(name)
-       n, err := rlookup(fs.root, dirname)
+       n, err := rlookup(fs.root, dirname, nil)
        if err != nil {
                return err
        }
@@ -575,7 +606,7 @@ func (fs *fileSystem) Mkdir(name string, perm os.FileMode) error {
 }
 
 func (fs *fileSystem) Stat(name string) (os.FileInfo, error) {
-       node, err := rlookup(fs.root, name)
+       node, err := rlookup(fs.root, name, nil)
        if err != nil {
                return nil, err
        }
@@ -704,7 +735,7 @@ func (fs *fileSystem) remove(name string, recursive bool) error {
        if name == "" || name == "." || name == ".." {
                return ErrInvalidArgument
        }
-       dir, err := rlookup(fs.root, dirname)
+       dir, err := rlookup(fs.root, dirname, nil)
        if err != nil {
                return err
        }
@@ -741,9 +772,31 @@ func (fs *fileSystem) MemorySize() int64 {
 // rlookup (recursive lookup) returns the inode for the file/directory
 // with the given name (which may contain "/" separators). If no such
 // file/directory exists, the returned node is nil.
-func rlookup(start inode, path string) (node inode, err error) {
+//
+// The visited map should be either nil or empty. If non-nil, all
+// nodes and hardlink targets visited by the given path will be added
+// to it.
+//
+// If a cycle is detected, the second occurrence of the offending node
+// will be replaced by an empty directory. For example, if "x" is a
+// filter group that matches itself, then rlookup("a/b/c/x") will
+// return the filter group, and rlookup("a/b/c/x/x") will return an
+// empty directory.
+func rlookup(start inode, path string, visited map[inode]bool) (node inode, err error) {
+       if visited == nil {
+               visited = map[inode]bool{}
+       }
        node = start
+       // Clean up ./ and ../ and double-slashes, but (unlike
+       // filepath.Clean) retain a trailing slash, because looking up
+       // ".../regularfile/" should fail.
+       trailingSlash := strings.HasSuffix(path, "/")
+       path = filepath.Clean(path)
+       if trailingSlash && path != "/" {
+               path += "/"
+       }
        for _, name := range strings.Split(path, "/") {
+               visited[node] = true
                if node.IsDir() {
                        if name == "." || name == "" {
                                continue
@@ -761,6 +814,24 @@ func rlookup(start inode, path string) (node inode, err error) {
                if node == nil || err != nil {
                        break
                }
+               checknode := node
+               if hardlinked, ok := checknode.(*hardlink); ok {
+                       checknode = hardlinked.inode
+               }
+               if visited[checknode] {
+                       node = &treenode{
+                               fs:     node.FS(),
+                               parent: node.Parent(),
+                               inodes: nil,
+                               fileinfo: fileinfo{
+                                       name:    name,
+                                       modTime: time.Now(),
+                                       mode:    0555 | os.ModeDir,
+                               },
+                       }
+               } else {
+                       visited[checknode] = true
+               }
        }
        if node == nil && err == nil {
                err = os.ErrNotExist
index 84ff69d6bd0ad16b053fd9ab6409fc04ee5c3024..052cc1aa37581aca2351200c5444304ce912571c 100644 (file)
@@ -457,7 +457,7 @@ func (fs *collectionFileSystem) Sync() error {
 }
 
 func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error {
-       node, err := rlookup(fs.fileSystem.root, path)
+       node, err := rlookup(fs.fileSystem.root, path, nil)
        if err != nil {
                return err
        }
index 2bb09995e16e476b3889abbdc1c61b6fc1abbd15..7f2244931877aff551e5f1c0687d81d4cc8c49ed 100644 (file)
@@ -48,7 +48,19 @@ func (ln *lookupnode) Readdir() ([]os.FileInfo, error) {
                        return nil, err
                }
                for _, child := range all {
-                       _, err = ln.treenode.Child(child.FileInfo().Name(), func(inode) (inode, error) {
+                       var name string
+                       if hl, ok := child.(*hardlink); ok && hl.inode == ln {
+                               // If child is a hardlink to its
+                               // parent, FileInfo()->RLock() will
+                               // deadlock, because we already have
+                               // the write lock. In this situation
+                               // we can safely access the hardlink's
+                               // name directly.
+                               name = hl.name
+                       } else {
+                               name = child.FileInfo().Name()
+                       }
+                       _, err = ln.treenode.Child(name, func(inode) (inode, error) {
                                return child, nil
                        })
                        if err != nil {
index a68e83945e348a0f3e740b7c0a313155917b795a..df1d06e753b35191f344499fe17c9953dc849daa 100644 (file)
@@ -35,10 +35,11 @@ func (fs *customFileSystem) projectsLoadOne(parent inode, uuid, name string) (in
                contents = CollectionList{}
                err = fs.RequestAndDecode(&contents, "GET", "arvados/v1/groups/"+uuid+"/contents", nil, ResourceListParams{
                        Count: "none",
+                       Order: "uuid",
                        Filters: []Filter{
                                {"name", "=", strings.Replace(name, subst, "/", -1)},
                                {"uuid", "is_a", []string{"arvados#collection", "arvados#group"}},
-                               {"groups.group_class", "=", "project"},
+                               {"groups.group_class", "in", []string{"project", "filter"}},
                        },
                        Select: []string{"uuid", "name", "modified_at", "properties"},
                })
@@ -104,7 +105,7 @@ func (fs *customFileSystem) projectsLoadAll(parent inode, uuid string) ([]inode,
                        {"uuid", "is_a", class},
                }
                if class == "arvados#group" {
-                       filters = append(filters, Filter{"group_class", "=", "project"})
+                       filters = append(filters, Filter{"groups.group_class", "in", []string{"project", "filter"}})
                }
 
                params := ResourceListParams{
index d3dac7a14f7424539c29c89811ee900eb47f603d..5c2eb33d121b553de5876b11650117a7968fa296 100644 (file)
@@ -42,61 +42,94 @@ func (sc *spyingClient) RequestAndDecode(dst interface{}, method, path string, b
 func (s *SiteFSSuite) TestFilterGroup(c *check.C) {
        // Make sure that a collection and group that match the filter are present,
        // and that a group that does not match the filter is not present.
-       s.fs.MountProject("fg", fixtureThisFilterGroupUUID)
 
-       _, err := s.fs.OpenFile("/fg/baz_file", 0, 0)
-       c.Assert(err, check.IsNil)
+       checkOpen := func(path string, exists bool) {
+               f, err := s.fs.Open(path)
+               if exists {
+                       if c.Check(err, check.IsNil) {
+                               c.Check(f.Close(), check.IsNil)
+                       }
+               } else {
+                       c.Check(err, check.Equals, os.ErrNotExist)
+               }
+       }
 
-       _, err = s.fs.OpenFile("/fg/A Subproject", 0, 0)
-       c.Assert(err, check.IsNil)
+       checkDirContains := func(parent, child string, exists bool) {
+               f, err := s.fs.Open(parent)
+               if !c.Check(err, check.IsNil) {
+                       return
+               }
+               ents, err := f.Readdir(-1)
+               if !c.Check(err, check.IsNil) {
+                       return
+               }
+               for _, ent := range ents {
+                       if !exists {
+                               c.Check(ent.Name(), check.Not(check.Equals), child)
+                               if child == "" {
+                                       // no children are expected
+                                       c.Errorf("child %q found in parent %q", child, parent)
+                               }
+                       } else if ent.Name() == child {
+                               return
+                       }
+               }
+               if exists {
+                       c.Errorf("child %q not found in parent %q", child, parent)
+               }
+       }
 
-       _, err = s.fs.OpenFile("/fg/A Project", 0, 0)
-       c.Assert(err, check.Not(check.IsNil))
+       checkOpen("/users/active/This filter group/baz_file", true)
+       checkOpen("/users/active/This filter group/A Subproject", true)
+       checkOpen("/users/active/This filter group/A Project", false)
+       s.fs.MountProject("fg", fixtureThisFilterGroupUUID)
+       checkOpen("/fg/baz_file", true)
+       checkOpen("/fg/A Subproject", true)
+       checkOpen("/fg/A Project", false)
+       s.fs.MountProject("home", "")
+       checkOpen("/home/A filter group with an is_a collection filter/baz_file", true)
+       checkOpen("/home/A filter group with an is_a collection filter/baz_file/baz", true)
+       checkOpen("/home/A filter group with an is_a collection filter/A Subproject", false)
+       checkOpen("/home/A filter group with an is_a collection filter/A Project", false)
 
        // An empty filter means everything that is visible should be returned.
+       checkOpen("/users/active/A filter group without filters/baz_file", true)
+       checkOpen("/users/active/A filter group without filters/A Subproject", true)
+       checkOpen("/users/active/A filter group without filters/A Project", true)
        s.fs.MountProject("fg2", fixtureAFilterGroupTwoUUID)
+       checkOpen("/fg2/baz_file", true)
+       checkOpen("/fg2/A Subproject", true)
+       checkOpen("/fg2/A Project", true)
 
-       _, err = s.fs.OpenFile("/fg2/baz_file", 0, 0)
-       c.Assert(err, check.IsNil)
-
-       _, err = s.fs.OpenFile("/fg2/A Subproject", 0, 0)
-       c.Assert(err, check.IsNil)
-
-       _, err = s.fs.OpenFile("/fg2/A Project", 0, 0)
-       c.Assert(err, check.IsNil)
+       // If a filter group matches itself or one of its ancestors,
+       // the matched item appears as an empty directory.
+       checkDirContains("/users/active/A filter group without filters", "A filter group without filters", true)
+       checkOpen("/users/active/A filter group without filters/A filter group without filters", true)
+       checkOpen("/users/active/A filter group without filters/A filter group without filters/baz_file", false)
+       checkDirContains("/users/active/A filter group without filters/A filter group without filters", "", false)
 
        // An 'is_a' 'arvados#collection' filter means only collections should be returned.
+       checkOpen("/users/active/A filter group with an is_a collection filter/baz_file", true)
+       checkOpen("/users/active/A filter group with an is_a collection filter/baz_file/baz", true)
+       checkOpen("/users/active/A filter group with an is_a collection filter/A Subproject", false)
+       checkOpen("/users/active/A filter group with an is_a collection filter/A Project", false)
        s.fs.MountProject("fg3", fixtureAFilterGroupThreeUUID)
-
-       _, err = s.fs.OpenFile("/fg3/baz_file", 0, 0)
-       c.Assert(err, check.IsNil)
-
-       _, err = s.fs.OpenFile("/fg3/A Subproject", 0, 0)
-       c.Assert(err, check.Not(check.IsNil))
+       checkOpen("/fg3/baz_file", true)
+       checkOpen("/fg3/baz_file/baz", true)
+       checkOpen("/fg3/A Subproject", false)
 
        // An 'exists' 'arvados#collection' filter means only collections with certain properties should be returned.
        s.fs.MountProject("fg4", fixtureAFilterGroupFourUUID)
-
-       _, err = s.fs.Stat("/fg4/collection with list property with odd values")
-       c.Assert(err, check.IsNil)
-
-       _, err = s.fs.Stat("/fg4/collection with list property with even values")
-       c.Assert(err, check.IsNil)
+       checkOpen("/fg4/collection with list property with odd values", true)
+       checkOpen("/fg4/collection with list property with even values", true)
+       checkOpen("/fg4/baz_file", false)
 
        // A 'contains' 'arvados#collection' filter means only collections with certain properties should be returned.
        s.fs.MountProject("fg5", fixtureAFilterGroupFiveUUID)
-
-       _, err = s.fs.Stat("/fg5/collection with list property with odd values")
-       c.Assert(err, check.IsNil)
-
-       _, err = s.fs.Stat("/fg5/collection with list property with string value")
-       c.Assert(err, check.IsNil)
-
-       _, err = s.fs.Stat("/fg5/collection with prop2 5")
-       c.Assert(err, check.Not(check.IsNil))
-
-       _, err = s.fs.Stat("/fg5/collection with list property with even values")
-       c.Assert(err, check.Not(check.IsNil))
+       checkOpen("/fg5/collection with list property with odd values", true)
+       checkOpen("/fg5/collection with list property with string value", true)
+       checkOpen("/fg5/collection with prop2 5", false)
+       checkOpen("/fg5/collection with list property with even values", false)
 }
 
 func (s *SiteFSSuite) TestCurrentUserHome(c *check.C) {
index a4a18837e00e7074521ce3e562fb30c21b84c1eb..d4f02416822a8b52494f62672a95bd00c1c04519 100644 (file)
@@ -123,6 +123,10 @@ func (fs *customFileSystem) ForwardSlashNameSubstitution(repl string) {
        fs.forwardSlashNameSubstitution = repl
 }
 
+func (fs *customFileSystem) MemorySize() int64 {
+       return fs.fileSystem.MemorySize() + fs.byIDRoot.MemorySize()
+}
+
 // SiteFileSystem returns a FileSystem that maps collections and other
 // Arvados objects onto a filesystem layout.
 //
@@ -386,3 +390,7 @@ func (hl *hardlink) FileInfo() os.FileInfo {
        }
        return fi
 }
+
+func (hl *hardlink) MemorySize() int64 {
+       return 64 + int64(len(hl.name))
+}
index c7d6b2a4646a33db1f0510b00d59bec812850fe6..2c86536b2f7f19a2308f8218910969b13fd22c31 100644 (file)
@@ -185,6 +185,16 @@ func (s *SiteFSSuite) TestByUUIDAndPDH(c *check.C) {
                names = append(names, fi.Name())
        }
        c.Check(names, check.DeepEquals, []string{"baz"})
+       f, err = s.fs.Open("/by_id/" + fixtureAProjectUUID + "/A Subproject/baz_file/baz")
+       c.Assert(err, check.IsNil)
+       err = f.Close()
+       c.Assert(err, check.IsNil)
+       _, err = s.fs.Open("/by_id/" + fixtureAProjectUUID + "/A Subproject/baz_file/baz/")
+       c.Assert(err, check.Equals, ErrNotADirectory)
+       _, err = s.fs.Open("/by_id/" + fixtureAProjectUUID + "/A Subproject/baz_file/baz/z")
+       c.Assert(err, check.Equals, ErrNotADirectory)
+       _, err = s.fs.Open("/by_id/" + fixtureAProjectUUID + "/A Subproject/baz_file/baz/..")
+       c.Assert(err, check.Equals, ErrNotADirectory)
 
        _, err = s.fs.OpenFile("/by_id/"+fixtureNonexistentCollection, os.O_RDWR|os.O_CREATE, 0755)
        c.Check(err, ErrorIs, ErrInvalidOperation)
index ac12f7ae13e93405b37a6814ed4e16bbda2b911e..3b8a618fea099255434033751a156ab62d9a02d8 100644 (file)
@@ -37,8 +37,9 @@ const (
        StorageClassesDesiredArchiveConfirmedDefault = "zzzzz-4zz18-3t236wr12769qqa"
        EmptyCollectionUUID                          = "zzzzz-4zz18-gs9ooj1h9sd5mde"
 
-       AProjectUUID    = "zzzzz-j7d0g-v955i6s2oi1cbso"
-       ASubprojectUUID = "zzzzz-j7d0g-axqo7eu9pwvna1x"
+       AProjectUUID     = "zzzzz-j7d0g-v955i6s2oi1cbso"
+       ASubprojectUUID  = "zzzzz-j7d0g-axqo7eu9pwvna1x"
+       AFilterGroupUUID = "zzzzz-j7d0g-thisfiltergroup"
 
        FooAndBarFilesInDirUUID = "zzzzz-4zz18-foonbarfilesdir"
        FooAndBarFilesInDirPDH  = "870369fc72738603c2fad16664e50e2d+58"
index 43fcdba5c69a20a1817aa77400ca6cae95b0513d..d6eb033ff5cd2717369aa5d034f01a140889d72a 100644 (file)
@@ -7,12 +7,9 @@
 
 package org.arvados.client.api.client;
 
-import okhttp3.MediaType;
-import okhttp3.RequestBody;
 import okio.BufferedSink;
 import okio.Okio;
 import okio.Source;
-import org.slf4j.Logger;
 
 import java.io.File;
 
@@ -20,32 +17,20 @@ import java.io.File;
  * Based on:
  * {@link} https://gist.github.com/eduardb/dd2dc530afd37108e1ac
  */
-public class CountingFileRequestBody extends RequestBody {
-
-    private static final int SEGMENT_SIZE = 2048; // okio.Segment.SIZE
-    private static final MediaType CONTENT_BINARY = MediaType.parse(com.google.common.net.MediaType.OCTET_STREAM.toString());
-
-    private final File file;
-    private final ProgressListener listener;
+public class CountingFileRequestBody extends CountingRequestBody<File> {
 
     CountingFileRequestBody(final File file, final ProgressListener listener) {
-        this.file = file;
-        this.listener = listener;
+        super(file, listener);
     }
 
     @Override
     public long contentLength() {
-        return file.length();
-    }
-
-    @Override
-    public MediaType contentType() {
-        return CONTENT_BINARY;
+        return requestBodyData.length();
     }
 
     @Override
     public void writeTo(BufferedSink sink) {
-        try (Source source = Okio.source(file)) {
+        try (Source source = Okio.source(requestBodyData)) {
             long total = 0;
             long read;
 
@@ -61,24 +46,4 @@ public class CountingFileRequestBody extends RequestBody {
             //ignore
         }
     }
-
-    static class TransferData {
-
-        private final Logger log = org.slf4j.LoggerFactory.getLogger(TransferData.class);
-        private int progressValue;
-        private long totalSize;
-
-        TransferData(long totalSize) {
-            this.progressValue = 0;
-            this.totalSize = totalSize;
-        }
-
-        void updateTransferProgress(long transferred) {
-            float progress = (transferred / (float) totalSize) * 100;
-            if (progressValue != (int) progress) {
-                progressValue = (int) progress;
-                log.debug("{} / {} / {}%", transferred, totalSize, progressValue);
-            }
-        }
-    }
 }
\ No newline at end of file
diff --git a/sdk/java-v2/src/main/java/org/arvados/client/api/client/CountingRequestBody.java b/sdk/java-v2/src/main/java/org/arvados/client/api/client/CountingRequestBody.java
new file mode 100644 (file)
index 0000000..397a1e2
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (C) The Arvados Authors. All rights reserved.
+ *
+ * SPDX-License-Identifier: AGPL-3.0 OR Apache-2.0
+ *
+ */
+
+package org.arvados.client.api.client;
+
+import okhttp3.MediaType;
+import okhttp3.RequestBody;
+import org.slf4j.Logger;
+
+abstract class CountingRequestBody<T> extends RequestBody {
+
+    protected static final int SEGMENT_SIZE = 2048; // okio.Segment.SIZE
+    protected static final MediaType CONTENT_BINARY = MediaType.parse(com.google.common.net.MediaType.OCTET_STREAM.toString());
+
+    protected final ProgressListener listener;
+
+    protected final T requestBodyData;
+
+    CountingRequestBody(T file, final ProgressListener listener) {
+        this.requestBodyData = file;
+        this.listener = listener;
+    }
+
+    @Override
+    public MediaType contentType() {
+        return CONTENT_BINARY;
+    }
+
+    static class TransferData {
+
+        private final Logger log = org.slf4j.LoggerFactory.getLogger(TransferData.class);
+        private int progressValue;
+        private long totalSize;
+
+        TransferData(long totalSize) {
+            this.progressValue = 0;
+            this.totalSize = totalSize;
+        }
+
+        void updateTransferProgress(long transferred) {
+            float progress = (transferred / (float) totalSize) * 100;
+            if (progressValue != (int) progress) {
+                progressValue = (int) progress;
+                log.debug("{} / {} / {}%", transferred, totalSize, progressValue);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/sdk/java-v2/src/main/java/org/arvados/client/api/client/CountingStreamRequestBody.java b/sdk/java-v2/src/main/java/org/arvados/client/api/client/CountingStreamRequestBody.java
new file mode 100644 (file)
index 0000000..7c39371
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) The Arvados Authors. All rights reserved.
+ *
+ * SPDX-License-Identifier: AGPL-3.0 OR Apache-2.0
+ *
+ */
+
+package org.arvados.client.api.client;
+
+import okio.BufferedSink;
+import okio.Okio;
+import okio.Source;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class CountingStreamRequestBody extends CountingRequestBody<InputStream> {
+
+    CountingStreamRequestBody(final InputStream inputStream, final ProgressListener listener) {
+        super(inputStream, listener);
+    }
+
+    @Override
+    public long contentLength() throws IOException {
+        return requestBodyData.available();
+    }
+
+    @Override
+    public void writeTo(BufferedSink sink) {
+        try (Source source = Okio.source(requestBodyData)) {
+            long total = 0;
+            long read;
+
+            while ((read = source.read(sink.buffer(), SEGMENT_SIZE)) != -1) {
+                total += read;
+                sink.flush();
+                listener.updateProgress(total);
+
+            }
+        } catch (RuntimeException rethrown) {
+            throw rethrown;
+        } catch (Exception ignored) {
+            //ignore
+        }
+    }
+}
\ No newline at end of file
index a9306ca2ecf970591be242164d7135f5458f929a..c1525e07a77003a9c9e8254c970bfae97cb7c63a 100644 (file)
@@ -9,7 +9,7 @@ package org.arvados.client.api.client;
 
 import okhttp3.Request;
 import okhttp3.RequestBody;
-import org.arvados.client.api.client.CountingFileRequestBody.TransferData;
+import org.arvados.client.api.client.CountingRequestBody.TransferData;
 import org.arvados.client.common.Headers;
 import org.arvados.client.config.ConfigProvider;
 import org.slf4j.Logger;
index 05d39e9e6085ac66f6ea0aa1901c3ce2ae096f68..2c3168649ff70b734eccb57b627a83967f3cf94e 100644 (file)
@@ -13,6 +13,7 @@ import okhttp3.RequestBody;
 import org.arvados.client.config.ConfigProvider;
 
 import java.io.File;
+import java.io.InputStream;
 
 public class KeepWebApiClient extends BaseApiClient {
 
@@ -48,6 +49,16 @@ public class KeepWebApiClient extends BaseApiClient {
         return newCall(request);
     }
 
+    public String upload(String collectionUuid, InputStream inputStream, String fileName, ProgressListener progressListener) {
+        RequestBody requestBody = new CountingStreamRequestBody(inputStream, progressListener);
+
+        Request request = getRequestBuilder()
+                .url(getUrlBuilder(collectionUuid, fileName).build())
+                .put(requestBody)
+                .build();
+        return newCall(request);
+    }
+
     private HttpUrl.Builder getUrlBuilder(String collectionUuid, String filePathName) {
         return new HttpUrl.Builder()
                 .scheme(config.getApiProtocol())
index e90f3812982063cf078e2804cc28931121378709..83f658201ca3b8c8100c7a0497744af9f4cb3f49 100644 (file)
@@ -6,8 +6,8 @@
 This module provides the entire Python SDK for Arvados. The most useful modules
 include:
 
-* arvados.api - After you `import arvados`, you can call `arvados.api.api` as
-  `arvados.api` to construct a client object.
+* arvados.api - After you `import arvados`, you can call `arvados.api` as a
+  shortcut to the client constructor function `arvados.api.api`.
 
 * arvados.collection - The `arvados.collection.Collection` class provides a
   high-level interface to read and write collections. It coordinates sending
@@ -26,15 +26,24 @@ import types
 
 from collections import UserDict
 
-from .api import api, api_from_config, http_cache
+from . import api, errors, util
+from .api import api_from_config, http_cache
 from .collection import CollectionReader, CollectionWriter, ResumableCollectionWriter
 from arvados.keep import *
 from arvados.stream import *
 from .arvfile import StreamFileReader
 from .logging import log_format, log_date_format, log_handler
 from .retry import RetryLoop
-import arvados.errors as errors
-import arvados.util as util
+
+# Previous versions of the PySDK used to say `from .api import api`.  This
+# made it convenient to call the API client constructor, but difficult to
+# access the rest of the `arvados.api` module. The magic below fixes that
+# bug while retaining backwards compatibility: `arvados.api` is now the
+# module and you can import it normally, but we make that module callable so
+# all the existing code that says `arvados.api('v1', ...)` still works.
+class _CallableAPIModule(api.__class__):
+    __call__ = staticmethod(api.api)
+api.__class__ = _CallableAPIModule
 
 # Override logging module pulled in via `from ... import *`
 # so users can `import arvados.logging`.
index e53e4980a86f01a595649331d020c6b87e823e6a..917c876706ffb2bcab4189d5fca1da430b37a1fc 100644 (file)
 # Copyright (C) The Arvados Authors. All rights reserved.
 #
 # SPDX-License-Identifier: Apache-2.0
+"""Follow events on an Arvados cluster
 
-from __future__ import absolute_import
-from future import standard_library
-standard_library.install_aliases()
-from builtins import str
-from builtins import object
-import arvados
-from . import config
-from . import errors
-from .retry import RetryLoop
+This module provides different ways to get notified about events that happen
+on an Arvados cluster. You indicate which events you want updates about, and
+provide a function that is called any time one of those events is received
+from the server.
 
-import logging
+`subscribe` is the main entry point. It helps you construct one of the two
+API-compatible client classes: `EventClient` (which uses WebSockets) or
+`PollClient` (which periodically queries the logs list methods).
+"""
+
+import enum
 import json
-import _thread
-import threading
-import time
+import logging
 import os
 import re
 import ssl
-from ws4py.client.threadedclient import WebSocketClient
+import sys
+import _thread
+import threading
+import time
+
+import websockets.exceptions as ws_exc
+import websockets.sync.client as ws_client
+
+from . import config
+from . import errors
+from . import util
+from .retry import RetryLoop
+from ._version import __version__
+
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Iterable,
+    List,
+    Optional,
+    Union,
+)
+
+EventCallback = Callable[[Dict[str, Any]], object]
+"""Type signature for an event handler callback"""
+FilterCondition = List[Union[None, str, 'Filter']]
+"""Type signature for a single filter condition"""
+Filter = List[FilterCondition]
+"""Type signature for an entire filter"""
 
 _logger = logging.getLogger('arvados.events')
 
+class WSMethod(enum.Enum):
+    """Arvados WebSocket methods
 
-class _EventClient(WebSocketClient):
-    def __init__(self, url, filters, on_event, last_log_id, on_closed):
-        ssl_options = {'ca_certs': arvados.util.ca_certs_path()}
-        if config.flag_is_true('ARVADOS_API_HOST_INSECURE'):
-            ssl_options['cert_reqs'] = ssl.CERT_NONE
-        else:
-            ssl_options['cert_reqs'] = ssl.CERT_REQUIRED
+    This enum represents valid values for the `method` field in messages
+    sent to an Arvados WebSocket server.
+    """
+    SUBSCRIBE = 'subscribe'
+    SUB = SUBSCRIBE
+    UNSUBSCRIBE = 'unsubscribe'
+    UNSUB = UNSUBSCRIBE
 
-        # Warning: If the host part of url resolves to both IPv6 and
-        # IPv4 addresses (common with "localhost"), only one of them
-        # will be attempted -- and it might not be the right one. See
-        # ws4py's WebSocketBaseClient.__init__.
-        super(_EventClient, self).__init__(url, ssl_options=ssl_options)
 
-        self.filters = filters
-        self.on_event = on_event
+class EventClient(threading.Thread):
+    """Follow Arvados events via WebSocket
+
+    EventClient follows events on an Arvados cluster published by the WebSocket
+    server. Users can select the events they want to follow and run their own
+    callback function on each.
+    """
+    _USER_AGENT = 'Python/{}.{}.{} arvados.events/{}'.format(
+        *sys.version_info[:3],
+        __version__,
+    )
+
+    def __init__(
+            self,
+            url: str,
+            filters: Optional[Filter],
+            on_event_cb: EventCallback,
+            last_log_id: Optional[int]=None,
+            *,
+            insecure: Optional[bool]=None,
+    ) -> None:
+        """Initialize a WebSocket client
+
+        Constructor arguments:
+
+        * url: str --- The `wss` URL for an Arvados WebSocket server.
+
+        * filters: arvados.events.Filter | None --- One event filter to
+          subscribe to after connecting to the WebSocket server. If not
+          specified, the client will subscribe to all events.
+
+        * on_event_cb: arvados.events.EventCallback --- When the client
+          receives an event from the WebSocket server, it calls this
+          function with the event object.
+
+        * last_log_id: int | None --- If specified, this will be used as the
+          value for the `last_log_id` field in subscribe messages sent by
+          the client.
+
+        Constructor keyword arguments:
+
+        * insecure: bool | None --- If `True`, the client will not check the
+          validity of the server's TLS certificate. If not specified, uses
+          the value from the user's `ARVADOS_API_HOST_INSECURE` setting.
+        """
+        self.url = url
+        self.filters = [filters or []]
+        self.on_event_cb = on_event_cb
         self.last_log_id = last_log_id
-        self._closing_lock = threading.RLock()
-        self._closing = False
-        self._closed = threading.Event()
-        self.on_closed = on_closed
+        self.is_closed = threading.Event()
+        self._ssl_ctx = ssl.create_default_context(
+            purpose=ssl.Purpose.SERVER_AUTH,
+            cafile=util.ca_certs_path(),
+        )
+        if insecure is None:
+            insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
+        if insecure:
+            self._ssl_ctx.check_hostname = False
+            self._ssl_ctx.verify_mode = ssl.CERT_NONE
+        self._subscribe_lock = threading.Lock()
+        self._connect()
+        super().__init__(daemon=True)
+        self.start()
+
+    def _connect(self) -> None:
+        # There are no locks protecting this method. After the thread starts,
+        # it should only be called from inside.
+        self._client = ws_client.connect(
+            self.url,
+            logger=_logger,
+            ssl_context=self._ssl_ctx,
+            user_agent_header=self._USER_AGENT,
+        )
+        self._client_ok = True
+
+    def _subscribe(self, f: Filter, last_log_id: Optional[int]) -> None:
+        extra = {}
+        if last_log_id is not None:
+            extra['last_log_id'] = last_log_id
+        return self._update_sub(WSMethod.SUBSCRIBE, f, **extra)
 
-    def opened(self):
-        for f in self.filters:
-            self.subscribe(f, self.last_log_id)
+    def _update_sub(self, method: WSMethod, f: Filter, **extra: Any) -> None:
+        msg = json.dumps({
+            'method': method.value,
+            'filters': f,
+            **extra,
+        })
+        self._client.send(msg)
 
-    def closed(self, code, reason=None):
-        self._closed.set()
-        self.on_closed()
+    def close(self, code: int=1000, reason: str='', timeout: float=0) -> None:
+        """Close the WebSocket connection and stop processing events
 
-    def received_message(self, m):
-        with self._closing_lock:
-            if not self._closing:
-                self.on_event(json.loads(str(m)))
+        Arguments:
 
-    def close(self, code=1000, reason='', timeout=0):
-        """Close event client and optionally wait for it to finish.
+        * code: int --- The WebSocket close code sent to the server when
+          disconnecting. Default 1000.
 
-        :timeout: is the number of seconds to wait for ws4py to
-        indicate that the connection has closed.
+        * reason: str --- The WebSocket close reason sent to the server when
+          disconnecting. Default is the empty string.
+
+        * timeout: float --- How long to wait for the WebSocket server to
+          acknowledge the disconnection, in seconds. Default 0, which means
+          no timeout.
         """
-        super(_EventClient, self).close(code, reason)
-        with self._closing_lock:
-            # make sure we don't process any more messages.
-            self._closing = True
-        # wait for ws4py to tell us the connection is closed.
-        self._closed.wait(timeout=timeout)
+        self.is_closed.set()
+        self._client.close_timeout = timeout or None
+        self._client.close(code, reason)
 
-    def subscribe(self, f, last_log_id=None):
-        m = {"method": "subscribe", "filters": f}
-        if last_log_id is not None:
-            m["last_log_id"] = last_log_id
-        self.send(json.dumps(m))
+    def run_forever(self) -> None:
+        """Run the WebSocket client indefinitely
 
-    def unsubscribe(self, f):
-        self.send(json.dumps({"method": "unsubscribe", "filters": f}))
+        This method blocks until the `close` method is called (e.g., from
+        another thread) or the client permanently loses its connection.
+        """
+        # Have to poll here to let KeyboardInterrupt get raised.
+        while not self.is_closed.wait(1):
+            pass
 
+    def subscribe(self, f: Filter, last_log_id: Optional[int]=None) -> None:
+        """Subscribe to another set of events from the server
 
-class EventClient(object):
-    def __init__(self, url, filters, on_event_cb, last_log_id):
-        self.url = url
-        if filters:
-            self.filters = [filters]
-        else:
-            self.filters = [[]]
-        self.on_event_cb = on_event_cb
-        self.last_log_id = last_log_id
-        self.is_closed = threading.Event()
-        self._setup_event_client()
+        Arguments:
 
-    def _setup_event_client(self):
-        self.ec = _EventClient(self.url, self.filters, self.on_event,
-                               self.last_log_id, self.on_closed)
-        self.ec.daemon = True
-        try:
-            self.ec.connect()
-        except Exception:
-            self.ec.close_connection()
-            raise
+        * f: arvados.events.Filter | None --- One filter to subscribe to
+          events for.
 
-    def subscribe(self, f, last_log_id=None):
-        self.filters.append(f)
-        self.ec.subscribe(f, last_log_id)
+        * last_log_id: int | None --- If specified, request events starting
+          from this id. If not specified, the server will only send events
+          that occur after processing the subscription.
+        """
+        with self._subscribe_lock:
+            self._subscribe(f, last_log_id)
+            self.filters.append(f)
 
-    def unsubscribe(self, f):
-        del self.filters[self.filters.index(f)]
-        self.ec.unsubscribe(f)
+    def unsubscribe(self, f: Filter) -> None:
+        """Unsubscribe from an event stream
 
-    def close(self, code=1000, reason='', timeout=0):
-        self.is_closed.set()
-        self.ec.close(code, reason, timeout)
+        Arguments:
+
+        * f: arvados.events.Filter | None --- One event filter to stop
+        receiving events for.
+        """
+        with self._subscribe_lock:
+            try:
+                index = self.filters.index(f)
+            except ValueError:
+                raise ValueError(f"filter not subscribed: {f!r}") from None
+            self._update_sub(WSMethod.UNSUBSCRIBE, f)
+            del self.filters[index]
+
+    def on_closed(self) -> None:
+        """Handle disconnection from the WebSocket server
+
+        This method is called when the client loses its connection from
+        receiving events. This implementation tries to establish a new
+        connection if it was not closed client-side.
+        """
+        if self.is_closed.is_set():
+            return
+        _logger.warning("Unexpected close. Reconnecting.")
+        for _ in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
+            try:
+                self._connect()
+            except Exception as e:
+                _logger.warning("Error '%s' during websocket reconnect.", e)
+            else:
+                _logger.warning("Reconnect successful.")
+                break
+        else:
+            _logger.error("EventClient thread could not contact websocket server.")
+            self.is_closed.set()
+            _thread.interrupt_main()
+
+    def on_event(self, m: Dict[str, Any]) -> None:
+        """Handle an event from the WebSocket server
 
-    def on_event(self, m):
-        if m.get('id') != None:
-            self.last_log_id = m.get('id')
+        This method is called whenever the client receives an event from the
+        server. This implementation records the `id` field internally, then
+        calls the callback function provided at initialization time.
+
+        Arguments:
+
+        * m: Dict[str, Any] --- The event object, deserialized from JSON.
+        """
+        try:
+            self.last_log_id = m['id']
+        except KeyError:
+            pass
         try:
             self.on_event_cb(m)
-        except Exception as e:
+        except Exception:
             _logger.exception("Unexpected exception from event callback.")
             _thread.interrupt_main()
 
-    def on_closed(self):
-        if not self.is_closed.is_set():
-            _logger.warning("Unexpected close. Reconnecting.")
-            for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
-                try:
-                    self._setup_event_client()
-                    _logger.warning("Reconnect successful.")
-                    break
-                except Exception as e:
-                    _logger.warning("Error '%s' during websocket reconnect.", e)
-            if tries_left == 0:
-                _logger.exception("EventClient thread could not contact websocket server.")
-                self.is_closed.set()
-                _thread.interrupt_main()
-                return
+    def run(self) -> None:
+        """Run the client loop
 
-    def run_forever(self):
-        # Have to poll here to let KeyboardInterrupt get raised.
-        while not self.is_closed.wait(1):
-            pass
+        This method runs in a separate thread to receive and process events
+        from the server.
+        """
+        self.setName(f'ArvadosWebsockets-{self.ident}')
+        while self._client_ok and not self.is_closed.is_set():
+            try:
+                with self._subscribe_lock:
+                    for f in self.filters:
+                        self._subscribe(f, self.last_log_id)
+                for msg_s in self._client:
+                    if not self.is_closed.is_set():
+                        msg = json.loads(msg_s)
+                        self.on_event(msg)
+            except ws_exc.ConnectionClosed:
+                self._client_ok = False
+                self.on_closed()
 
 
 class PollClient(threading.Thread):
-    def __init__(self, api, filters, on_event, poll_time, last_log_id):
+    """Follow Arvados events via polling logs
+
+    PollClient follows events on an Arvados cluster by periodically running
+    logs list API calls. Users can select the events they want to follow and
+    run their own callback function on each.
+    """
+    def __init__(
+            self,
+            api: 'arvados.api_resources.ArvadosAPIClient',
+            filters: Optional[Filter],
+            on_event: EventCallback,
+            poll_time: float=15,
+            last_log_id: Optional[int]=None,
+    ) -> None:
+        """Initialize a polling client
+
+        Constructor arguments:
+
+        * api: arvados.api_resources.ArvadosAPIClient --- The Arvados API
+          client used to query logs. It will be used in a separate thread,
+          so if it is not an instance of `arvados.safeapi.ThreadSafeApiCache`
+          it should not be reused after the thread is started.
+
+        * filters: arvados.events.Filter | None --- One event filter to
+          subscribe to when the client starts polling. If not specified,
+          the client will subscribe to all events.
+
+        * on_event: arvados.events.EventCallback --- When the client
+          receives an event from the API server, it calls this
+          function with the event object.
+
+        * poll_time: float --- The number of seconds to wait between querying
+          logs. Default 15.
+
+        * last_log_id: int | None --- If specified, queries will include a
+          filter for logs with an `id` at least this value.
+        """
         super(PollClient, self).__init__()
         self.api = api
         if filters:
@@ -174,6 +341,11 @@ class PollClient(threading.Thread):
             self._skip_old_events = False
 
     def run(self):
+        """Run the client loop
+
+        This method runs in a separate thread to poll and process events
+        from the server.
+        """
         self.on_event({'status': 200})
 
         while not self._closing.is_set():
@@ -262,23 +434,29 @@ class PollClient(threading.Thread):
                 self._closing.wait(self.poll_time)
 
     def run_forever(self):
+        """Run the polling client indefinitely
+
+        This method blocks until the `close` method is called (e.g., from
+        another thread) or the client permanently loses its connection.
+        """
         # Have to poll here, otherwise KeyboardInterrupt will never get processed.
         while not self._closing.is_set():
             self._closing.wait(1)
 
-    def close(self, code=None, reason=None, timeout=0):
-        """Close poll client and optionally wait for it to finish.
+    def close(self, code: Optional[int]=None, reason: Optional[str]=None, timeout: float=0) -> None:
+        """Stop polling and processing events
 
-        If an :on_event: handler is running in a different thread,
-        first wait (indefinitely) for it to return.
+        Arguments:
 
-        After closing, wait up to :timeout: seconds for the thread to
-        finish the poll request in progress (if any).
+        * code: Optional[int] --- Ignored; this argument exists for API
+          compatibility with `EventClient.close`.
 
-        :code: and :reason: are ignored. They are present for
-        interface compatibility with EventClient.
-        """
+        * reason: Optional[str] --- Ignored; this argument exists for API
+          compatibility with `EventClient.close`.
 
+        * timeout: float --- How long to wait for the client thread to finish
+          processing events. Default 0, which means no timeout.
+        """
         with self._closing_lock:
             self._closing.set()
         try:
@@ -290,11 +468,27 @@ class PollClient(threading.Thread):
             # to do so raises the same exception."
             pass
 
-    def subscribe(self, f):
+    def subscribe(self, f: Filter, last_log_id: Optional[int]=None) -> None:
+        """Subscribe to another set of events from the server
+
+        Arguments:
+
+        * f: arvados.events.Filter | None --- One filter to subscribe to.
+
+        * last_log_id: Optional[int] --- Ignored; this argument exists for
+          API compatibility with `EventClient.subscribe`.
+        """
         self.on_event({'status': 200})
         self.filters.append(f)
 
     def unsubscribe(self, f):
+        """Unsubscribe from an event stream
+
+        Arguments:
+
+        * f: arvados.events.Filter | None --- One event filter to stop
+        receiving events for.
+        """
         del self.filters[self.filters.index(f)]
 
 
@@ -312,21 +506,42 @@ def _subscribe_websocket(api, filters, on_event, last_log_id=None):
     else:
         return client
 
-
-def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
+def subscribe(
+        api: 'arvados.api_resources.ArvadosAPIClient',
+        filters: Optional[Filter],
+        on_event: EventCallback,
+        poll_fallback: float=15,
+        last_log_id: Optional[int]=None,
+) -> Union[EventClient, PollClient]:
+    """Start a thread to monitor events
+
+    This method tries to construct an `EventClient` to process Arvados
+    events via WebSockets. If that fails, or the
+    `ARVADOS_DISABLE_WEBSOCKETS` flag is set in user configuration, it falls
+    back to constructing a `PollClient` to process the events via API
+    polling.
+
+    Arguments:
+
+    * api: arvados.api_resources.ArvadosAPIClient --- The Arvados API
+      client used to query logs. It may be used in a separate thread,
+      so if it is not an instance of `arvados.safeapi.ThreadSafeApiCache`
+      it should not be reused after this method returns.
+
+    * filters: arvados.events.Filter | None --- One event filter to
+      subscribe to after initializing the client. If not specified, the
+      client will subscribe to all events.
+
+    * on_event: arvados.events.EventCallback --- When the client receives an
+      event, it calls this function with the event object.
+
+    * poll_fallback: float --- The number of seconds to wait between querying
+      logs. If 0, this function will refuse to construct a `PollClient`.
+      Default 15.
+
+    * last_log_id: int | None --- If specified, start processing events with
+      at least this `id` value.
     """
-    :api:
-      a client object retrieved from arvados.api(). The caller should not use this client object for anything else after calling subscribe().
-    :filters:
-      Initial subscription filters.
-    :on_event:
-      The callback when a message is received.
-    :poll_fallback:
-      If websockets are not available, fall back to polling every N seconds.  If poll_fallback=False, this will return None if websockets are not available.
-    :last_log_id:
-      Log rows that are newer than the log id
-    """
-
     if not poll_fallback:
         return _subscribe_websocket(api, filters, on_event, last_log_id)
 
index 9ba9629bca48bb574d7a9a377c844a7b6ae5904f..284a460f1a175e7e081ebb862a276fb4e8e45e3e 100644 (file)
@@ -116,21 +116,21 @@ setup(name='arvados-python-client',
       ],
       install_requires=[
           'ciso8601 >=2.0.0',
+          'dataclasses; python_version<"3.7"',
           'future',
           'google-api-core <2.11.0', # 2.11.0rc1 is incompatible with google-auth<2
           'google-api-python-client >=2.1.0',
           'google-auth <2',
           'httplib2 >=0.9.2, <0.20.2',
+          'protobuf <4.0.0dev',
           'pycurl >=7.19.5.1, <7.45.0',
+          'pyparsing <3',
           'ruamel.yaml >=0.15.54, <0.17.22',
           'setuptools >=40.3.0',
           # As of 4.8.0rc1, typing_extensions does not parse in Python 3.7
           'typing_extensions >=3.7.4, <4.8; python_version<"3.8"',
-          'ws4py >=0.4.2',
-          'protobuf <4.0.0dev',
-          'pyparsing <3',
-          'setuptools >=40.3.0',
-          'dataclasses; python_version<"3.7"',
+          'websockets >=11.0',
+          'websockets ~=11.0; python_version<"3.8"',
       ],
       classifiers=[
           'Programming Language :: Python :: 3',
index f5192160f3e5fad01d080b0eb16bf834bdfd1ed6..b4e6a0b1cd88204b2dc8f699c85d01e2c805abe1 100644 (file)
@@ -2,13 +2,7 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
-from __future__ import print_function
-from __future__ import absolute_import
-from __future__ import division
-from future import standard_library
-standard_library.install_aliases()
-from builtins import range
-from builtins import object
+import json
 import logging
 import mock
 import queue
@@ -17,10 +11,62 @@ import threading
 import time
 import unittest
 
+import websockets.exceptions as ws_exc
+
 import arvados
 from . import arvados_testutil as tutil
 from . import run_test_server
 
+class FakeWebsocketClient:
+    """Fake self-contained version of websockets.sync.client.ClientConnection
+
+    This provides enough of the API to test EventClient. It loosely mimics
+    the Arvados WebSocket API by acknowledging subscribe messages. You can use
+    `mock_wrapper` to test calls. You can set `_check_lock` to test that the
+    given lock is acquired before `send` is called.
+    """
+
+    def __init__(self):
+        self._check_lock = None
+        self._closed = threading.Event()
+        self._messages = queue.Queue()
+
+    def mock_wrapper(self):
+        wrapper = mock.Mock(wraps=self)
+        wrapper.__iter__ = lambda _: self.__iter__()
+        return wrapper
+
+    def __iter__(self):
+        while True:
+            msg = self._messages.get()
+            self._messages.task_done()
+            if isinstance(msg, Exception):
+                raise msg
+            else:
+                yield msg
+
+    def close(self, code=1000, reason=''):
+        if not self._closed.is_set():
+            self._closed.set()
+            self.force_disconnect()
+
+    def force_disconnect(self):
+        self._messages.put(ws_exc.ConnectionClosed(None, None))
+
+    def send(self, msg):
+        if self._check_lock is not None and self._check_lock.acquire(blocking=False):
+            self._check_lock.release()
+            raise AssertionError(f"called ws_client.send() without lock")
+        elif self._closed.is_set():
+            raise ws_exc.ConnectionClosed(None, None)
+        try:
+            msg = json.loads(msg)
+        except ValueError:
+            status = 400
+        else:
+            status = 200
+        self._messages.put(json.dumps({'status': status}))
+
 
 class WebsocketTest(run_test_server.TestCaseWithServers):
     MAIN_SERVER = {}
@@ -201,7 +247,7 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
 
         # close (im)properly
         if close_unexpected:
-            self.ws.ec.close_connection()
+            self.ws._client.close()
         else:
             self.ws.close()
 
@@ -240,69 +286,115 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         self._test_websocket_reconnect(False)
 
     # Test websocket reconnection retry
-    @mock.patch('arvados.events._EventClient.connect')
-    def test_websocket_reconnect_retry(self, event_client_connect):
-        event_client_connect.side_effect = [None, Exception('EventClient.connect error'), None]
-
+    @mock.patch('arvados.events.ws_client.connect')
+    def test_websocket_reconnect_retry(self, ws_conn):
         logstream = tutil.StringIO()
         rootLogger = logging.getLogger()
         streamHandler = logging.StreamHandler(logstream)
         rootLogger.addHandler(streamHandler)
-
-        run_test_server.authorize_with('active')
-        events = queue.Queue(100)
-
-        filters = [['object_uuid', 'is_a', 'arvados#human']]
-        self.ws = arvados.events.subscribe(
-            arvados.api('v1'), filters,
-            events.put_nowait,
-            poll_fallback=False,
-            last_log_id=None)
-        self.assertIsInstance(self.ws, arvados.events.EventClient)
-
-        # simulate improper close
-        self.ws.on_closed()
-
-        # verify log messages to ensure retry happened
-        log_messages = logstream.getvalue()
-        found = log_messages.find("Error 'EventClient.connect error' during websocket reconnect.")
-        self.assertNotEqual(found, -1)
-        rootLogger.removeHandler(streamHandler)
-
-    @mock.patch('arvados.events._EventClient')
-    def test_subscribe_method(self, websocket_client):
-        filters = [['object_uuid', 'is_a', 'arvados#human']]
-        client = arvados.events.EventClient(
-            self.MOCK_WS_URL, [], lambda event: None, None)
-        client.subscribe(filters[:], 99)
-        websocket_client().subscribe.assert_called_with(filters, 99)
-
-    @mock.patch('arvados.events._EventClient')
-    def test_unsubscribe(self, websocket_client):
-        filters = [['object_uuid', 'is_a', 'arvados#human']]
-        client = arvados.events.EventClient(
-            self.MOCK_WS_URL, filters[:], lambda event: None, None)
-        client.unsubscribe(filters[:])
-        websocket_client().unsubscribe.assert_called_with(filters)
-
-    @mock.patch('arvados.events._EventClient')
+        try:
+            msg_event, wss_client, self.ws = self.fake_client(ws_conn)
+            self.assertTrue(msg_event.wait(timeout=1), "timed out waiting for setup callback")
+            msg_event.clear()
+            ws_conn.side_effect = [Exception('EventClient.connect error'), wss_client]
+            wss_client.force_disconnect()
+            self.assertTrue(msg_event.wait(timeout=1), "timed out waiting for reconnect callback")
+            # verify log messages to ensure retry happened
+            self.assertIn("Error 'EventClient.connect error' during websocket reconnect.", logstream.getvalue())
+            self.assertEqual(ws_conn.call_count, 3)
+        finally:
+            rootLogger.removeHandler(streamHandler)
+
+    @mock.patch('arvados.events.ws_client.connect')
     def test_run_forever_survives_reconnects(self, websocket_client):
-        connected = threading.Event()
-        websocket_client().connect.side_effect = connected.set
         client = arvados.events.EventClient(
             self.MOCK_WS_URL, [], lambda event: None, None)
         forever_thread = threading.Thread(target=client.run_forever)
         forever_thread.start()
         # Simulate an unexpected disconnect, and wait for reconnect.
-        close_thread = threading.Thread(target=client.on_closed)
-        close_thread.start()
-        self.assertTrue(connected.wait(timeout=self.TEST_TIMEOUT))
-        close_thread.join()
-        run_forever_alive = forever_thread.is_alive()
-        client.close()
-        forever_thread.join()
-        self.assertTrue(run_forever_alive)
-        self.assertEqual(2, websocket_client().connect.call_count)
+        try:
+            client.on_closed()
+            self.assertTrue(forever_thread.is_alive())
+            self.assertEqual(2, websocket_client.call_count)
+        finally:
+            client.close()
+            forever_thread.join()
+
+    @staticmethod
+    def fake_client(conn_patch, filters=None, url=MOCK_WS_URL):
+        """Set up EventClient test infrastructure
+
+        Given a patch of `arvados.events.ws_client.connect`,
+        this returns a 3-tuple:
+
+        * `msg_event` is a `threading.Event` that is set as the test client
+          event callback. You can wait for this event to confirm that a
+          sent message has been acknowledged and processed.
+
+        * `mock_client` is a `mock.Mock` wrapper around `FakeWebsocketClient`.
+          Use this to assert `EventClient` calls the right methods. It tests
+          that `EventClient` acquires a lock before calling `send`.
+
+        * `client` is the `EventClient` that uses `mock_client` under the hood
+          that you exercise methods of.
+
+        Other arguments are passed to initialize `EventClient`.
+        """
+        msg_event = threading.Event()
+        fake_client = FakeWebsocketClient()
+        mock_client = fake_client.mock_wrapper()
+        conn_patch.return_value = mock_client
+        client = arvados.events.EventClient(url, filters, lambda _: msg_event.set())
+        fake_client._check_lock = client._subscribe_lock
+        return msg_event, mock_client, client
+
+    @mock.patch('arvados.events.ws_client.connect')
+    def test_subscribe_locking(self, ws_conn):
+        f = [['created_at', '>=', '2023-12-01T00:00:00.000Z']]
+        msg_event, wss_client, self.ws = self.fake_client(ws_conn)
+        self.assertTrue(msg_event.wait(timeout=1), "timed out waiting for setup callback")
+        msg_event.clear()
+        wss_client.send.reset_mock()
+        self.ws.subscribe(f)
+        self.assertTrue(msg_event.wait(timeout=1), "timed out waiting for subscribe callback")
+        wss_client.send.assert_called()
+        (msg,), _ = wss_client.send.call_args
+        self.assertEqual(
+            json.loads(msg),
+            {'method': 'subscribe', 'filters': f},
+        )
+
+    @mock.patch('arvados.events.ws_client.connect')
+    def test_unsubscribe_locking(self, ws_conn):
+        f = [['created_at', '>=', '2023-12-01T01:00:00.000Z']]
+        msg_event, wss_client, self.ws = self.fake_client(ws_conn, f)
+        self.assertTrue(msg_event.wait(timeout=1), "timed out waiting for setup callback")
+        msg_event.clear()
+        wss_client.send.reset_mock()
+        self.ws.unsubscribe(f)
+        self.assertTrue(msg_event.wait(timeout=1), "timed out waiting for unsubscribe callback")
+        wss_client.send.assert_called()
+        (msg,), _ = wss_client.send.call_args
+        self.assertEqual(
+            json.loads(msg),
+            {'method': 'unsubscribe', 'filters': f},
+        )
+
+    @mock.patch('arvados.events.ws_client.connect')
+    def test_resubscribe_locking(self, ws_conn):
+        f = [['created_at', '>=', '2023-12-01T02:00:00.000Z']]
+        msg_event, wss_client, self.ws = self.fake_client(ws_conn, f)
+        self.assertTrue(msg_event.wait(timeout=1), "timed out waiting for setup callback")
+        msg_event.clear()
+        wss_client.send.reset_mock()
+        wss_client.force_disconnect()
+        self.assertTrue(msg_event.wait(timeout=1), "timed out waiting for resubscribe callback")
+        wss_client.send.assert_called()
+        (msg,), _ = wss_client.send.call_args
+        self.assertEqual(
+            json.loads(msg),
+            {'method': 'subscribe', 'filters': f},
+        )
 
 
 class PollClientTestCase(unittest.TestCase):
index b7b3fb61233249beb6c6df48e46c8de4ea678e22..f66194e2a2cd39852d8de4cf45c8aa0a4cc89962 100644 (file)
@@ -15,6 +15,7 @@ import (
        "os"
        "strings"
        "sync"
+       "syscall"
        "time"
 
        "git.arvados.org/arvados.git/lib/config"
@@ -623,7 +624,7 @@ func (s *runSuite) TestChunkPrefix(c *check.C) {
        c.Check(string(lost), check.Equals, "")
 }
 
-func (s *runSuite) TestRunForever(c *check.C) {
+func (s *runSuite) TestRunForever_TriggeredByTimer(c *check.C) {
        s.config.ManagementToken = "xyzzy"
        opts := RunOptions{
                Logger: ctxlog.TestLogger(c),
@@ -639,7 +640,7 @@ func (s *runSuite) TestRunForever(c *check.C) {
 
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
-       s.config.Collections.BalancePeriod = arvados.Duration(time.Millisecond)
+       s.config.Collections.BalancePeriod = arvados.Duration(10 * time.Millisecond)
        srv := s.newServer(&opts)
 
        done := make(chan bool)
@@ -650,10 +651,11 @@ func (s *runSuite) TestRunForever(c *check.C) {
 
        // Each run should send 4 pull lists + 4 trash lists. The
        // first run should also send 4 empty trash lists at
-       // startup. We should complete all four runs in much less than
-       // a second.
+       // startup. We should complete at least four runs in much less
+       // than 10s.
        for t0 := time.Now(); time.Since(t0) < 10*time.Second; {
-               if pullReqs.Count() >= 16 && trashReqs.Count() == pullReqs.Count()+4 {
+               pulls := pullReqs.Count()
+               if pulls >= 16 && trashReqs.Count() == pulls+4 {
                        break
                }
                time.Sleep(time.Millisecond)
@@ -661,8 +663,70 @@ func (s *runSuite) TestRunForever(c *check.C) {
        cancel()
        <-done
        c.Check(pullReqs.Count() >= 16, check.Equals, true)
-       c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
+       c.Check(trashReqs.Count() >= 20, check.Equals, true)
 
+       // We should have completed 4 runs before calling cancel().
+       // But the next run might also have started before we called
+       // cancel(), in which case the extra run will be included in
+       // the changeset_compute_seconds_count metric.
+       completed := pullReqs.Count() / 4
        metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
-       c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)
+       c.Check(metrics, check.Matches, fmt.Sprintf(`(?ms).*\narvados_keepbalance_changeset_compute_seconds_count (%d|%d)\n.*`, completed, completed+1))
+}
+
+func (s *runSuite) TestRunForever_TriggeredBySignal(c *check.C) {
+       s.config.ManagementToken = "xyzzy"
+       opts := RunOptions{
+               Logger: ctxlog.TestLogger(c),
+               Dumper: ctxlog.TestLogger(c),
+       }
+       s.stub.serveCurrentUserAdmin()
+       s.stub.serveFooBarFileCollections()
+       s.stub.serveKeepServices(stubServices)
+       s.stub.serveKeepstoreMounts()
+       s.stub.serveKeepstoreIndexFoo4Bar1()
+       trashReqs := s.stub.serveKeepstoreTrash()
+       pullReqs := s.stub.serveKeepstorePull()
+
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+       s.config.Collections.BalancePeriod = arvados.Duration(time.Minute)
+       srv := s.newServer(&opts)
+
+       done := make(chan bool)
+       go func() {
+               srv.runForever(ctx)
+               close(done)
+       }()
+
+       procself, err := os.FindProcess(os.Getpid())
+       c.Assert(err, check.IsNil)
+
+       // Each run should send 4 pull lists + 4 trash lists. The
+       // first run should also send 4 empty trash lists at
+       // startup. We should be able to complete four runs in much
+       // less than 10s.
+       completedRuns := 0
+       for t0 := time.Now(); time.Since(t0) < 10*time.Second; {
+               pulls := pullReqs.Count()
+               if pulls >= 16 && trashReqs.Count() == pulls+4 {
+                       break
+               }
+               // Once the 1st run has started automatically, we
+               // start sending a single SIGUSR1 at the end of each
+               // run, to ensure we get exactly 4 runs in total.
+               if pulls > 0 && pulls%4 == 0 && pulls <= 12 && pulls/4 > completedRuns {
+                       completedRuns = pulls / 4
+                       c.Logf("completed run %d, sending SIGUSR1 to trigger next run", completedRuns)
+                       procself.Signal(syscall.SIGUSR1)
+               }
+               time.Sleep(time.Millisecond)
+       }
+       cancel()
+       <-done
+       c.Check(pullReqs.Count(), check.Equals, 16)
+       c.Check(trashReqs.Count(), check.Equals, 20)
+
+       metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
+       c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 4\n.*`)
 }
index 480791ffa2637da8f282fb28507dbbcb046bcfbf..7a59c1e8c0edace3a8cb7637c6759a4962d44a99 100644 (file)
@@ -100,6 +100,7 @@ func (srv *Server) runForever(ctx context.Context) error {
 
        sigUSR1 := make(chan os.Signal, 1)
        signal.Notify(sigUSR1, syscall.SIGUSR1)
+       defer signal.Stop(sigUSR1)
 
        logger.Info("acquiring service lock")
        dblock.KeepBalanceService.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return srv.DB, nil })
@@ -126,7 +127,6 @@ func (srv *Server) runForever(ctx context.Context) error {
 
                select {
                case <-ctx.Done():
-                       signal.Stop(sigUSR1)
                        return nil
                case <-ticker.C:
                        logger.Print("timer went off")
@@ -135,8 +135,7 @@ func (srv *Server) runForever(ctx context.Context) error {
                        // Reset the timer so we don't start the N+1st
                        // run too soon after the Nth run is triggered
                        // by SIGUSR1.
-                       ticker.Stop()
-                       ticker = time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
+                       ticker.Reset(time.Duration(srv.Cluster.Collections.BalancePeriod))
                }
                logger.Print("starting next run")
        }
index c14789889d6d6631b6a7734bccea719e0020e6a0..5a12e26e9dcdf75ffd1e3e0b7a90ec7176fbe36e 100644 (file)
@@ -1105,6 +1105,17 @@ func (s *IntegrationSuite) TestDirectoryListingWithNoAnonymousToken(c *check.C)
 }
 
 func (s *IntegrationSuite) testDirectoryListing(c *check.C) {
+       // The "ownership cycle" test fixtures are reachable from the
+       // "filter group without filters" group, causing webdav's
+       // walkfs to recurse indefinitely. Avoid that by deleting one
+       // of the bogus fixtures.
+       arv := arvados.NewClientFromEnv()
+       err := arv.RequestAndDecode(nil, "DELETE", "arvados/v1/groups/zzzzz-j7d0g-cx2al9cqkmsf1hs", nil, nil)
+       if err != nil {
+               c.Assert(err, check.FitsTypeOf, &arvados.TransactionError{})
+               c.Check(err.(*arvados.TransactionError).StatusCode, check.Equals, 404)
+       }
+
        s.handler.Cluster.Services.WebDAVDownload.ExternalURL.Host = "download.example.com"
        authHeader := http.Header{
                "Authorization": {"OAuth2 " + arvadostest.ActiveToken},
@@ -1241,8 +1252,32 @@ func (s *IntegrationSuite) testDirectoryListing(c *check.C) {
                        expect:  []string{"waz"},
                        cutDirs: 2,
                },
+               {
+                       uri:     "download.example.com/users/active/This filter group/",
+                       header:  authHeader,
+                       expect:  []string{"A Subproject/"},
+                       cutDirs: 3,
+               },
+               {
+                       uri:     "download.example.com/users/active/This filter group/A Subproject",
+                       header:  authHeader,
+                       expect:  []string{"baz_file/"},
+                       cutDirs: 4,
+               },
+               {
+                       uri:     "download.example.com/by_id/" + arvadostest.AFilterGroupUUID,
+                       header:  authHeader,
+                       expect:  []string{"A Subproject/"},
+                       cutDirs: 2,
+               },
+               {
+                       uri:     "download.example.com/by_id/" + arvadostest.AFilterGroupUUID + "/A Subproject",
+                       header:  authHeader,
+                       expect:  []string{"baz_file/"},
+                       cutDirs: 3,
+               },
        } {
-               comment := check.Commentf("HTML: %q => %q", trial.uri, trial.expect)
+               comment := check.Commentf("HTML: %q redir %q => %q", trial.uri, trial.redirect, trial.expect)
                resp := httptest.NewRecorder()
                u := mustParseURL("//" + trial.uri)
                req := &http.Request{
@@ -1278,6 +1313,7 @@ func (s *IntegrationSuite) testDirectoryListing(c *check.C) {
                } else {
                        c.Check(resp.Code, check.Equals, http.StatusOK, comment)
                        for _, e := range trial.expect {
+                               e = strings.Replace(e, " ", "%20", -1)
                                c.Check(resp.Body.String(), check.Matches, `(?ms).*href="./`+e+`".*`, comment)
                        }
                        c.Check(resp.Body.String(), check.Matches, `(?ms).*--cut-dirs=`+fmt.Sprintf("%d", trial.cutDirs)+` .*`, comment)
@@ -1310,6 +1346,12 @@ func (s *IntegrationSuite) testDirectoryListing(c *check.C) {
                }
                resp = httptest.NewRecorder()
                s.handler.ServeHTTP(resp, req)
+               // This check avoids logging a big XML document in the
+               // event webdav throws a 500 error after sending
+               // headers for a 207.
+               if !c.Check(strings.HasSuffix(resp.Body.String(), "Internal Server Error"), check.Equals, false) {
+                       continue
+               }
                if trial.expect == nil {
                        c.Check(resp.Code, check.Equals, http.StatusUnauthorized, comment)
                } else {
@@ -1320,6 +1362,7 @@ func (s *IntegrationSuite) testDirectoryListing(c *check.C) {
                                } else {
                                        e = filepath.Join(u.Path, e)
                                }
+                               e = strings.Replace(e, " ", "%20", -1)
                                c.Check(resp.Body.String(), check.Matches, `(?ms).*<D:href>`+e+`</D:href>.*`, comment)
                        }
                }
index b3d0b9b418533110ce550da62946c1651af2245b..dd29c40082cb7910da661801c610ba3c753a4271 100644 (file)
@@ -476,7 +476,7 @@ func (s *IntegrationSuite) TestMetrics(c *check.C) {
        c.Check(summaries["request_duration_seconds/get/200"].SampleCount, check.Equals, "3")
        c.Check(summaries["request_duration_seconds/get/404"].SampleCount, check.Equals, "1")
        c.Check(summaries["time_to_status_seconds/get/404"].SampleCount, check.Equals, "1")
-       c.Check(gauges["arvados_keepweb_sessions_cached_session_bytes//"].Value, check.Equals, float64(469))
+       c.Check(gauges["arvados_keepweb_sessions_cached_session_bytes//"].Value, check.Equals, float64(624))
 
        // If the Host header indicates a collection, /metrics.json
        // refers to a file in the collection -- the metrics handler
index 7e70f3ec9d397ddb6af87b4cc1886044bca1dc5f..0d402a63c14465417727843f9239d395c3a8787b 100644 (file)
@@ -174,8 +174,11 @@ packages-in-docker: check-arvados-directory workbench2-build-image
        docker run -t --rm --env ci="true" \
                --env ARVADOS_DIRECTORY=/tmp/arvados \
                --env APP_NAME=${APP_NAME} \
+               --env VERSION="${VERSION}" \
                --env ITERATION=${ITERATION} \
                --env TARGETS="${TARGETS}" \
+               --env MAINTAINER="${MAINTAINER}" \
+               --env DESCRIPTION="${DESCRIPTION}" \
                --env GIT_DISCOVERY_ACROSS_FILESYSTEM=1 \
                -w "/tmp/workbench2" \
                -v ${WORKSPACE}:/tmp/workbench2 \
index 438acbf14dca2608ddf23b9dc3822d4aedbf3dd2..38d9794b18573a0379a3bbba93f53f0b661abe83 100644 (file)
@@ -556,6 +556,53 @@ describe("Process tests", function () {
             });
         });
 
+        it("preserves original ordering of lines within the same log type", function () {
+            const crName = "test_container_request";
+            createContainerRequest(activeUser, crName, "arvados/jobs", ["echo", "hello world"], false, "Committed").then(function (containerRequest) {
+                cy.appendLog(adminUser.token, containerRequest.uuid, "stdout.txt", [
+                    // Should come first
+                    "2023-07-18T20:14:46.000000000Z A out 1",
+                    // Comes fourth in a contiguous block
+                    "2023-07-18T20:14:48.128642814Z A out 2",
+                    "2023-07-18T20:14:48.128642814Z X out 3",
+                    "2023-07-18T20:14:48.128642814Z A out 4",
+                ]).as("stdout");
+
+                cy.appendLog(adminUser.token, containerRequest.uuid, "stderr.txt", [
+                    // Comes second
+                    "2023-07-18T20:14:47.000000000Z Z err 1",
+                    // Comes third in a contiguous block
+                    "2023-07-18T20:14:48.128642814Z B err 2",
+                    "2023-07-18T20:14:48.128642814Z C err 3",
+                    "2023-07-18T20:14:48.128642814Z Y err 4",
+                    "2023-07-18T20:14:48.128642814Z Z err 5",
+                    "2023-07-18T20:14:48.128642814Z A err 6",
+                ]).as("stderr");
+
+                cy.loginAs(activeUser);
+                cy.goToPath(`/processes/${containerRequest.uuid}`);
+                cy.get("[data-cy=process-details]").should("contain", crName);
+                cy.get("[data-cy=process-logs]").should("contain", "No logs yet");
+
+                cy.getAll("@stdout", "@stderr").then(() => {
+                    // Switch to All logs
+                    cy.get("[data-cy=process-logs-filter]").click();
+                    cy.get("body").contains("li", "All logs").click();
+                    // Verify sorted logs
+                    cy.get("[data-cy=process-logs] pre").eq(0).should("contain", "2023-07-18T20:14:46.000000000Z A out 1");
+                    cy.get("[data-cy=process-logs] pre").eq(1).should("contain", "2023-07-18T20:14:47.000000000Z Z err 1");
+                    cy.get("[data-cy=process-logs] pre").eq(2).should("contain", "2023-07-18T20:14:48.128642814Z B err 2");
+                    cy.get("[data-cy=process-logs] pre").eq(3).should("contain", "2023-07-18T20:14:48.128642814Z C err 3");
+                    cy.get("[data-cy=process-logs] pre").eq(4).should("contain", "2023-07-18T20:14:48.128642814Z Y err 4");
+                    cy.get("[data-cy=process-logs] pre").eq(5).should("contain", "2023-07-18T20:14:48.128642814Z Z err 5");
+                    cy.get("[data-cy=process-logs] pre").eq(6).should("contain", "2023-07-18T20:14:48.128642814Z A err 6");
+                    cy.get("[data-cy=process-logs] pre").eq(7).should("contain", "2023-07-18T20:14:48.128642814Z A out 2");
+                    cy.get("[data-cy=process-logs] pre").eq(8).should("contain", "2023-07-18T20:14:48.128642814Z X out 3");
+                    cy.get("[data-cy=process-logs] pre").eq(9).should("contain", "2023-07-18T20:14:48.128642814Z A out 4");
+                });
+            });
+        });
+
         it("correctly generates sniplines", function () {
             const SNIPLINE = `================ ✀ ================ ✀ ========= Some log(s) were skipped ========= ✀ ================ ✀ ================`;
             const crName = "test_container_request";
index d21d067540ce2b6d18954ed01c36825f56b02c3a..abb204907be5b5ac12aa25c994d4473cb18a2800 100644 (file)
@@ -3,8 +3,8 @@
   "version": "0.1.0",
   "private": true,
   "dependencies": {
-    "@coreui/coreui": "next",
-    "@coreui/react": "next",
+    "@coreui/coreui": "^4.3.2",
+    "@coreui/react": "^4.11.0",
     "@date-io/date-fns": "1",
     "@fortawesome/fontawesome-svg-core": "1.2.28",
     "@fortawesome/free-solid-svg-icons": "5.13.0",
index 4e52431eebadc9a05b240ee530cd29e3390569e4..88b56a2c324379c2b9be44c859da7ad9e05eb28e 100644 (file)
@@ -33,6 +33,12 @@ type FileWithProgress = {
     lastByte: number;
 }
 
+type SortableLine = {
+    logType: LogEventType,
+    timestamp: string;
+    contents: string;
+}
+
 export type ProcessLogsPanelAction = UnionOf<typeof processLogsPanelActions>;
 
 export const setProcessLogsPanelFilter = (filter: string) =>
@@ -257,18 +263,61 @@ const mergeMultilineLoglines = (logFragments: LogFragment[]) => (
  * @returns string[] of merged and sorted log lines
  */
 const mergeSortLogFragments = (logFragments: LogFragment[]): string[] => {
-    const sortableLines = fragmentsToLines(logFragments
-        .filter((fragment) => (!NON_SORTED_LOG_TYPES.includes(fragment.logType))));
+    const sortableFragments = logFragments
+        .filter((fragment) => (!NON_SORTED_LOG_TYPES.includes(fragment.logType)));
 
     const nonSortableLines = fragmentsToLines(logFragments
         .filter((fragment) => (NON_SORTED_LOG_TYPES.includes(fragment.logType)))
         .sort((a, b) => (a.logType.localeCompare(b.logType))));
 
-    return [...nonSortableLines, ...sortableLines.sort(sortLogLines)]
+    return [...nonSortableLines, ...sortLogFragments(sortableFragments)];
 };
 
-const sortLogLines = (a: string, b: string) => {
-    return a.localeCompare(b);
+/**
+ * Performs merge and sort of input log fragment lines
+ * @param logFragments set of sortable log fragments to be merged and sorted
+ * @returns A string array containing all lines, sorted by timestamp and
+ *          preserving line ordering and type grouping when timestamps match
+ */
+const sortLogFragments = (logFragments: LogFragment[]): string[] => {
+    const linesWithType: SortableLine[] = logFragments
+        // Map each logFragment into an array of SortableLine
+        .map((fragment: LogFragment): SortableLine[] => (
+            fragment.contents.map((singleLine: string) => {
+                const timestampMatch = singleLine.match(LOG_TIMESTAMP_PATTERN);
+                const timestamp = timestampMatch && timestampMatch[0] ? timestampMatch[0] : "";
+                return {
+                    logType: fragment.logType,
+                    timestamp: timestamp,
+                    contents: singleLine,
+                };
+            })
+        // Merge each array of SortableLine into single array
+        )).reduce((acc: SortableLine[], lines: SortableLine[]) => (
+            [...acc, ...lines]
+        ), [] as SortableLine[]);
+
+    return linesWithType
+        .sort(sortableLineSortFunc)
+        .map(lineWithType => lineWithType.contents);
+};
+
+/**
+ * Sort func to sort lines
+ *   Preserves original ordering of lines from the same source
+ *   Stably orders lines of differing type but same timestamp
+ *     (produces a block of same-timestamped lines of one type before a block
+ *     of same timestamped lines of another type for readability)
+ *   Sorts all other lines by contents (ie by timestamp)
+ */
+const sortableLineSortFunc = (a: SortableLine, b: SortableLine) => {
+    if (a.logType === b.logType) {
+        return 0;
+    } else if (a.timestamp === b.timestamp) {
+        return a.logType.localeCompare(b.logType);
+    } else {
+        return a.contents.localeCompare(b.contents);
+    }
 };
 
 const fragmentsToLines = (fragments: LogFragment[]): string[] => (
index dcabbe0d4221c5ca594434640c24743f0fd87664..18934f24c7e1d8e9d3ad777d0a324d7eece78075 100644 (file)
@@ -1646,23 +1646,25 @@ __metadata:
   languageName: node
   linkType: hard
 
-"@coreui/coreui@npm:next":
-  version: 5.0.0-alpha.3
-  resolution: "@coreui/coreui@npm:5.0.0-alpha.3"
+"@coreui/coreui@npm:^4.3.2":
+  version: 4.3.2
+  resolution: "@coreui/coreui@npm:4.3.2"
+  dependencies:
+    postcss-combine-duplicated-selectors: ^10.0.3
   peerDependencies:
-    "@popperjs/core": ^2.11.8
-  checksum: 2363ad6be775c6a895a49126a5b9062ffa9ebd0bea6dfb835c1300cd122fb1cf18d85fe647a9c08a3a384caa871e761d8ffb28ea45c7872cb2b034df6527da20
+    "@popperjs/core": ^2.11.6
+  checksum: 88fc70f4f681bb796e1d81ca8472a3d36bfcf92866fc7c6810ead850bc371c99bca123a94abb0fafdf2935972d130005cd62b485406631cfd9abd8f38e14be15
   languageName: node
   linkType: hard
 
-"@coreui/react@npm:next":
-  version: 5.0.0-alpha.3
-  resolution: "@coreui/react@npm:5.0.0-alpha.3"
+"@coreui/react@npm:^4.11.0":
+  version: 4.11.0
+  resolution: "@coreui/react@npm:4.11.0"
   peerDependencies:
-    "@coreui/coreui": ^5.0.0-alpha.2
+    "@coreui/coreui": 4.3.0
     react: ">=17"
     react-dom: ">=17"
-  checksum: efd333cc346307219dcf7fe183eed65305b12e71984bcb940d80a55509d7b92523082e37045bfcb8c4b334920ca185128a9f72f3e8bec69d15cad889cbeda4b4
+  checksum: 75c9394125e41e24fb5855b82cba93c9abeea080f9ee5bcc063ff2e581318b85c5bbef6f2c5300f5fd7a3450743488daa29b4baee6feabec38a009a452876a88
   languageName: node
   linkType: hard
 
@@ -3820,8 +3822,8 @@ __metadata:
   version: 0.0.0-use.local
   resolution: "arvados-workbench-2@workspace:."
   dependencies:
-    "@coreui/coreui": next
-    "@coreui/react": next
+    "@coreui/coreui": ^4.3.2
+    "@coreui/react": ^4.11.0
     "@date-io/date-fns": 1
     "@fortawesome/fontawesome-svg-core": 1.2.28
     "@fortawesome/free-solid-svg-icons": 5.13.0
@@ -14138,6 +14140,17 @@ __metadata:
   languageName: node
   linkType: hard
 
+"postcss-combine-duplicated-selectors@npm:^10.0.3":
+  version: 10.0.3
+  resolution: "postcss-combine-duplicated-selectors@npm:10.0.3"
+  dependencies:
+    postcss-selector-parser: ^6.0.4
+  peerDependencies:
+    postcss: ^8.1.0
+  checksum: 45c3dff41d0cddb510752ed92fe8c7fc66e5cf88f4988314655419d3ecdf1dc66f484a25ee73f4f292da5da851a0fdba0ec4d59bdedeee935d05b26d31d997ed
+  languageName: node
+  linkType: hard
+
 "postcss-convert-values@npm:^4.0.1":
   version: 4.0.1
   resolution: "postcss-convert-values@npm:4.0.1"
@@ -14782,6 +14795,16 @@ __metadata:
   languageName: node
   linkType: hard
 
+"postcss-selector-parser@npm:^6.0.4":
+  version: 6.0.13
+  resolution: "postcss-selector-parser@npm:6.0.13"
+  dependencies:
+    cssesc: ^3.0.0
+    util-deprecate: ^1.0.2
+  checksum: f89163338a1ce3b8ece8e9055cd5a3165e79a15e1c408e18de5ad8f87796b61ec2d48a2902d179ae0c4b5de10fccd3a325a4e660596549b040bc5ad1b465f096
+  languageName: node
+  linkType: hard
+
 "postcss-svgo@npm:^4.0.3":
   version: 4.0.3
   resolution: "postcss-svgo@npm:4.0.3"
index 66bc6ac7facdeb8115ba880b8e31e3db4bdf2e06..98ec762147ac1a57c30ec70b0261affcf3644683 100644 (file)
@@ -30,6 +30,7 @@ var (
                "name",
                "owner_uuid",
                "portable_data_hash",
+               "requesting_container_uuid",
                "state",
        }
 
index 4c573b0edf2e438d05c1f3da3954482fdac1c72d..b7c176af45715d7c232cfd34937e54990f506535 100755 (executable)
@@ -630,6 +630,8 @@ sv stop keepproxy
 cd /usr/src/arvados/services/api
 export DISABLE_DATABASE_ENVIRONMENT_CHECK=1
 export RAILS_ENV=development
+export GEM_HOME=/var/lib/arvados-arvbox/.gem
+env
 bin/bundle exec rake db:drop
 rm $ARVADOS_CONTAINER_PATH/api_database_setup
 rm $ARVADOS_CONTAINER_PATH/superuser_token
index 9b27e90009c7ee26af20ced5957f06c55a8de0a2..9c5df83c0e91b25f523531a5512b7efa00c2370f 100644 (file)
@@ -2,8 +2,8 @@
 #
 # SPDX-License-Identifier: AGPL-3.0
 
-export RUBY_VERSION=2.7.0
-export BUNDLER_VERSION=2.2.19
+export RUBY_VERSION=3.2.2
+export BUNDLER_VERSION=2.4.22
 
 export DEBIAN_FRONTEND=noninteractive
 export PATH=${PATH}:/usr/local/go/bin:/var/lib/arvados/bin:/usr/src/arvados/sdk/cli/binstubs
@@ -67,24 +67,38 @@ fi
 
 run_bundler() {
     flock $GEMLOCK /var/lib/arvados/bin/gem install --no-document --user bundler:$BUNDLER_VERSION
-    if test -f Gemfile.lock ; then
-        frozen=--frozen
-    else
-        frozen=""
-    fi
+
     BUNDLER=bundle
     if test -x $PWD/bin/bundle ; then
        # If present, use the one associated with rails API
        BUNDLER=$PWD/bin/bundle
     fi
 
+    # Use Gemfile.lock only if it is git tracked.
+    if git ls-files --error-unmatch Gemfile.lock ; then
+       flock $GEMLOCK $BUNDLER config set --local frozen true
+    else
+       flock $GEMLOCK $BUNDLER config set --local frozen false
+    fi
+    flock $GEMLOCK $BUNDLER config set --local deployment false
+
     if test -z "$(flock $GEMLOCK /var/lib/arvados/bin/gem list | grep 'arvados[[:blank:]].*[0-9.]*dev')" ; then
         (cd /usr/src/arvados/sdk/ruby && \
         /var/lib/arvados/bin/gem build arvados.gemspec && flock $GEMLOCK /var/lib/arvados/bin/gem install $(ls -1 *.gem | sort -r | head -n1))
     fi
-    if ! flock $GEMLOCK $BUNDLER install --verbose --local --no-deployment $frozen "$@" ; then
-        flock $GEMLOCK $BUNDLER install --verbose --no-deployment $frozen "$@"
+
+    if ! flock $GEMLOCK $BUNDLER install --verbose --local "$@" ; then
+        flock $GEMLOCK $BUNDLER install --verbose "$@"
+    fi
+}
+
+bundler_binstubs() {
+    BUNDLER=bundle
+    if test -x $PWD/bin/bundle ; then
+       # If present, use the one associated with rails API
+       BUNDLER=$PWD/bin/bundle
     fi
+    flock $GEMLOCK $BUNDLER binstubs --all
 }
 
 PYCMD=""
index 03eac65cec5cdbc3bf0defaaaf644b7b312e9399..a5af64281b13f7bf11f359cd74e055d52d337d85 100644 (file)
@@ -7,6 +7,7 @@ export GOPATH=/var/lib/gopath
 mkdir -p $GOPATH
 
 cd /usr/src/arvados
+RUNSU=""
 if [[ $UID = 0 ]] ; then
   RUNSU="/usr/local/lib/arvbox/runsu.sh"
 fi
index e92870c3eef9b8ff1741fbe7accd11ff9ea75ee1..c1a6775883e2dc47bf5de37f790396b05b6541f9 100755 (executable)
@@ -123,17 +123,17 @@ http {
     # rewrite ^/trash /trash redirect;
 
     # Redirects that include a uuid
-    rewrite ^/work_units/(.*) /processes/$1 redirect;
-    rewrite ^/container_requests/(.*) /processes/$1 redirect;
-    rewrite ^/users/(.*) /user/$1 redirect;
-    rewrite ^/groups/(.*) /group/$1 redirect;
+    rewrite ^/work_units/(.*) /processes/\$1 redirect;
+    rewrite ^/container_requests/(.*) /processes/\$1 redirect;
+    rewrite ^/users/(.*) /user/\$1 redirect;
+    rewrite ^/groups/(.*) /group/\$1 redirect;
 
     # Special file download redirects
     if (\$arg_disposition = attachment) {
-      rewrite ^/collections/([^/]*)/(.*) /?redirectToDownload=/c=$1/$2? redirect;
+      rewrite ^/collections/([^/]*)/(.*) /?redirectToDownload=/c=\$1/\$2? redirect;
     }
     if (\$arg_disposition = inline) {
-      rewrite ^/collections/([^/]*)/(.*) /?redirectToPreview=/c=$1/$2? redirect;
+      rewrite ^/collections/([^/]*)/(.*) /?redirectToPreview=/c=\$1/\$2? redirect;
     }
 
     # Redirects that go to a roughly equivalent page
index d3ff7e868345b383fb7c98e27a88a36ad44db1ed..5bff5610529d43688340d0181ae5f8342437709f 100755 (executable)
@@ -15,10 +15,12 @@ download_cache = /var/lib/pip
 EOF
 
 cd /usr/src/arvados/sdk/ruby
-run_bundler --binstubs=binstubs
+run_bundler
+bundler_binstubs
 
 cd /usr/src/arvados/sdk/cli
-run_bundler --binstubs=binstubs
+run_bundler
+bundler_binstubs
 
 export PYCMD=python3
 
index 2079bb1d0beb7ea44f3a02856f389e2f3fa2fa20..5e952b8d5c7e167c31ee3f5fce2cd65eb8bbd810 100755 (executable)
@@ -16,7 +16,8 @@ if test "$1" != "--only-deps" ; then
 fi
 
 cd /usr/src/arvados/services/login-sync
-run_bundler --binstubs=binstubs
+run_bundler
+bundler_binstubs
 
 if test "$1" = "--only-deps" ; then
     exit