15652: Limit concurrent writes per filesystem, not per flush.
[arvados.git] / sdk / go / arvados / fs_site.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "os"
9         "strings"
10         "sync"
11         "time"
12 )
13
14 type CustomFileSystem interface {
15         FileSystem
16         MountByID(mount string)
17         MountProject(mount, uuid string)
18         MountUsers(mount string)
19 }
20
21 type customFileSystem struct {
22         fileSystem
23         root *vdirnode
24         thr  *throttle
25
26         staleThreshold time.Time
27         staleLock      sync.Mutex
28 }
29
30 func (c *Client) CustomFileSystem(kc keepClient) CustomFileSystem {
31         root := &vdirnode{}
32         fs := &customFileSystem{
33                 root: root,
34                 fileSystem: fileSystem{
35                         fsBackend: keepBackend{apiClient: c, keepClient: kc},
36                         root:      root,
37                         thr:       newThrottle(concurrentWriters),
38                 },
39         }
40         root.inode = &treenode{
41                 fs:     fs,
42                 parent: root,
43                 fileinfo: fileinfo{
44                         name:    "/",
45                         mode:    os.ModeDir | 0755,
46                         modTime: time.Now(),
47                 },
48                 inodes: make(map[string]inode),
49         }
50         return fs
51 }
52
53 func (fs *customFileSystem) MountByID(mount string) {
54         fs.root.inode.Child(mount, func(inode) (inode, error) {
55                 return &vdirnode{
56                         inode: &treenode{
57                                 fs:     fs,
58                                 parent: fs.root,
59                                 inodes: make(map[string]inode),
60                                 fileinfo: fileinfo{
61                                         name:    mount,
62                                         modTime: time.Now(),
63                                         mode:    0755 | os.ModeDir,
64                                 },
65                         },
66                         create: fs.mountByID,
67                 }, nil
68         })
69 }
70
71 func (fs *customFileSystem) MountProject(mount, uuid string) {
72         fs.root.inode.Child(mount, func(inode) (inode, error) {
73                 return fs.newProjectNode(fs.root, mount, uuid), nil
74         })
75 }
76
77 func (fs *customFileSystem) MountUsers(mount string) {
78         fs.root.inode.Child(mount, func(inode) (inode, error) {
79                 return &lookupnode{
80                         stale:   fs.Stale,
81                         loadOne: fs.usersLoadOne,
82                         loadAll: fs.usersLoadAll,
83                         inode: &treenode{
84                                 fs:     fs,
85                                 parent: fs.root,
86                                 inodes: make(map[string]inode),
87                                 fileinfo: fileinfo{
88                                         name:    mount,
89                                         modTime: time.Now(),
90                                         mode:    0755 | os.ModeDir,
91                                 },
92                         },
93                 }, nil
94         })
95 }
96
97 // SiteFileSystem returns a FileSystem that maps collections and other
98 // Arvados objects onto a filesystem layout.
99 //
100 // This is experimental: the filesystem layout is not stable, and
101 // there are significant known bugs and shortcomings. For example,
102 // writes are not persisted until Sync() is called.
103 func (c *Client) SiteFileSystem(kc keepClient) CustomFileSystem {
104         fs := c.CustomFileSystem(kc)
105         fs.MountByID("by_id")
106         fs.MountUsers("users")
107         return fs
108 }
109
110 func (fs *customFileSystem) Sync() error {
111         fs.staleLock.Lock()
112         defer fs.staleLock.Unlock()
113         fs.staleThreshold = time.Now()
114         return nil
115 }
116
117 // Stale returns true if information obtained at time t should be
118 // considered stale.
119 func (fs *customFileSystem) Stale(t time.Time) bool {
120         fs.staleLock.Lock()
121         defer fs.staleLock.Unlock()
122         return !fs.staleThreshold.Before(t)
123 }
124
125 func (fs *customFileSystem) newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error) {
126         return nil, ErrInvalidOperation
127 }
128
129 func (fs *customFileSystem) mountByID(parent inode, id string) inode {
130         if strings.Contains(id, "-4zz18-") || pdhRegexp.MatchString(id) {
131                 return fs.mountCollection(parent, id)
132         } else if strings.Contains(id, "-j7d0g-") {
133                 return fs.newProjectNode(fs.root, id, id)
134         } else {
135                 return nil
136         }
137 }
138
139 func (fs *customFileSystem) mountCollection(parent inode, id string) inode {
140         var coll Collection
141         err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, nil)
142         if err != nil {
143                 return nil
144         }
145         cfs, err := coll.FileSystem(fs, fs)
146         if err != nil {
147                 return nil
148         }
149         root := cfs.rootnode()
150         root.SetParent(parent, id)
151         return root
152 }
153
154 func (fs *customFileSystem) newProjectNode(root inode, name, uuid string) inode {
155         return &lookupnode{
156                 stale:   fs.Stale,
157                 loadOne: func(parent inode, name string) (inode, error) { return fs.projectsLoadOne(parent, uuid, name) },
158                 loadAll: func(parent inode) ([]inode, error) { return fs.projectsLoadAll(parent, uuid) },
159                 inode: &treenode{
160                         fs:     fs,
161                         parent: root,
162                         inodes: make(map[string]inode),
163                         fileinfo: fileinfo{
164                                 name:    name,
165                                 modTime: time.Now(),
166                                 mode:    0755 | os.ModeDir,
167                         },
168                 },
169         }
170 }
171
172 // vdirnode wraps an inode by ignoring any requests to add/replace
173 // children, and calling a create() func when a non-existing child is
174 // looked up.
175 //
176 // create() can return either a new node, which will be added to the
177 // treenode, or nil for ENOENT.
178 type vdirnode struct {
179         inode
180         create func(parent inode, name string) inode
181 }
182
183 func (vn *vdirnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
184         return vn.inode.Child(name, func(existing inode) (inode, error) {
185                 if existing == nil && vn.create != nil {
186                         existing = vn.create(vn, name)
187                         if existing != nil {
188                                 existing.SetParent(vn, name)
189                                 vn.inode.(*treenode).fileinfo.modTime = time.Now()
190                         }
191                 }
192                 if replace == nil {
193                         return existing, nil
194                 } else if tryRepl, err := replace(existing); err != nil {
195                         return existing, err
196                 } else if tryRepl != existing {
197                         return existing, ErrInvalidArgument
198                 } else {
199                         return existing, nil
200                 }
201         })
202 }