b221aaa083a12fd1f13fd3eaec3d9c0f85ae61b5
[arvados.git] / sdk / go / arvados / fs_collection_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "math/rand"
16         "net/http"
17         "os"
18         "regexp"
19         "runtime"
20         "runtime/pprof"
21         "strings"
22         "sync"
23         "sync/atomic"
24         "testing"
25         "time"
26
27         check "gopkg.in/check.v1"
28 )
29
// Register CollectionFSSuite with the gocheck test runner.
var _ = check.Suite(&CollectionFSSuite{})
31
// keepClientStub is an in-memory block store used by the tests in
// this file in place of a real Keep client. Blocks are stored in a
// map keyed by the first 32 characters of the locator.
//
// The embedded RWMutex is taken by ReadAt, BlockWrite and
// LocalLocator around their accesses to blocks, refreshable and
// reads.
type keepClientStub struct {
	blocks      map[string][]byte
	refreshable map[string]bool
	reads       []string             // locators from ReadAt() calls
	onWrite     func(bufcopy []byte) // called from BlockWrite, before acquiring lock
	authToken   string               // client's auth token (used for signing locators)
	sigkey      string               // blob signing key
	sigttl      time.Duration        // blob signing ttl
	sync.RWMutex
}
42
// errStub404 is returned by keepClientStub.ReadAt when no block is
// stored under the requested locator's hash.
var errStub404 = errors.New("404 block not found")
44
45 func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error) {
46         kcs.Lock()
47         kcs.reads = append(kcs.reads, locator)
48         kcs.Unlock()
49         kcs.RLock()
50         defer kcs.RUnlock()
51         if err := VerifySignature(locator, kcs.authToken, kcs.sigttl, []byte(kcs.sigkey)); err != nil {
52                 return 0, err
53         }
54         buf := kcs.blocks[locator[:32]]
55         if buf == nil {
56                 return 0, errStub404
57         }
58         return copy(p, buf[off:]), nil
59 }
60
61 func (kcs *keepClientStub) BlockWrite(_ context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
62         if opts.Data == nil {
63                 panic("oops, stub is not made for this")
64         }
65         locator := SignLocator(fmt.Sprintf("%x+%d", md5.Sum(opts.Data), len(opts.Data)), kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
66         buf := make([]byte, len(opts.Data))
67         copy(buf, opts.Data)
68         if kcs.onWrite != nil {
69                 kcs.onWrite(buf)
70         }
71         for _, sc := range opts.StorageClasses {
72                 if sc != "default" {
73                         return BlockWriteResponse{}, fmt.Errorf("stub does not write storage class %q", sc)
74                 }
75         }
76         kcs.Lock()
77         defer kcs.Unlock()
78         kcs.blocks[locator[:32]] = buf
79         return BlockWriteResponse{Locator: locator, Replicas: 1}, nil
80 }
81
// reRemoteSignature matches a "+A" or "+R" hint in a locator,
// together with everything up to (not including) the next "+".
var reRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
83
84 func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
85         if strings.Contains(locator, "+A") {
86                 return locator, nil
87         }
88         kcs.Lock()
89         defer kcs.Unlock()
90         if strings.Contains(locator, "+R") {
91                 if len(locator) < 32 {
92                         return "", fmt.Errorf("bad locator: %q", locator)
93                 }
94                 if _, ok := kcs.blocks[locator[:32]]; !ok && !kcs.refreshable[locator[:32]] {
95                         return "", fmt.Errorf("kcs.refreshable[%q]==false", locator)
96                 }
97         }
98         locator = reRemoteSignature.ReplaceAllLiteralString(locator, "")
99         locator = SignLocator(locator, kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
100         return locator, nil
101 }
102
// CollectionFSSuite holds the per-test fixtures for the collection
// filesystem tests. All four fields are (re)initialized by SetUpTest.
type CollectionFSSuite struct {
	client *Client
	coll   Collection
	fs     CollectionFileSystem
	kc     *keepClientStub
}
109
110 func (s *CollectionFSSuite) SetUpTest(c *check.C) {
111         s.client = NewClientFromEnv()
112         s.client.AuthToken = fixtureActiveToken
113         err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil)
114         c.Assert(err, check.IsNil)
115         s.kc = &keepClientStub{
116                 blocks: map[string][]byte{
117                         "3858f62230ac3c915f300c664312c63f": []byte("foobar"),
118                 },
119                 sigkey:    fixtureBlobSigningKey,
120                 sigttl:    fixtureBlobSigningTTL,
121                 authToken: fixtureActiveToken,
122         }
123         s.fs, err = s.coll.FileSystem(s.client, s.kc)
124         c.Assert(err, check.IsNil)
125 }
126
127 func (s *CollectionFSSuite) TestHttpFileSystemInterface(c *check.C) {
128         _, ok := s.fs.(http.FileSystem)
129         c.Check(ok, check.Equals, true)
130 }
131
132 func (s *CollectionFSSuite) TestUnattainableStorageClasses(c *check.C) {
133         fs, err := (&Collection{
134                 StorageClassesDesired: []string{"unobtainium"},
135         }).FileSystem(s.client, s.kc)
136         c.Assert(err, check.IsNil)
137
138         f, err := fs.OpenFile("/foo", os.O_CREATE|os.O_WRONLY, 0777)
139         c.Assert(err, check.IsNil)
140         _, err = f.Write([]byte("food"))
141         c.Assert(err, check.IsNil)
142         err = f.Close()
143         c.Assert(err, check.IsNil)
144         _, err = fs.MarshalManifest(".")
145         c.Assert(err, check.ErrorMatches, `.*stub does not write storage class \"unobtainium\"`)
146 }
147
148 func (s *CollectionFSSuite) TestColonInFilename(c *check.C) {
149         fs, err := (&Collection{
150                 ManifestText: "./foo:foo 3858f62230ac3c915f300c664312c63f+3 0:3:bar:bar\n",
151         }).FileSystem(s.client, s.kc)
152         c.Assert(err, check.IsNil)
153
154         f, err := fs.Open("/foo:foo")
155         c.Assert(err, check.IsNil)
156
157         fis, err := f.Readdir(0)
158         c.Check(err, check.IsNil)
159         c.Check(len(fis), check.Equals, 1)
160         c.Check(fis[0].Name(), check.Equals, "bar:bar")
161 }
162
163 func (s *CollectionFSSuite) TestReaddirFull(c *check.C) {
164         f, err := s.fs.Open("/dir1")
165         c.Assert(err, check.IsNil)
166
167         st, err := f.Stat()
168         c.Assert(err, check.IsNil)
169         c.Check(st.Size(), check.Equals, int64(2))
170         c.Check(st.IsDir(), check.Equals, true)
171
172         fis, err := f.Readdir(0)
173         c.Check(err, check.IsNil)
174         c.Check(len(fis), check.Equals, 2)
175         if len(fis) > 0 {
176                 c.Check(fis[0].Size(), check.Equals, int64(3))
177         }
178 }
179
180 func (s *CollectionFSSuite) TestReaddirLimited(c *check.C) {
181         f, err := s.fs.Open("./dir1")
182         c.Assert(err, check.IsNil)
183
184         fis, err := f.Readdir(1)
185         c.Check(err, check.IsNil)
186         c.Check(len(fis), check.Equals, 1)
187         if len(fis) > 0 {
188                 c.Check(fis[0].Size(), check.Equals, int64(3))
189         }
190
191         fis, err = f.Readdir(1)
192         c.Check(err, check.IsNil)
193         c.Check(len(fis), check.Equals, 1)
194         if len(fis) > 0 {
195                 c.Check(fis[0].Size(), check.Equals, int64(3))
196         }
197
198         fis, err = f.Readdir(1)
199         c.Check(len(fis), check.Equals, 0)
200         c.Check(err, check.NotNil)
201         c.Check(err, check.Equals, io.EOF)
202
203         f, err = s.fs.Open("dir1")
204         c.Assert(err, check.IsNil)
205         fis, err = f.Readdir(1)
206         c.Check(len(fis), check.Equals, 1)
207         c.Assert(err, check.IsNil)
208         fis, err = f.Readdir(2)
209         c.Check(len(fis), check.Equals, 1)
210         c.Assert(err, check.IsNil)
211         fis, err = f.Readdir(2)
212         c.Check(len(fis), check.Equals, 0)
213         c.Assert(err, check.Equals, io.EOF)
214 }
215
216 func (s *CollectionFSSuite) TestPathMunge(c *check.C) {
217         for _, path := range []string{".", "/", "./", "///", "/../", "/./.."} {
218                 f, err := s.fs.Open(path)
219                 c.Assert(err, check.IsNil)
220
221                 st, err := f.Stat()
222                 c.Assert(err, check.IsNil)
223                 c.Check(st.Size(), check.Equals, int64(1))
224                 c.Check(st.IsDir(), check.Equals, true)
225         }
226         for _, path := range []string{"/dir1", "dir1", "./dir1", "///dir1//.//", "../dir1/../dir1/"} {
227                 c.Logf("%q", path)
228                 f, err := s.fs.Open(path)
229                 c.Assert(err, check.IsNil)
230
231                 st, err := f.Stat()
232                 c.Assert(err, check.IsNil)
233                 c.Check(st.Size(), check.Equals, int64(2))
234                 c.Check(st.IsDir(), check.Equals, true)
235         }
236 }
237
238 func (s *CollectionFSSuite) TestNotExist(c *check.C) {
239         for _, path := range []string{"/no", "no", "./no", "n/o", "/n/o"} {
240                 f, err := s.fs.Open(path)
241                 c.Assert(f, check.IsNil)
242                 c.Assert(err, check.NotNil)
243                 c.Assert(os.IsNotExist(err), check.Equals, true)
244         }
245 }
246
247 func (s *CollectionFSSuite) TestReadOnlyFile(c *check.C) {
248         f, err := s.fs.OpenFile("/dir1/foo", os.O_RDONLY, 0)
249         c.Assert(err, check.IsNil)
250         st, err := f.Stat()
251         c.Assert(err, check.IsNil)
252         c.Check(st.Size(), check.Equals, int64(3))
253         n, err := f.Write([]byte("bar"))
254         c.Check(n, check.Equals, 0)
255         c.Check(err, check.Equals, ErrReadOnlyFile)
256 }
257
258 func (s *CollectionFSSuite) TestCreateFile(c *check.C) {
259         f, err := s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE, 0)
260         c.Assert(err, check.IsNil)
261         st, err := f.Stat()
262         c.Assert(err, check.IsNil)
263         c.Check(st.Size(), check.Equals, int64(0))
264
265         n, err := f.Write([]byte("bar"))
266         c.Check(n, check.Equals, 3)
267         c.Check(err, check.IsNil)
268
269         c.Check(f.Close(), check.IsNil)
270
271         f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
272         c.Check(f, check.IsNil)
273         c.Assert(err, check.NotNil)
274
275         f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR, 0)
276         c.Assert(err, check.IsNil)
277         st, err = f.Stat()
278         c.Assert(err, check.IsNil)
279         c.Check(st.Size(), check.Equals, int64(3))
280
281         c.Check(f.Close(), check.IsNil)
282
283         m, err := s.fs.MarshalManifest(".")
284         c.Assert(err, check.IsNil)
285         c.Check(m, check.Matches, `. 37b51d194a7513e45b56f6524f2d51f2\+3\+\S+ 0:3:new-file\\0401\n./dir1 .* 3:3:bar 0:3:foo\n`)
286 }
287
288 func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
289         maxBlockSize = 8
290         defer func() { maxBlockSize = 2 << 26 }()
291
292         f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
293         c.Assert(err, check.IsNil)
294         defer f.Close()
295         st, err := f.Stat()
296         c.Assert(err, check.IsNil)
297         c.Check(st.Size(), check.Equals, int64(3))
298
299         f2, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
300         c.Assert(err, check.IsNil)
301         defer f2.Close()
302
303         buf := make([]byte, 64)
304         n, err := f.Read(buf)
305         c.Check(n, check.Equals, 3)
306         c.Check(err, check.Equals, io.EOF)
307         c.Check(string(buf[:3]), check.DeepEquals, "foo")
308
309         pos, err := f.Seek(-2, io.SeekCurrent)
310         c.Check(pos, check.Equals, int64(1))
311         c.Check(err, check.IsNil)
312
313         // Split a storedExtent in two, and insert a memExtent
314         n, err = f.Write([]byte("*"))
315         c.Check(n, check.Equals, 1)
316         c.Check(err, check.IsNil)
317
318         pos, err = f.Seek(0, io.SeekCurrent)
319         c.Check(pos, check.Equals, int64(2))
320         c.Check(err, check.IsNil)
321
322         pos, err = f.Seek(0, io.SeekStart)
323         c.Check(pos, check.Equals, int64(0))
324         c.Check(err, check.IsNil)
325
326         rbuf, err := ioutil.ReadAll(f)
327         c.Check(len(rbuf), check.Equals, 3)
328         c.Check(err, check.IsNil)
329         c.Check(string(rbuf), check.Equals, "f*o")
330
331         // Write multiple blocks in one call
332         f.Seek(1, io.SeekStart)
333         n, err = f.Write([]byte("0123456789abcdefg"))
334         c.Check(n, check.Equals, 17)
335         c.Check(err, check.IsNil)
336         pos, err = f.Seek(0, io.SeekCurrent)
337         c.Check(pos, check.Equals, int64(18))
338         c.Check(err, check.IsNil)
339         pos, err = f.Seek(-18, io.SeekCurrent)
340         c.Check(pos, check.Equals, int64(0))
341         c.Check(err, check.IsNil)
342         n, err = io.ReadFull(f, buf)
343         c.Check(n, check.Equals, 18)
344         c.Check(err, check.Equals, io.ErrUnexpectedEOF)
345         c.Check(string(buf[:n]), check.Equals, "f0123456789abcdefg")
346
347         buf2, err := ioutil.ReadAll(f2)
348         c.Check(err, check.IsNil)
349         c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
350
351         // truncate to current size
352         err = f.Truncate(18)
353         c.Check(err, check.IsNil)
354         f2.Seek(0, io.SeekStart)
355         buf2, err = ioutil.ReadAll(f2)
356         c.Check(err, check.IsNil)
357         c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
358
359         // shrink to zero some data
360         f.Truncate(15)
361         f2.Seek(0, io.SeekStart)
362         buf2, err = ioutil.ReadAll(f2)
363         c.Check(err, check.IsNil)
364         c.Check(string(buf2), check.Equals, "f0123456789abcd")
365
366         // grow to partial block/extent
367         f.Truncate(20)
368         f2.Seek(0, io.SeekStart)
369         buf2, err = ioutil.ReadAll(f2)
370         c.Check(err, check.IsNil)
371         c.Check(string(buf2), check.Equals, "f0123456789abcd\x00\x00\x00\x00\x00")
372
373         f.Truncate(0)
374         f2.Seek(0, io.SeekStart)
375         f2.Write([]byte("12345678abcdefghijkl"))
376
377         // grow to block/extent boundary
378         f.Truncate(64)
379         f2.Seek(0, io.SeekStart)
380         buf2, err = ioutil.ReadAll(f2)
381         c.Check(err, check.IsNil)
382         c.Check(len(buf2), check.Equals, 64)
383         c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 8)
384
385         // shrink to block/extent boundary
386         err = f.Truncate(32)
387         c.Check(err, check.IsNil)
388         f2.Seek(0, io.SeekStart)
389         buf2, err = ioutil.ReadAll(f2)
390         c.Check(err, check.IsNil)
391         c.Check(len(buf2), check.Equals, 32)
392         c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 4)
393
394         // shrink to partial block/extent
395         err = f.Truncate(15)
396         c.Check(err, check.IsNil)
397         f2.Seek(0, io.SeekStart)
398         buf2, err = ioutil.ReadAll(f2)
399         c.Check(err, check.IsNil)
400         c.Check(string(buf2), check.Equals, "12345678abcdefg")
401         c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 2)
402
403         // Force flush to ensure the block "12345678" gets stored, so
404         // we know what to expect in the final manifest below.
405         _, err = s.fs.MarshalManifest(".")
406         c.Check(err, check.IsNil)
407
408         // Truncate to size=3 while f2's ptr is at 15
409         err = f.Truncate(3)
410         c.Check(err, check.IsNil)
411         buf2, err = ioutil.ReadAll(f2)
412         c.Check(err, check.IsNil)
413         c.Check(string(buf2), check.Equals, "")
414         f2.Seek(0, io.SeekStart)
415         buf2, err = ioutil.ReadAll(f2)
416         c.Check(err, check.IsNil)
417         c.Check(string(buf2), check.Equals, "123")
418         c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 1)
419
420         m, err := s.fs.MarshalManifest(".")
421         c.Check(err, check.IsNil)
422         m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
423         c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 25d55ad283aa400af464c76d713c07ad+8 3:3:bar 6:3:foo\n")
424         c.Check(s.fs.Size(), check.Equals, int64(6))
425 }
426
427 func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
428         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
429         c.Assert(err, check.IsNil)
430         f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
431         c.Assert(err, check.IsNil)
432         defer f.Close()
433
434         checkSize := func(size int64) {
435                 fi, err := f.Stat()
436                 c.Assert(err, check.IsNil)
437                 c.Check(fi.Size(), check.Equals, size)
438
439                 f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
440                 c.Assert(err, check.IsNil)
441                 defer f.Close()
442                 fi, err = f.Stat()
443                 c.Check(err, check.IsNil)
444                 c.Check(fi.Size(), check.Equals, size)
445                 pos, err := f.Seek(0, io.SeekEnd)
446                 c.Check(err, check.IsNil)
447                 c.Check(pos, check.Equals, size)
448         }
449
450         f.Seek(2, io.SeekEnd)
451         checkSize(0)
452         f.Write([]byte{1})
453         checkSize(3)
454
455         f.Seek(2, io.SeekCurrent)
456         checkSize(3)
457         f.Write([]byte{})
458         checkSize(5)
459
460         f.Seek(8, io.SeekStart)
461         checkSize(5)
462         n, err := f.Read(make([]byte, 1))
463         c.Check(n, check.Equals, 0)
464         c.Check(err, check.Equals, io.EOF)
465         checkSize(5)
466         f.Write([]byte{1, 2, 3})
467         checkSize(11)
468 }
469
470 func (s *CollectionFSSuite) TestMarshalCopiesRemoteBlocks(c *check.C) {
471         foo := "foo"
472         bar := "bar"
473         hash := map[string]string{
474                 foo: fmt.Sprintf("%x", md5.Sum([]byte(foo))),
475                 bar: fmt.Sprintf("%x", md5.Sum([]byte(bar))),
476         }
477
478         fs, err := (&Collection{
479                 ManifestText: ". " + hash[foo] + "+3+Rzaaaa-foo@bab " + hash[bar] + "+3+A12345@ffffff 0:2:fo.txt 2:4:obar.txt\n",
480         }).FileSystem(s.client, s.kc)
481         c.Assert(err, check.IsNil)
482         manifest, err := fs.MarshalManifest(".")
483         c.Check(manifest, check.Equals, "")
484         c.Check(err, check.NotNil)
485
486         s.kc.refreshable = map[string]bool{hash[bar]: true}
487
488         for _, sigIn := range []string{"Rzaaaa-foo@bab", "A12345@abcde"} {
489                 fs, err = (&Collection{
490                         ManifestText: ". " + hash[foo] + "+3+A12345@fffff " + hash[bar] + "+3+" + sigIn + " 0:2:fo.txt 2:4:obar.txt\n",
491                 }).FileSystem(s.client, s.kc)
492                 c.Assert(err, check.IsNil)
493                 manifest, err := fs.MarshalManifest(".")
494                 c.Check(err, check.IsNil)
495                 // Both blocks should now have +A signatures.
496                 c.Check(manifest, check.Matches, `.*\+A.* .*\+A.*\n`)
497                 c.Check(manifest, check.Not(check.Matches), `.*\+R.*\n`)
498         }
499 }
500
501 func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
502         maxBlockSize = 8
503         defer func() { maxBlockSize = 2 << 26 }()
504
505         var err error
506         s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
507         c.Assert(err, check.IsNil)
508         for _, name := range []string{"foo", "bar", "baz"} {
509                 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
510                 c.Assert(err, check.IsNil)
511                 f.Write([]byte(name))
512                 f.Close()
513         }
514
515         m, err := s.fs.MarshalManifest(".")
516         c.Check(err, check.IsNil)
517         m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
518         c.Check(m, check.Equals, ". c3c23db5285662ef7172373df0003206+6 acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:bar 3:3:baz 6:3:foo\n")
519 }
520
521 func (s *CollectionFSSuite) TestMkdir(c *check.C) {
522         err := s.fs.Mkdir("foo/bar", 0755)
523         c.Check(err, check.Equals, os.ErrNotExist)
524
525         f, err := s.fs.OpenFile("foo/bar", os.O_CREATE, 0)
526         c.Check(err, check.Equals, os.ErrNotExist)
527
528         err = s.fs.Mkdir("foo", 0755)
529         c.Check(err, check.IsNil)
530
531         f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_WRONLY, 0)
532         c.Check(err, check.IsNil)
533         if err == nil {
534                 defer f.Close()
535                 f.Write([]byte("foo"))
536         }
537
538         // mkdir fails if a file already exists with that name
539         err = s.fs.Mkdir("foo/bar", 0755)
540         c.Check(err, check.NotNil)
541
542         err = s.fs.Remove("foo/bar")
543         c.Check(err, check.IsNil)
544
545         // mkdir succeeds after the file is deleted
546         err = s.fs.Mkdir("foo/bar", 0755)
547         c.Check(err, check.IsNil)
548
549         // creating a file in a nonexistent subdir should still fail
550         f, err = s.fs.OpenFile("foo/bar/baz/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
551         c.Check(err, check.Equals, os.ErrNotExist)
552
553         f, err = s.fs.OpenFile("foo/bar/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
554         c.Check(err, check.IsNil)
555         if err == nil {
556                 defer f.Close()
557                 f.Write([]byte("foo"))
558         }
559
560         // creating foo/bar as a regular file should fail
561         f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, 0)
562         c.Check(err, check.NotNil)
563
564         // creating foo/bar as a directory should fail
565         f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, os.ModeDir)
566         c.Check(err, check.NotNil)
567         err = s.fs.Mkdir("foo/bar", 0755)
568         c.Check(err, check.NotNil)
569
570         m, err := s.fs.MarshalManifest(".")
571         c.Check(err, check.IsNil)
572         m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
573         c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 3:3:bar 0:3:foo\n./foo/bar acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n")
574 }
575
576 func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
577         if testing.Short() {
578                 c.Skip("slow")
579         }
580
581         maxBlockSize = 8
582         defer func() { maxBlockSize = 1 << 26 }()
583
584         var wg sync.WaitGroup
585         for n := 0; n < 128; n++ {
586                 wg.Add(1)
587                 go func() {
588                         defer wg.Done()
589                         f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
590                         c.Assert(err, check.IsNil)
591                         defer f.Close()
592                         for i := 0; i < 1024; i++ {
593                                 r := rand.Uint32()
594                                 switch {
595                                 case r%11 == 0:
596                                         _, err := s.fs.MarshalManifest(".")
597                                         c.Check(err, check.IsNil)
598                                 case r&3 == 0:
599                                         f.Truncate(int64(rand.Intn(64)))
600                                 case r&3 == 1:
601                                         f.Seek(int64(rand.Intn(64)), io.SeekStart)
602                                 case r&3 == 2:
603                                         _, err := f.Write([]byte("beep boop"))
604                                         c.Check(err, check.IsNil)
605                                 case r&3 == 3:
606                                         _, err := ioutil.ReadAll(f)
607                                         c.Check(err, check.IsNil)
608                                 }
609                         }
610                 }()
611         }
612         wg.Wait()
613
614         f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
615         c.Assert(err, check.IsNil)
616         defer f.Close()
617         buf, err := ioutil.ReadAll(f)
618         c.Check(err, check.IsNil)
619         c.Logf("after lots of random r/w/seek/trunc, buf is %q", buf)
620 }
621
622 func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
623         maxBlockSize = 40
624         defer func() { maxBlockSize = 2 << 26 }()
625
626         var err error
627         s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
628         c.Assert(err, check.IsNil)
629
630         const nfiles = 256
631         const ngoroutines = 256
632
633         var wg sync.WaitGroup
634         for n := 0; n < ngoroutines; n++ {
635                 wg.Add(1)
636                 go func(n int) {
637                         defer wg.Done()
638                         expect := make([]byte, 0, 64)
639                         wbytes := []byte("there's no simple explanation for anything important that any of us do")
640                         f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
641                         c.Assert(err, check.IsNil)
642                         defer f.Close()
643                         for i := 0; i < nfiles; i++ {
644                                 trunc := rand.Intn(65)
645                                 woff := rand.Intn(trunc + 1)
646                                 wbytes = wbytes[:rand.Intn(64-woff+1)]
647                                 for buf, i := expect[:cap(expect)], len(expect); i < trunc; i++ {
648                                         buf[i] = 0
649                                 }
650                                 expect = expect[:trunc]
651                                 if trunc < woff+len(wbytes) {
652                                         expect = expect[:woff+len(wbytes)]
653                                 }
654                                 copy(expect[woff:], wbytes)
655                                 f.Truncate(int64(trunc))
656                                 pos, err := f.Seek(int64(woff), io.SeekStart)
657                                 c.Check(pos, check.Equals, int64(woff))
658                                 c.Check(err, check.IsNil)
659                                 n, err := f.Write(wbytes)
660                                 c.Check(n, check.Equals, len(wbytes))
661                                 c.Check(err, check.IsNil)
662                                 pos, err = f.Seek(0, io.SeekStart)
663                                 c.Check(pos, check.Equals, int64(0))
664                                 c.Check(err, check.IsNil)
665                                 buf, err := ioutil.ReadAll(f)
666                                 c.Check(string(buf), check.Equals, string(expect))
667                                 c.Check(err, check.IsNil)
668                         }
669                 }(n)
670         }
671         wg.Wait()
672
673         for n := 0; n < ngoroutines; n++ {
674                 f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDONLY, 0)
675                 c.Assert(err, check.IsNil)
676                 f.(*filehandle).inode.(*filenode).waitPrune()
677                 s.checkMemSize(c, f)
678                 defer f.Close()
679         }
680
681         root, err := s.fs.Open("/")
682         c.Assert(err, check.IsNil)
683         defer root.Close()
684         fi, err := root.Readdir(-1)
685         c.Check(err, check.IsNil)
686         c.Check(len(fi), check.Equals, nfiles)
687
688         _, err = s.fs.MarshalManifest(".")
689         c.Check(err, check.IsNil)
690         // TODO: check manifest content
691 }
692
693 func (s *CollectionFSSuite) TestRemove(c *check.C) {
694         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
695         c.Assert(err, check.IsNil)
696         err = fs.Mkdir("dir0", 0755)
697         c.Assert(err, check.IsNil)
698         err = fs.Mkdir("dir1", 0755)
699         c.Assert(err, check.IsNil)
700         err = fs.Mkdir("dir1/dir2", 0755)
701         c.Assert(err, check.IsNil)
702         err = fs.Mkdir("dir1/dir3", 0755)
703         c.Assert(err, check.IsNil)
704
705         err = fs.Remove("dir0")
706         c.Check(err, check.IsNil)
707         err = fs.Remove("dir0")
708         c.Check(err, check.Equals, os.ErrNotExist)
709
710         err = fs.Remove("dir1/dir2/.")
711         c.Check(err, check.Equals, ErrInvalidArgument)
712         err = fs.Remove("dir1/dir2/..")
713         c.Check(err, check.Equals, ErrInvalidArgument)
714         err = fs.Remove("dir1")
715         c.Check(err, check.Equals, ErrDirectoryNotEmpty)
716         err = fs.Remove("dir1/dir2/../../../dir1")
717         c.Check(err, check.Equals, ErrDirectoryNotEmpty)
718         err = fs.Remove("dir1/dir3/")
719         c.Check(err, check.IsNil)
720         err = fs.RemoveAll("dir1")
721         c.Check(err, check.IsNil)
722         err = fs.RemoveAll("dir1")
723         c.Check(err, check.IsNil)
724 }
725
726 func (s *CollectionFSSuite) TestRenameError(c *check.C) {
727         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
728         c.Assert(err, check.IsNil)
729         err = fs.Mkdir("first", 0755)
730         c.Assert(err, check.IsNil)
731         err = fs.Mkdir("first/second", 0755)
732         c.Assert(err, check.IsNil)
733         f, err := fs.OpenFile("first/second/file", os.O_CREATE|os.O_WRONLY, 0755)
734         c.Assert(err, check.IsNil)
735         f.Write([]byte{1, 2, 3, 4, 5})
736         f.Close()
737         err = fs.Rename("first", "first/second/third")
738         c.Check(err, check.Equals, ErrInvalidArgument)
739         err = fs.Rename("first", "first/third")
740         c.Check(err, check.Equals, ErrInvalidArgument)
741         err = fs.Rename("first/second", "second")
742         c.Check(err, check.IsNil)
743         f, err = fs.OpenFile("second/file", 0, 0)
744         c.Assert(err, check.IsNil)
745         data, err := ioutil.ReadAll(f)
746         c.Check(err, check.IsNil)
747         c.Check(data, check.DeepEquals, []byte{1, 2, 3, 4, 5})
748 }
749
750 func (s *CollectionFSSuite) TestRenameDirectory(c *check.C) {
751         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
752         c.Assert(err, check.IsNil)
753         err = fs.Mkdir("foo", 0755)
754         c.Assert(err, check.IsNil)
755         err = fs.Mkdir("bar", 0755)
756         c.Assert(err, check.IsNil)
757         err = fs.Rename("bar", "baz")
758         c.Check(err, check.IsNil)
759         err = fs.Rename("foo", "baz")
760         c.Check(err, check.NotNil)
761         err = fs.Rename("foo", "baz/")
762         c.Check(err, check.IsNil)
763         err = fs.Rename("baz/foo", ".")
764         c.Check(err, check.Equals, ErrInvalidArgument)
765         err = fs.Rename("baz/foo/", ".")
766         c.Check(err, check.Equals, ErrInvalidArgument)
767 }
768
769 func (s *CollectionFSSuite) TestRename(c *check.C) {
770         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
771         c.Assert(err, check.IsNil)
772         const (
773                 outer = 16
774                 inner = 16
775         )
776         for i := 0; i < outer; i++ {
777                 err = fs.Mkdir(fmt.Sprintf("dir%d", i), 0755)
778                 c.Assert(err, check.IsNil)
779                 for j := 0; j < inner; j++ {
780                         err = fs.Mkdir(fmt.Sprintf("dir%d/dir%d", i, j), 0755)
781                         c.Assert(err, check.IsNil)
782                         for _, fnm := range []string{
783                                 fmt.Sprintf("dir%d/file%d", i, j),
784                                 fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
785                         } {
786                                 f, err := fs.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0755)
787                                 c.Assert(err, check.IsNil)
788                                 _, err = f.Write([]byte("beep"))
789                                 c.Assert(err, check.IsNil)
790                                 f.Close()
791                         }
792                 }
793         }
794         var wg sync.WaitGroup
795         for i := 0; i < outer; i++ {
796                 for j := 0; j < inner; j++ {
797                         wg.Add(1)
798                         go func(i, j int) {
799                                 defer wg.Done()
800                                 oldname := fmt.Sprintf("dir%d/dir%d/file%d", i, j, j)
801                                 newname := fmt.Sprintf("dir%d/newfile%d", i, inner-j-1)
802                                 _, err := fs.Open(newname)
803                                 c.Check(err, check.Equals, os.ErrNotExist)
804                                 err = fs.Rename(oldname, newname)
805                                 c.Check(err, check.IsNil)
806                                 f, err := fs.Open(newname)
807                                 c.Check(err, check.IsNil)
808                                 f.Close()
809                         }(i, j)
810
811                         wg.Add(1)
812                         go func(i, j int) {
813                                 defer wg.Done()
814                                 // oldname does not exist
815                                 err := fs.Rename(
816                                         fmt.Sprintf("dir%d/dir%d/missing", i, j),
817                                         fmt.Sprintf("dir%d/dir%d/file%d", outer-i-1, j, j))
818                                 c.Check(err, check.ErrorMatches, `.*does not exist`)
819
820                                 // newname parent dir does not exist
821                                 err = fs.Rename(
822                                         fmt.Sprintf("dir%d/dir%d", i, j),
823                                         fmt.Sprintf("dir%d/missing/irrelevant", outer-i-1))
824                                 c.Check(err, check.ErrorMatches, `.*does not exist`)
825
826                                 // oldname parent dir is a file
827                                 err = fs.Rename(
828                                         fmt.Sprintf("dir%d/file%d/patherror", i, j),
829                                         fmt.Sprintf("dir%d/irrelevant", i))
830                                 c.Check(err, check.ErrorMatches, `.*not a directory`)
831
832                                 // newname parent dir is a file
833                                 err = fs.Rename(
834                                         fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
835                                         fmt.Sprintf("dir%d/file%d/patherror", i, inner-j-1))
836                                 c.Check(err, check.ErrorMatches, `.*not a directory`)
837                         }(i, j)
838                 }
839         }
840         wg.Wait()
841
842         f, err := fs.OpenFile("dir1/newfile3", 0, 0)
843         c.Assert(err, check.IsNil)
844         c.Check(f.Size(), check.Equals, int64(4))
845         buf, err := ioutil.ReadAll(f)
846         c.Check(buf, check.DeepEquals, []byte("beep"))
847         c.Check(err, check.IsNil)
848         _, err = fs.Open("dir1/dir1/file1")
849         c.Check(err, check.Equals, os.ErrNotExist)
850 }
851
852 func (s *CollectionFSSuite) TestPersist(c *check.C) {
853         maxBlockSize = 1024
854         defer func() { maxBlockSize = 2 << 26 }()
855
856         var err error
857         s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
858         c.Assert(err, check.IsNil)
859         err = s.fs.Mkdir("d:r", 0755)
860         c.Assert(err, check.IsNil)
861
862         expect := map[string][]byte{}
863
864         var wg sync.WaitGroup
865         for _, name := range []string{"random 1", "random:2", "random\\3", "d:r/random4"} {
866                 buf := make([]byte, 500)
867                 rand.Read(buf)
868                 expect[name] = buf
869
870                 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
871                 c.Assert(err, check.IsNil)
872                 // Note: we don't close the file until after the test
873                 // is done. Writes to unclosed files should persist.
874                 defer f.Close()
875
876                 wg.Add(1)
877                 go func() {
878                         defer wg.Done()
879                         for i := 0; i < len(buf); i += 5 {
880                                 _, err := f.Write(buf[i : i+5])
881                                 c.Assert(err, check.IsNil)
882                         }
883                 }()
884         }
885         wg.Wait()
886
887         m, err := s.fs.MarshalManifest(".")
888         c.Check(err, check.IsNil)
889         c.Logf("%q", m)
890
891         root, err := s.fs.Open("/")
892         c.Assert(err, check.IsNil)
893         defer root.Close()
894         fi, err := root.Readdir(-1)
895         c.Check(err, check.IsNil)
896         c.Check(len(fi), check.Equals, 4)
897
898         persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
899         c.Assert(err, check.IsNil)
900
901         root, err = persisted.Open("/")
902         c.Assert(err, check.IsNil)
903         defer root.Close()
904         fi, err = root.Readdir(-1)
905         c.Check(err, check.IsNil)
906         c.Check(len(fi), check.Equals, 4)
907
908         for name, content := range expect {
909                 c.Logf("read %q", name)
910                 f, err := persisted.Open(name)
911                 c.Assert(err, check.IsNil)
912                 defer f.Close()
913                 buf, err := ioutil.ReadAll(f)
914                 c.Check(err, check.IsNil)
915                 c.Check(buf, check.DeepEquals, content)
916         }
917 }
918
919 func (s *CollectionFSSuite) TestPersistEmptyFilesAndDirs(c *check.C) {
920         var err error
921         s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
922         c.Assert(err, check.IsNil)
923         for _, name := range []string{"dir", "dir/zerodir", "empty", "not empty", "not empty/empty", "zero", "zero/zero"} {
924                 err = s.fs.Mkdir(name, 0755)
925                 c.Assert(err, check.IsNil)
926         }
927
928         expect := map[string][]byte{
929                 "0":                nil,
930                 "00":               {},
931                 "one":              {1},
932                 "dir/0":            nil,
933                 "dir/two":          {1, 2},
934                 "dir/zero":         nil,
935                 "dir/zerodir/zero": nil,
936                 "zero/zero/zero":   nil,
937         }
938         for name, data := range expect {
939                 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
940                 c.Assert(err, check.IsNil)
941                 if data != nil {
942                         _, err := f.Write(data)
943                         c.Assert(err, check.IsNil)
944                 }
945                 f.Close()
946         }
947
948         m, err := s.fs.MarshalManifest(".")
949         c.Check(err, check.IsNil)
950         c.Logf("%q", m)
951
952         persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
953         c.Assert(err, check.IsNil)
954
955         for name, data := range expect {
956                 _, err = persisted.Open("bogus-" + name)
957                 c.Check(err, check.NotNil)
958
959                 f, err := persisted.Open(name)
960                 c.Assert(err, check.IsNil)
961
962                 if data == nil {
963                         data = []byte{}
964                 }
965                 buf, err := ioutil.ReadAll(f)
966                 c.Check(err, check.IsNil)
967                 c.Check(buf, check.DeepEquals, data)
968         }
969
970         expectDir := map[string]int{
971                 "empty":           0,
972                 "not empty":       1,
973                 "not empty/empty": 0,
974         }
975         for name, expectLen := range expectDir {
976                 _, err := persisted.Open(name + "/bogus")
977                 c.Check(err, check.NotNil)
978
979                 d, err := persisted.Open(name)
980                 defer d.Close()
981                 c.Check(err, check.IsNil)
982                 fi, err := d.Readdir(-1)
983                 c.Check(err, check.IsNil)
984                 c.Check(fi, check.HasLen, expectLen)
985         }
986 }
987
988 func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
989         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
990         c.Assert(err, check.IsNil)
991
992         f, err := fs.OpenFile("missing", os.O_WRONLY, 0)
993         c.Check(f, check.IsNil)
994         c.Check(err, check.ErrorMatches, `file does not exist`)
995
996         f, err = fs.OpenFile("new", os.O_CREATE|os.O_RDONLY, 0)
997         c.Assert(err, check.IsNil)
998         defer f.Close()
999         n, err := f.Write([]byte{1, 2, 3})
1000         c.Check(n, check.Equals, 0)
1001         c.Check(err, check.ErrorMatches, `read-only file`)
1002         n, err = f.Read(make([]byte, 1))
1003         c.Check(n, check.Equals, 0)
1004         c.Check(err, check.Equals, io.EOF)
1005         f, err = fs.OpenFile("new", os.O_RDWR, 0)
1006         c.Assert(err, check.IsNil)
1007         defer f.Close()
1008         _, err = f.Write([]byte{4, 5, 6})
1009         c.Check(err, check.IsNil)
1010         fi, err := f.Stat()
1011         c.Assert(err, check.IsNil)
1012         c.Check(fi.Size(), check.Equals, int64(3))
1013
1014         f, err = fs.OpenFile("new", os.O_TRUNC|os.O_RDWR, 0)
1015         c.Assert(err, check.IsNil)
1016         defer f.Close()
1017         pos, err := f.Seek(0, io.SeekEnd)
1018         c.Check(pos, check.Equals, int64(0))
1019         c.Check(err, check.IsNil)
1020         fi, err = f.Stat()
1021         c.Assert(err, check.IsNil)
1022         c.Check(fi.Size(), check.Equals, int64(0))
1023         fs.Remove("new")
1024
1025         buf := make([]byte, 64)
1026         f, err = fs.OpenFile("append", os.O_EXCL|os.O_CREATE|os.O_RDWR|os.O_APPEND, 0)
1027         c.Assert(err, check.IsNil)
1028         f.Write([]byte{1, 2, 3})
1029         f.Seek(0, io.SeekStart)
1030         n, _ = f.Read(buf[:1])
1031         c.Check(n, check.Equals, 1)
1032         c.Check(buf[:1], check.DeepEquals, []byte{1})
1033         pos, err = f.Seek(0, io.SeekCurrent)
1034         c.Assert(err, check.IsNil)
1035         c.Check(pos, check.Equals, int64(1))
1036         f.Write([]byte{4, 5, 6})
1037         pos, err = f.Seek(0, io.SeekCurrent)
1038         c.Assert(err, check.IsNil)
1039         c.Check(pos, check.Equals, int64(6))
1040         f.Seek(0, io.SeekStart)
1041         n, err = f.Read(buf)
1042         c.Check(buf[:n], check.DeepEquals, []byte{1, 2, 3, 4, 5, 6})
1043         c.Check(err, check.Equals, io.EOF)
1044         f.Close()
1045
1046         f, err = fs.OpenFile("append", os.O_RDWR|os.O_APPEND, 0)
1047         c.Assert(err, check.IsNil)
1048         pos, err = f.Seek(0, io.SeekCurrent)
1049         c.Check(pos, check.Equals, int64(0))
1050         c.Check(err, check.IsNil)
1051         f.Read(buf[:3])
1052         pos, _ = f.Seek(0, io.SeekCurrent)
1053         c.Check(pos, check.Equals, int64(3))
1054         f.Write([]byte{7, 8, 9})
1055         pos, err = f.Seek(0, io.SeekCurrent)
1056         c.Check(err, check.IsNil)
1057         c.Check(pos, check.Equals, int64(9))
1058         f.Close()
1059
1060         f, err = fs.OpenFile("wronly", os.O_CREATE|os.O_WRONLY, 0)
1061         c.Assert(err, check.IsNil)
1062         n, err = f.Write([]byte{3, 2, 1})
1063         c.Check(n, check.Equals, 3)
1064         c.Check(err, check.IsNil)
1065         pos, _ = f.Seek(0, io.SeekCurrent)
1066         c.Check(pos, check.Equals, int64(3))
1067         pos, _ = f.Seek(0, io.SeekStart)
1068         c.Check(pos, check.Equals, int64(0))
1069         n, err = f.Read(buf)
1070         c.Check(n, check.Equals, 0)
1071         c.Check(err, check.ErrorMatches, `.*O_WRONLY.*`)
1072         f, err = fs.OpenFile("wronly", os.O_RDONLY, 0)
1073         c.Assert(err, check.IsNil)
1074         n, _ = f.Read(buf)
1075         c.Check(buf[:n], check.DeepEquals, []byte{3, 2, 1})
1076
1077         f, err = fs.OpenFile("unsupported", os.O_CREATE|os.O_SYNC, 0)
1078         c.Check(f, check.IsNil)
1079         c.Check(err, check.NotNil)
1080
1081         f, err = fs.OpenFile("append", os.O_RDWR|os.O_WRONLY, 0)
1082         c.Check(f, check.IsNil)
1083         c.Check(err, check.ErrorMatches, `invalid flag.*`)
1084 }
1085
1086 func (s *CollectionFSSuite) TestFlushFullBlocksWritingLongFile(c *check.C) {
1087         defer func(cw, mbs int) {
1088                 concurrentWriters = cw
1089                 maxBlockSize = mbs
1090         }(concurrentWriters, maxBlockSize)
1091         concurrentWriters = 2
1092         maxBlockSize = 1024
1093
1094         proceed := make(chan struct{})
1095         var started, concurrent int32
1096         blk2done := false
1097         s.kc.onWrite = func([]byte) {
1098                 atomic.AddInt32(&concurrent, 1)
1099                 switch atomic.AddInt32(&started, 1) {
1100                 case 1:
1101                         // Wait until block 2 starts and finishes, and block 3 starts
1102                         select {
1103                         case <-proceed:
1104                                 c.Check(blk2done, check.Equals, true)
1105                         case <-time.After(time.Second):
1106                                 c.Error("timed out")
1107                         }
1108                 case 2:
1109                         time.Sleep(time.Millisecond)
1110                         blk2done = true
1111                 case 3:
1112                         close(proceed)
1113                 default:
1114                         time.Sleep(time.Millisecond)
1115                 }
1116                 c.Check(atomic.AddInt32(&concurrent, -1) < int32(concurrentWriters), check.Equals, true)
1117         }
1118
1119         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1120         c.Assert(err, check.IsNil)
1121         f, err := fs.OpenFile("50K", os.O_WRONLY|os.O_CREATE, 0)
1122         c.Assert(err, check.IsNil)
1123         defer f.Close()
1124
1125         data := make([]byte, 500)
1126         rand.Read(data)
1127
1128         for i := 0; i < 100; i++ {
1129                 n, err := f.Write(data)
1130                 c.Assert(n, check.Equals, len(data))
1131                 c.Assert(err, check.IsNil)
1132         }
1133
1134         currentMemExtents := func() (memExtents []int) {
1135                 for idx, e := range f.(*filehandle).inode.(*filenode).segments {
1136                         switch e.(type) {
1137                         case *memSegment:
1138                                 memExtents = append(memExtents, idx)
1139                         }
1140                 }
1141                 return
1142         }
1143         f.(*filehandle).inode.(*filenode).waitPrune()
1144         c.Check(currentMemExtents(), check.HasLen, 1)
1145
1146         m, err := fs.MarshalManifest(".")
1147         c.Check(m, check.Matches, `[^:]* 0:50000:50K\n`)
1148         c.Check(err, check.IsNil)
1149         c.Check(currentMemExtents(), check.HasLen, 0)
1150 }
1151
1152 // Ensure blocks get flushed to disk if a lot of data is written to
1153 // small files/directories without calling sync().
1154 //
1155 // Write four 512KiB files into each of 256 top-level dirs (total
1156 // 512MiB), calling Flush() every 8 dirs. Ensure memory usage never
1157 // exceeds 24MiB (4 concurrentWriters * 2MiB + 8 unflushed dirs *
1158 // 2MiB).
1159 func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
1160         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1161         c.Assert(err, check.IsNil)
1162
1163         s.kc.onWrite = func([]byte) {
1164                 // discard flushed data -- otherwise the stub will use
1165                 // unlimited memory
1166                 time.Sleep(time.Millisecond)
1167                 s.kc.Lock()
1168                 defer s.kc.Unlock()
1169                 s.kc.blocks = map[string][]byte{}
1170         }
1171         for i := 0; i < 256; i++ {
1172                 buf := bytes.NewBuffer(make([]byte, 524288))
1173                 fmt.Fprintf(buf, "test file in dir%d", i)
1174
1175                 dir := fmt.Sprintf("dir%d", i)
1176                 fs.Mkdir(dir, 0755)
1177                 for j := 0; j < 2; j++ {
1178                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1179                         c.Assert(err, check.IsNil)
1180                         defer f.Close()
1181                         _, err = io.Copy(f, buf)
1182                         c.Assert(err, check.IsNil)
1183                 }
1184
1185                 if i%8 == 0 {
1186                         fs.Flush("", true)
1187                 }
1188
1189                 size := fs.MemorySize()
1190                 if !c.Check(size <= 1<<24, check.Equals, true) {
1191                         c.Logf("at dir%d fs.MemorySize()=%d", i, size)
1192                         return
1193                 }
1194         }
1195 }
1196
1197 // Ensure short blocks at the end of a stream don't get flushed by
1198 // Flush(false).
1199 //
1200 // Write 67x 1MiB files to each of 8 dirs, and check that 8 full 64MiB
1201 // blocks have been flushed while 8x 3MiB is still buffered in memory.
1202 func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
1203         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1204         c.Assert(err, check.IsNil)
1205
1206         var flushed int64
1207         s.kc.onWrite = func(p []byte) {
1208                 atomic.AddInt64(&flushed, int64(len(p)))
1209         }
1210
1211         nDirs := int64(8)
1212         megabyte := make([]byte, 1<<20)
1213         for i := int64(0); i < nDirs; i++ {
1214                 dir := fmt.Sprintf("dir%d", i)
1215                 fs.Mkdir(dir, 0755)
1216                 for j := 0; j < 67; j++ {
1217                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1218                         c.Assert(err, check.IsNil)
1219                         defer f.Close()
1220                         _, err = f.Write(megabyte)
1221                         c.Assert(err, check.IsNil)
1222                 }
1223         }
1224         c.Check(fs.MemorySize(), check.Equals, int64(nDirs*67<<20))
1225         c.Check(flushed, check.Equals, int64(0))
1226
1227         waitForFlush := func(expectUnflushed, expectFlushed int64) {
1228                 for deadline := time.Now().Add(5 * time.Second); fs.MemorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
1229                 }
1230                 c.Check(fs.MemorySize(), check.Equals, expectUnflushed)
1231                 c.Check(flushed, check.Equals, expectFlushed)
1232         }
1233
1234         // Nothing flushed yet
1235         waitForFlush((nDirs*67)<<20, 0)
1236
1237         // Flushing a non-empty dir "/" is non-recursive and there are
1238         // no top-level files, so this has no effect
1239         fs.Flush("/", false)
1240         waitForFlush((nDirs*67)<<20, 0)
1241
1242         // Flush the full block in dir0
1243         fs.Flush("dir0", false)
1244         waitForFlush((nDirs*67-64)<<20, 64<<20)
1245
1246         err = fs.Flush("dir-does-not-exist", false)
1247         c.Check(err, check.NotNil)
1248
1249         // Flush full blocks in all dirs
1250         fs.Flush("", false)
1251         waitForFlush(nDirs*3<<20, nDirs*64<<20)
1252
1253         // Flush non-full blocks, too
1254         fs.Flush("", true)
1255         waitForFlush(0, nDirs*67<<20)
1256 }
1257
1258 // Even when writing lots of files/dirs from different goroutines, as
1259 // long as Flush(dir,false) is called after writing each file,
1260 // unflushed data should be limited to one full block per
1261 // concurrentWriter, plus one nearly-full block at the end of each
1262 // dir/stream.
1263 func (s *CollectionFSSuite) TestMaxUnflushed(c *check.C) {
1264         nDirs := int64(8)
1265         maxUnflushed := (int64(concurrentWriters) + nDirs) << 26
1266
1267         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1268         c.Assert(err, check.IsNil)
1269
1270         release := make(chan struct{})
1271         timeout := make(chan struct{})
1272         time.AfterFunc(10*time.Second, func() { close(timeout) })
1273         var putCount, concurrency int64
1274         var unflushed int64
1275         s.kc.onWrite = func(p []byte) {
1276                 defer atomic.AddInt64(&unflushed, -int64(len(p)))
1277                 cur := atomic.AddInt64(&concurrency, 1)
1278                 defer atomic.AddInt64(&concurrency, -1)
1279                 pc := atomic.AddInt64(&putCount, 1)
1280                 if pc < int64(concurrentWriters) {
1281                         // Block until we reach concurrentWriters, to
1282                         // make sure we're really accepting concurrent
1283                         // writes.
1284                         select {
1285                         case <-release:
1286                         case <-timeout:
1287                                 c.Error("timeout")
1288                         }
1289                 } else if pc == int64(concurrentWriters) {
1290                         // Unblock the first N-1 PUT reqs.
1291                         close(release)
1292                 }
1293                 c.Assert(cur <= int64(concurrentWriters), check.Equals, true)
1294                 c.Assert(atomic.LoadInt64(&unflushed) <= maxUnflushed, check.Equals, true)
1295         }
1296
1297         var owg sync.WaitGroup
1298         megabyte := make([]byte, 1<<20)
1299         for i := int64(0); i < nDirs; i++ {
1300                 dir := fmt.Sprintf("dir%d", i)
1301                 fs.Mkdir(dir, 0755)
1302                 owg.Add(1)
1303                 go func() {
1304                         defer owg.Done()
1305                         defer fs.Flush(dir, true)
1306                         var iwg sync.WaitGroup
1307                         defer iwg.Wait()
1308                         for j := 0; j < 67; j++ {
1309                                 iwg.Add(1)
1310                                 go func(j int) {
1311                                         defer iwg.Done()
1312                                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1313                                         c.Assert(err, check.IsNil)
1314                                         defer f.Close()
1315                                         n, err := f.Write(megabyte)
1316                                         c.Assert(err, check.IsNil)
1317                                         atomic.AddInt64(&unflushed, int64(n))
1318                                         fs.Flush(dir, false)
1319                                 }(j)
1320                         }
1321                 }()
1322         }
1323         owg.Wait()
1324         fs.Flush("", true)
1325 }
1326
1327 func (s *CollectionFSSuite) TestFlushStress(c *check.C) {
1328         done := false
1329         defer func() { done = true }()
1330         time.AfterFunc(10*time.Second, func() {
1331                 if !done {
1332                         pprof.Lookup("goroutine").WriteTo(os.Stderr, 1)
1333                         panic("timeout")
1334                 }
1335         })
1336
1337         wrote := 0
1338         s.kc.onWrite = func(p []byte) {
1339                 s.kc.Lock()
1340                 s.kc.blocks = map[string][]byte{}
1341                 wrote++
1342                 defer c.Logf("wrote block %d, %d bytes", wrote, len(p))
1343                 s.kc.Unlock()
1344                 time.Sleep(20 * time.Millisecond)
1345         }
1346
1347         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1348         c.Assert(err, check.IsNil)
1349
1350         data := make([]byte, 1<<20)
1351         for i := 0; i < 3; i++ {
1352                 dir := fmt.Sprintf("dir%d", i)
1353                 fs.Mkdir(dir, 0755)
1354                 for j := 0; j < 200; j++ {
1355                         data[0] = byte(j)
1356                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1357                         c.Assert(err, check.IsNil)
1358                         _, err = f.Write(data)
1359                         c.Assert(err, check.IsNil)
1360                         f.Close()
1361                         fs.Flush(dir, false)
1362                 }
1363                 _, err := fs.MarshalManifest(".")
1364                 c.Check(err, check.IsNil)
1365         }
1366 }
1367
1368 func (s *CollectionFSSuite) TestFlushShort(c *check.C) {
1369         s.kc.onWrite = func([]byte) {
1370                 s.kc.Lock()
1371                 s.kc.blocks = map[string][]byte{}
1372                 s.kc.Unlock()
1373         }
1374         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1375         c.Assert(err, check.IsNil)
1376         for _, blocksize := range []int{8, 1000000} {
1377                 dir := fmt.Sprintf("dir%d", blocksize)
1378                 err = fs.Mkdir(dir, 0755)
1379                 c.Assert(err, check.IsNil)
1380                 data := make([]byte, blocksize)
1381                 for i := 0; i < 100; i++ {
1382                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, i), os.O_WRONLY|os.O_CREATE, 0)
1383                         c.Assert(err, check.IsNil)
1384                         _, err = f.Write(data)
1385                         c.Assert(err, check.IsNil)
1386                         f.Close()
1387                         fs.Flush(dir, false)
1388                 }
1389                 fs.Flush(dir, true)
1390                 _, err := fs.MarshalManifest(".")
1391                 c.Check(err, check.IsNil)
1392         }
1393 }
1394
1395 func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
1396         for _, txt := range []string{
1397                 "\n",
1398                 ".\n",
1399                 ". \n",
1400                 ". d41d8cd98f00b204e9800998ecf8427e+0\n",
1401                 ". d41d8cd98f00b204e9800998ecf8427e+0 \n",
1402                 ". 0:0:foo\n",
1403                 ".  0:0:foo\n",
1404                 ". 0:0:foo 0:0:bar\n",
1405                 ". d41d8cd98f00b204e9800998ecf8427e 0:0:foo\n",
1406                 ". d41d8cd98f00b204e9800998ecf8427e+0 :0:0:foo\n",
1407                 ". d41d8cd98f00b204e9800998ecf8427e+0 foo:0:foo\n",
1408                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:foo:foo\n",
1409                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:foo 1:1:bar\n",
1410                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\n",
1411                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\\057\\056\n",
1412                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:.\n",
1413                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:..\n",
1414                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:..\n",
1415                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/..\n",
1416                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n",
1417                 "./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n. d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n",
1418         } {
1419                 c.Logf("<-%q", txt)
1420                 fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
1421                 c.Check(fs, check.IsNil)
1422                 c.Logf("-> %s", err)
1423                 c.Check(err, check.NotNil)
1424         }
1425 }
1426
1427 func (s *CollectionFSSuite) TestEdgeCaseManifests(c *check.C) {
1428         for _, txt := range []string{
1429                 "",
1430                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo\n",
1431                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:...\n",
1432                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:. 0:0:. 0:0:\\056 0:0:\\056\n",
1433                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/. 0:0:. 0:0:foo\\057bar\\057\\056\n",
1434                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
1435                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/bar\n./foo d41d8cd98f00b204e9800998ecf8427e+0 0:0:bar\n",
1436         } {
1437                 c.Logf("<-%q", txt)
1438                 fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
1439                 c.Check(err, check.IsNil)
1440                 c.Check(fs, check.NotNil)
1441         }
1442 }
1443
1444 func (s *CollectionFSSuite) TestSnapshotSplice(c *check.C) {
1445         filedata1 := "hello snapshot+splice world\n"
1446         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1447         c.Assert(err, check.IsNil)
1448         {
1449                 f, err := fs.OpenFile("file1", os.O_CREATE|os.O_RDWR, 0700)
1450                 c.Assert(err, check.IsNil)
1451                 _, err = f.Write([]byte(filedata1))
1452                 c.Assert(err, check.IsNil)
1453                 err = f.Close()
1454                 c.Assert(err, check.IsNil)
1455         }
1456
1457         snap, err := Snapshot(fs, "/")
1458         c.Assert(err, check.IsNil)
1459         err = Splice(fs, "dir1", snap)
1460         c.Assert(err, check.IsNil)
1461         f, err := fs.Open("dir1/file1")
1462         c.Assert(err, check.IsNil)
1463         buf, err := io.ReadAll(f)
1464         c.Assert(err, check.IsNil)
1465         c.Check(string(buf), check.Equals, filedata1)
1466 }
1467
1468 func (s *CollectionFSSuite) TestRefreshSignatures(c *check.C) {
1469         filedata1 := "hello refresh signatures world\n"
1470         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1471         c.Assert(err, check.IsNil)
1472         fs.Mkdir("d1", 0700)
1473         f, err := fs.OpenFile("d1/file1", os.O_CREATE|os.O_RDWR, 0700)
1474         c.Assert(err, check.IsNil)
1475         _, err = f.Write([]byte(filedata1))
1476         c.Assert(err, check.IsNil)
1477         err = f.Close()
1478         c.Assert(err, check.IsNil)
1479
1480         filedata2 := "hello refresh signatures universe\n"
1481         fs.Mkdir("d2", 0700)
1482         f, err = fs.OpenFile("d2/file2", os.O_CREATE|os.O_RDWR, 0700)
1483         c.Assert(err, check.IsNil)
1484         _, err = f.Write([]byte(filedata2))
1485         c.Assert(err, check.IsNil)
1486         err = f.Close()
1487         c.Assert(err, check.IsNil)
1488         txt, err := fs.MarshalManifest(".")
1489         c.Assert(err, check.IsNil)
1490         var saved Collection
1491         err = s.client.RequestAndDecode(&saved, "POST", "arvados/v1/collections", nil, map[string]interface{}{
1492                 "select": []string{"manifest_text", "uuid", "portable_data_hash"},
1493                 "collection": map[string]interface{}{
1494                         "manifest_text": txt,
1495                 },
1496         })
1497         c.Assert(err, check.IsNil)
1498
1499         // Update signatures synchronously if they are already expired
1500         // when Read() is called.
1501         {
1502                 saved.ManifestText = SignManifest(saved.ManifestText, s.kc.authToken, time.Now().Add(-2*time.Second), s.kc.sigttl, []byte(s.kc.sigkey))
1503                 fs, err := saved.FileSystem(s.client, s.kc)
1504                 c.Assert(err, check.IsNil)
1505                 f, err := fs.OpenFile("d1/file1", os.O_RDONLY, 0)
1506                 c.Assert(err, check.IsNil)
1507                 buf, err := ioutil.ReadAll(f)
1508                 c.Check(err, check.IsNil)
1509                 c.Check(string(buf), check.Equals, filedata1)
1510         }
1511
1512         // Update signatures asynchronously if we're more than half
1513         // way to TTL when Read() is called.
1514         {
1515                 exp := time.Now().Add(2 * time.Minute)
1516                 saved.ManifestText = SignManifest(saved.ManifestText, s.kc.authToken, exp, s.kc.sigttl, []byte(s.kc.sigkey))
1517                 fs, err := saved.FileSystem(s.client, s.kc)
1518                 c.Assert(err, check.IsNil)
1519                 f1, err := fs.OpenFile("d1/file1", os.O_RDONLY, 0)
1520                 c.Assert(err, check.IsNil)
1521                 f2, err := fs.OpenFile("d2/file2", os.O_RDONLY, 0)
1522                 c.Assert(err, check.IsNil)
1523                 buf, err := ioutil.ReadAll(f1)
1524                 c.Check(err, check.IsNil)
1525                 c.Check(string(buf), check.Equals, filedata1)
1526
1527                 // Ensure fs treats the 2-minute TTL as less than half
1528                 // the server's signing TTL. If we don't do this,
1529                 // collectionfs will guess the signature is fresh,
1530                 // i.e., signing TTL is 2 minutes, and won't do an
1531                 // async refresh.
1532                 fs.(*collectionFileSystem).guessSignatureTTL = time.Hour
1533
1534                 refreshed := false
1535                 for deadline := time.Now().Add(time.Second * 10); time.Now().Before(deadline) && !refreshed; time.Sleep(time.Second / 10) {
1536                         _, err = f1.Seek(0, io.SeekStart)
1537                         c.Assert(err, check.IsNil)
1538                         buf, err = ioutil.ReadAll(f1)
1539                         c.Assert(err, check.IsNil)
1540                         c.Assert(string(buf), check.Equals, filedata1)
1541                         loc := s.kc.reads[len(s.kc.reads)-1]
1542                         t, err := signatureExpiryTime(loc)
1543                         c.Assert(err, check.IsNil)
1544                         c.Logf("last read block %s had signature expiry time %v", loc, t)
1545                         if t.Sub(time.Now()) > time.Hour {
1546                                 refreshed = true
1547                         }
1548                 }
1549                 c.Check(refreshed, check.Equals, true)
1550
1551                 // Second locator should have been updated at the same
1552                 // time.
1553                 buf, err = ioutil.ReadAll(f2)
1554                 c.Assert(err, check.IsNil)
1555                 c.Assert(string(buf), check.Equals, filedata2)
1556                 loc := s.kc.reads[len(s.kc.reads)-1]
1557                 c.Check(loc, check.Not(check.Equals), s.kc.reads[len(s.kc.reads)-2])
1558                 t, err := signatureExpiryTime(s.kc.reads[len(s.kc.reads)-1])
1559                 c.Assert(err, check.IsNil)
1560                 c.Logf("last read block %s had signature expiry time %v", loc, t)
1561                 c.Check(t.Sub(time.Now()) > time.Hour, check.Equals, true)
1562         }
1563 }
1564
// bigmanifest is a synthetic manifest text built once at package
// initialization. It has 2000 streams ("./dir0" through
// "./dir1999"); each stream lists the same block locator 100 times,
// followed by 2000 file segments ("file0" through "file1999").
var bigmanifest = func() string {
	const (
		streamCount     = 2000
		blocksPerStream = 100
		filesPerStream  = 2000
	)
	var manifest bytes.Buffer
	for dir := 0; dir < streamCount; dir++ {
		fmt.Fprintf(&manifest, "./dir%d", dir)
		for blk := 0; blk < blocksPerStream; blk++ {
			manifest.WriteString(" d41d8cd98f00b204e9800998ecf8427e+99999")
		}
		for file := 0; file < filesPerStream; file++ {
			fmt.Fprintf(&manifest, " 1200000:300000:file%d", file)
		}
		manifest.WriteByte('\n')
	}
	return manifest.String()
}()
1579
1580 func (s *CollectionFSSuite) BenchmarkParseManifest(c *check.C) {
1581         DebugLocksPanicMode = false
1582         c.Logf("test manifest is %d bytes", len(bigmanifest))
1583         for i := 0; i < c.N; i++ {
1584                 fs, err := (&Collection{ManifestText: bigmanifest}).FileSystem(s.client, s.kc)
1585                 c.Check(err, check.IsNil)
1586                 c.Check(fs, check.NotNil)
1587         }
1588 }
1589
1590 func (s *CollectionFSSuite) checkMemSize(c *check.C, f File) {
1591         fn := f.(*filehandle).inode.(*filenode)
1592         var memsize int64
1593         for _, seg := range fn.segments {
1594                 if e, ok := seg.(*memSegment); ok {
1595                         memsize += int64(len(e.buf))
1596                 }
1597         }
1598         c.Check(fn.memsize, check.Equals, memsize)
1599 }
1600
// CollectionFSUnitSuite is a gocheck suite registered separately
// from CollectionFSSuite. It has no fields, so its tests carry no
// per-suite state.
type CollectionFSUnitSuite struct{}

// Register the suite with gocheck.
var _ = check.Suite(&CollectionFSUnitSuite{})
1604
1605 // expect ~2 seconds to load a manifest with 256K files
1606 func (s *CollectionFSUnitSuite) TestLargeManifest(c *check.C) {
1607         if testing.Short() {
1608                 c.Skip("slow")
1609         }
1610
1611         const (
1612                 dirCount  = 512
1613                 fileCount = 512
1614         )
1615
1616         mb := bytes.NewBuffer(make([]byte, 0, 40000000))
1617         for i := 0; i < dirCount; i++ {
1618                 fmt.Fprintf(mb, "./dir%d", i)
1619                 for j := 0; j <= fileCount; j++ {
1620                         fmt.Fprintf(mb, " %032x+42+A%040x@%08x", j, j, j)
1621                 }
1622                 for j := 0; j < fileCount; j++ {
1623                         fmt.Fprintf(mb, " %d:%d:dir%d/file%d", j*42+21, 42, j, j)
1624                 }
1625                 mb.Write([]byte{'\n'})
1626         }
1627         coll := Collection{ManifestText: mb.String()}
1628         c.Logf("%s built", time.Now())
1629
1630         var memstats runtime.MemStats
1631         runtime.ReadMemStats(&memstats)
1632         c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
1633
1634         f, err := coll.FileSystem(nil, nil)
1635         c.Check(err, check.IsNil)
1636         c.Logf("%s loaded", time.Now())
1637         c.Check(f.Size(), check.Equals, int64(42*dirCount*fileCount))
1638
1639         for i := 0; i < dirCount; i++ {
1640                 for j := 0; j < fileCount; j++ {
1641                         f.Stat(fmt.Sprintf("./dir%d/dir%d/file%d", i, j, j))
1642                 }
1643         }
1644         c.Logf("%s Stat() x %d", time.Now(), dirCount*fileCount)
1645
1646         runtime.ReadMemStats(&memstats)
1647         c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
1648 }
1649
// Test is the gocheck boilerplate entry point: "go test" calls it
// like any other test function, and it passes t to check.TestingT,
// which runs the suites registered with check.Suite.
func Test(t *testing.T) {
	check.TestingT(t)
}