Merge branch 'file-token-error' of git://github.com/MajewskiKrzysztof/arvados into...
[arvados.git] / sdk / go / arvados / fs_collection_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "bytes"
9         "crypto/md5"
10         "crypto/sha1"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "math/rand"
16         "net/http"
17         "os"
18         "regexp"
19         "runtime"
20         "strings"
21         "sync"
22         "sync/atomic"
23         "testing"
24         "time"
25
26         check "gopkg.in/check.v1"
27 )
28
29 var _ = check.Suite(&CollectionFSSuite{})
30
// keepClientStub is an in-memory stand-in for a Keep client used by the
// collection filesystem tests. Stored blocks are keyed by the first 32
// characters (the hash part) of their locator.
type keepClientStub struct {
	blocks      map[string][]byte
	refreshable map[string]bool
	onPut       func(bufcopy []byte) // called from PutB, before acquiring lock
	sync.RWMutex
}

// errStub404 is returned by ReadAt when no block is stored for the locator.
var errStub404 = errors.New("404 block not found")

// ReadAt copies the stored block identified by locator, starting at byte
// offset off, into p and returns the number of bytes copied.
//
// A locator too short to contain a hash is rejected with an error, and an
// offset outside the block yields io.ErrUnexpectedEOF. Previously both of
// these cases panicked on a slice-bounds error, which would crash the whole
// test binary instead of failing one test.
func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error) {
	if len(locator) < 32 {
		return 0, fmt.Errorf("bad locator: %q", locator)
	}
	kcs.RLock()
	defer kcs.RUnlock()
	buf := kcs.blocks[locator[:32]]
	if buf == nil {
		return 0, errStub404
	}
	if off < 0 || off > len(buf) {
		return 0, io.ErrUnexpectedEOF
	}
	return copy(p, buf[off:]), nil
}

// PutB stores a private copy of p and returns a locator of the form
// "<md5>+<size>+A12345@abcde" plus a replica count of 1. If onPut is set, it
// is called with the copy before the lock is taken.
func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
	locator := fmt.Sprintf("%x+%d+A12345@abcde", md5.Sum(p), len(p))
	buf := make([]byte, len(p))
	copy(buf, p)
	if kcs.onPut != nil {
		kcs.onPut(buf)
	}
	kcs.Lock()
	defer kcs.Unlock()
	kcs.blocks[locator[:32]] = buf
	return locator, 1, nil
}

// localOrRemoteSignature matches a "+A..." or "+R..." locator hint.
var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)

// LocalLocator replaces every +A/+R hint in locator with a fake +A
// signature that expires 14 days from now.
//
// If locator contains "+R", the call succeeds only when the block is
// already stored or its hash is marked in refreshable. Otherwise, or if
// the locator is too short to contain a hash, an error is returned.
func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
	// Only blocks and refreshable are read here, so a shared lock suffices.
	kcs.RLock()
	defer kcs.RUnlock()
	if strings.Contains(locator, "+R") {
		if len(locator) < 32 {
			return "", fmt.Errorf("bad locator: %q", locator)
		}
		if _, ok := kcs.blocks[locator[:32]]; !ok && !kcs.refreshable[locator[:32]] {
			return "", fmt.Errorf("kcs.refreshable[%q]==false", locator)
		}
	}
	fakeSig := fmt.Sprintf("+A%x@%x", sha1.Sum(nil), time.Now().Add(time.Hour*24*14).Unix())
	return localOrRemoteSignature.ReplaceAllLiteralString(locator, fakeSig), nil
}
79
80 type CollectionFSSuite struct {
81         client *Client
82         coll   Collection
83         fs     CollectionFileSystem
84         kc     *keepClientStub
85 }
86
87 func (s *CollectionFSSuite) SetUpTest(c *check.C) {
88         s.client = NewClientFromEnv()
89         err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil)
90         c.Assert(err, check.IsNil)
91         s.kc = &keepClientStub{
92                 blocks: map[string][]byte{
93                         "3858f62230ac3c915f300c664312c63f": []byte("foobar"),
94                 }}
95         s.fs, err = s.coll.FileSystem(s.client, s.kc)
96         c.Assert(err, check.IsNil)
97 }
98
99 func (s *CollectionFSSuite) TestHttpFileSystemInterface(c *check.C) {
100         _, ok := s.fs.(http.FileSystem)
101         c.Check(ok, check.Equals, true)
102 }
103
104 func (s *CollectionFSSuite) TestColonInFilename(c *check.C) {
105         fs, err := (&Collection{
106                 ManifestText: "./foo:foo 3858f62230ac3c915f300c664312c63f+3 0:3:bar:bar\n",
107         }).FileSystem(s.client, s.kc)
108         c.Assert(err, check.IsNil)
109
110         f, err := fs.Open("/foo:foo")
111         c.Assert(err, check.IsNil)
112
113         fis, err := f.Readdir(0)
114         c.Check(err, check.IsNil)
115         c.Check(len(fis), check.Equals, 1)
116         c.Check(fis[0].Name(), check.Equals, "bar:bar")
117 }
118
119 func (s *CollectionFSSuite) TestReaddirFull(c *check.C) {
120         f, err := s.fs.Open("/dir1")
121         c.Assert(err, check.IsNil)
122
123         st, err := f.Stat()
124         c.Assert(err, check.IsNil)
125         c.Check(st.Size(), check.Equals, int64(2))
126         c.Check(st.IsDir(), check.Equals, true)
127
128         fis, err := f.Readdir(0)
129         c.Check(err, check.IsNil)
130         c.Check(len(fis), check.Equals, 2)
131         if len(fis) > 0 {
132                 c.Check(fis[0].Size(), check.Equals, int64(3))
133         }
134 }
135
136 func (s *CollectionFSSuite) TestReaddirLimited(c *check.C) {
137         f, err := s.fs.Open("./dir1")
138         c.Assert(err, check.IsNil)
139
140         fis, err := f.Readdir(1)
141         c.Check(err, check.IsNil)
142         c.Check(len(fis), check.Equals, 1)
143         if len(fis) > 0 {
144                 c.Check(fis[0].Size(), check.Equals, int64(3))
145         }
146
147         fis, err = f.Readdir(1)
148         c.Check(err, check.IsNil)
149         c.Check(len(fis), check.Equals, 1)
150         if len(fis) > 0 {
151                 c.Check(fis[0].Size(), check.Equals, int64(3))
152         }
153
154         fis, err = f.Readdir(1)
155         c.Check(len(fis), check.Equals, 0)
156         c.Check(err, check.NotNil)
157         c.Check(err, check.Equals, io.EOF)
158
159         f, err = s.fs.Open("dir1")
160         c.Assert(err, check.IsNil)
161         fis, err = f.Readdir(1)
162         c.Check(len(fis), check.Equals, 1)
163         c.Assert(err, check.IsNil)
164         fis, err = f.Readdir(2)
165         c.Check(len(fis), check.Equals, 1)
166         c.Assert(err, check.IsNil)
167         fis, err = f.Readdir(2)
168         c.Check(len(fis), check.Equals, 0)
169         c.Assert(err, check.Equals, io.EOF)
170 }
171
172 func (s *CollectionFSSuite) TestPathMunge(c *check.C) {
173         for _, path := range []string{".", "/", "./", "///", "/../", "/./.."} {
174                 f, err := s.fs.Open(path)
175                 c.Assert(err, check.IsNil)
176
177                 st, err := f.Stat()
178                 c.Assert(err, check.IsNil)
179                 c.Check(st.Size(), check.Equals, int64(1))
180                 c.Check(st.IsDir(), check.Equals, true)
181         }
182         for _, path := range []string{"/dir1", "dir1", "./dir1", "///dir1//.//", "../dir1/../dir1/"} {
183                 c.Logf("%q", path)
184                 f, err := s.fs.Open(path)
185                 c.Assert(err, check.IsNil)
186
187                 st, err := f.Stat()
188                 c.Assert(err, check.IsNil)
189                 c.Check(st.Size(), check.Equals, int64(2))
190                 c.Check(st.IsDir(), check.Equals, true)
191         }
192 }
193
194 func (s *CollectionFSSuite) TestNotExist(c *check.C) {
195         for _, path := range []string{"/no", "no", "./no", "n/o", "/n/o"} {
196                 f, err := s.fs.Open(path)
197                 c.Assert(f, check.IsNil)
198                 c.Assert(err, check.NotNil)
199                 c.Assert(os.IsNotExist(err), check.Equals, true)
200         }
201 }
202
203 func (s *CollectionFSSuite) TestReadOnlyFile(c *check.C) {
204         f, err := s.fs.OpenFile("/dir1/foo", os.O_RDONLY, 0)
205         c.Assert(err, check.IsNil)
206         st, err := f.Stat()
207         c.Assert(err, check.IsNil)
208         c.Check(st.Size(), check.Equals, int64(3))
209         n, err := f.Write([]byte("bar"))
210         c.Check(n, check.Equals, 0)
211         c.Check(err, check.Equals, ErrReadOnlyFile)
212 }
213
214 func (s *CollectionFSSuite) TestCreateFile(c *check.C) {
215         f, err := s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE, 0)
216         c.Assert(err, check.IsNil)
217         st, err := f.Stat()
218         c.Assert(err, check.IsNil)
219         c.Check(st.Size(), check.Equals, int64(0))
220
221         n, err := f.Write([]byte("bar"))
222         c.Check(n, check.Equals, 3)
223         c.Check(err, check.IsNil)
224
225         c.Check(f.Close(), check.IsNil)
226
227         f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
228         c.Check(f, check.IsNil)
229         c.Assert(err, check.NotNil)
230
231         f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR, 0)
232         c.Assert(err, check.IsNil)
233         st, err = f.Stat()
234         c.Assert(err, check.IsNil)
235         c.Check(st.Size(), check.Equals, int64(3))
236
237         c.Check(f.Close(), check.IsNil)
238
239         m, err := s.fs.MarshalManifest(".")
240         c.Assert(err, check.IsNil)
241         c.Check(m, check.Matches, `. 37b51d194a7513e45b56f6524f2d51f2\+3\+\S+ 0:3:new-file\\0401\n./dir1 .* 3:3:bar 0:3:foo\n`)
242 }
243
244 func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
245         maxBlockSize = 8
246         defer func() { maxBlockSize = 2 << 26 }()
247
248         f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
249         c.Assert(err, check.IsNil)
250         defer f.Close()
251         st, err := f.Stat()
252         c.Assert(err, check.IsNil)
253         c.Check(st.Size(), check.Equals, int64(3))
254
255         f2, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
256         c.Assert(err, check.IsNil)
257         defer f2.Close()
258
259         buf := make([]byte, 64)
260         n, err := f.Read(buf)
261         c.Check(n, check.Equals, 3)
262         c.Check(err, check.Equals, io.EOF)
263         c.Check(string(buf[:3]), check.DeepEquals, "foo")
264
265         pos, err := f.Seek(-2, io.SeekCurrent)
266         c.Check(pos, check.Equals, int64(1))
267         c.Check(err, check.IsNil)
268
269         // Split a storedExtent in two, and insert a memExtent
270         n, err = f.Write([]byte("*"))
271         c.Check(n, check.Equals, 1)
272         c.Check(err, check.IsNil)
273
274         pos, err = f.Seek(0, io.SeekCurrent)
275         c.Check(pos, check.Equals, int64(2))
276         c.Check(err, check.IsNil)
277
278         pos, err = f.Seek(0, io.SeekStart)
279         c.Check(pos, check.Equals, int64(0))
280         c.Check(err, check.IsNil)
281
282         rbuf, err := ioutil.ReadAll(f)
283         c.Check(len(rbuf), check.Equals, 3)
284         c.Check(err, check.IsNil)
285         c.Check(string(rbuf), check.Equals, "f*o")
286
287         // Write multiple blocks in one call
288         f.Seek(1, io.SeekStart)
289         n, err = f.Write([]byte("0123456789abcdefg"))
290         c.Check(n, check.Equals, 17)
291         c.Check(err, check.IsNil)
292         pos, err = f.Seek(0, io.SeekCurrent)
293         c.Check(pos, check.Equals, int64(18))
294         c.Check(err, check.IsNil)
295         pos, err = f.Seek(-18, io.SeekCurrent)
296         c.Check(pos, check.Equals, int64(0))
297         c.Check(err, check.IsNil)
298         n, err = io.ReadFull(f, buf)
299         c.Check(n, check.Equals, 18)
300         c.Check(err, check.Equals, io.ErrUnexpectedEOF)
301         c.Check(string(buf[:n]), check.Equals, "f0123456789abcdefg")
302
303         buf2, err := ioutil.ReadAll(f2)
304         c.Check(err, check.IsNil)
305         c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
306
307         // truncate to current size
308         err = f.Truncate(18)
309         c.Check(err, check.IsNil)
310         f2.Seek(0, io.SeekStart)
311         buf2, err = ioutil.ReadAll(f2)
312         c.Check(err, check.IsNil)
313         c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
314
315         // shrink to zero some data
316         f.Truncate(15)
317         f2.Seek(0, io.SeekStart)
318         buf2, err = ioutil.ReadAll(f2)
319         c.Check(err, check.IsNil)
320         c.Check(string(buf2), check.Equals, "f0123456789abcd")
321
322         // grow to partial block/extent
323         f.Truncate(20)
324         f2.Seek(0, io.SeekStart)
325         buf2, err = ioutil.ReadAll(f2)
326         c.Check(err, check.IsNil)
327         c.Check(string(buf2), check.Equals, "f0123456789abcd\x00\x00\x00\x00\x00")
328
329         f.Truncate(0)
330         f2.Seek(0, io.SeekStart)
331         f2.Write([]byte("12345678abcdefghijkl"))
332
333         // grow to block/extent boundary
334         f.Truncate(64)
335         f2.Seek(0, io.SeekStart)
336         buf2, err = ioutil.ReadAll(f2)
337         c.Check(err, check.IsNil)
338         c.Check(len(buf2), check.Equals, 64)
339         c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 8)
340
341         // shrink to block/extent boundary
342         err = f.Truncate(32)
343         c.Check(err, check.IsNil)
344         f2.Seek(0, io.SeekStart)
345         buf2, err = ioutil.ReadAll(f2)
346         c.Check(err, check.IsNil)
347         c.Check(len(buf2), check.Equals, 32)
348         c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 4)
349
350         // shrink to partial block/extent
351         err = f.Truncate(15)
352         c.Check(err, check.IsNil)
353         f2.Seek(0, io.SeekStart)
354         buf2, err = ioutil.ReadAll(f2)
355         c.Check(err, check.IsNil)
356         c.Check(string(buf2), check.Equals, "12345678abcdefg")
357         c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 2)
358
359         // Force flush to ensure the block "12345678" gets stored, so
360         // we know what to expect in the final manifest below.
361         _, err = s.fs.MarshalManifest(".")
362         c.Check(err, check.IsNil)
363
364         // Truncate to size=3 while f2's ptr is at 15
365         err = f.Truncate(3)
366         c.Check(err, check.IsNil)
367         buf2, err = ioutil.ReadAll(f2)
368         c.Check(err, check.IsNil)
369         c.Check(string(buf2), check.Equals, "")
370         f2.Seek(0, io.SeekStart)
371         buf2, err = ioutil.ReadAll(f2)
372         c.Check(err, check.IsNil)
373         c.Check(string(buf2), check.Equals, "123")
374         c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 1)
375
376         m, err := s.fs.MarshalManifest(".")
377         c.Check(err, check.IsNil)
378         m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
379         c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 25d55ad283aa400af464c76d713c07ad+8 3:3:bar 6:3:foo\n")
380         c.Check(s.fs.Size(), check.Equals, int64(6))
381 }
382
383 func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
384         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
385         c.Assert(err, check.IsNil)
386         f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
387         c.Assert(err, check.IsNil)
388         defer f.Close()
389
390         checkSize := func(size int64) {
391                 fi, err := f.Stat()
392                 c.Assert(err, check.IsNil)
393                 c.Check(fi.Size(), check.Equals, size)
394
395                 f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
396                 c.Assert(err, check.IsNil)
397                 defer f.Close()
398                 fi, err = f.Stat()
399                 c.Check(err, check.IsNil)
400                 c.Check(fi.Size(), check.Equals, size)
401                 pos, err := f.Seek(0, io.SeekEnd)
402                 c.Check(err, check.IsNil)
403                 c.Check(pos, check.Equals, size)
404         }
405
406         f.Seek(2, io.SeekEnd)
407         checkSize(0)
408         f.Write([]byte{1})
409         checkSize(3)
410
411         f.Seek(2, io.SeekCurrent)
412         checkSize(3)
413         f.Write([]byte{})
414         checkSize(5)
415
416         f.Seek(8, io.SeekStart)
417         checkSize(5)
418         n, err := f.Read(make([]byte, 1))
419         c.Check(n, check.Equals, 0)
420         c.Check(err, check.Equals, io.EOF)
421         checkSize(5)
422         f.Write([]byte{1, 2, 3})
423         checkSize(11)
424 }
425
426 func (s *CollectionFSSuite) TestMarshalCopiesRemoteBlocks(c *check.C) {
427         foo := "foo"
428         bar := "bar"
429         hash := map[string]string{
430                 foo: fmt.Sprintf("%x", md5.Sum([]byte(foo))),
431                 bar: fmt.Sprintf("%x", md5.Sum([]byte(bar))),
432         }
433
434         fs, err := (&Collection{
435                 ManifestText: ". " + hash[foo] + "+3+Rzaaaa-foo@bab " + hash[bar] + "+3+A12345@ffffff 0:2:fo.txt 2:4:obar.txt\n",
436         }).FileSystem(s.client, s.kc)
437         c.Assert(err, check.IsNil)
438         manifest, err := fs.MarshalManifest(".")
439         c.Check(manifest, check.Equals, "")
440         c.Check(err, check.NotNil)
441
442         s.kc.refreshable = map[string]bool{hash[bar]: true}
443
444         for _, sigIn := range []string{"Rzaaaa-foo@bab", "A12345@abcde"} {
445                 fs, err = (&Collection{
446                         ManifestText: ". " + hash[foo] + "+3+A12345@fffff " + hash[bar] + "+3+" + sigIn + " 0:2:fo.txt 2:4:obar.txt\n",
447                 }).FileSystem(s.client, s.kc)
448                 c.Assert(err, check.IsNil)
449                 manifest, err := fs.MarshalManifest(".")
450                 c.Check(err, check.IsNil)
451                 // Both blocks should now have +A signatures.
452                 c.Check(manifest, check.Matches, `.*\+A.* .*\+A.*\n`)
453                 c.Check(manifest, check.Not(check.Matches), `.*\+R.*\n`)
454         }
455 }
456
457 func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
458         maxBlockSize = 8
459         defer func() { maxBlockSize = 2 << 26 }()
460
461         var err error
462         s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
463         c.Assert(err, check.IsNil)
464         for _, name := range []string{"foo", "bar", "baz"} {
465                 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
466                 c.Assert(err, check.IsNil)
467                 f.Write([]byte(name))
468                 f.Close()
469         }
470
471         m, err := s.fs.MarshalManifest(".")
472         c.Check(err, check.IsNil)
473         m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
474         c.Check(m, check.Equals, ". c3c23db5285662ef7172373df0003206+6 acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:bar 3:3:baz 6:3:foo\n")
475 }
476
477 func (s *CollectionFSSuite) TestMkdir(c *check.C) {
478         err := s.fs.Mkdir("foo/bar", 0755)
479         c.Check(err, check.Equals, os.ErrNotExist)
480
481         f, err := s.fs.OpenFile("foo/bar", os.O_CREATE, 0)
482         c.Check(err, check.Equals, os.ErrNotExist)
483
484         err = s.fs.Mkdir("foo", 0755)
485         c.Check(err, check.IsNil)
486
487         f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_WRONLY, 0)
488         c.Check(err, check.IsNil)
489         if err == nil {
490                 defer f.Close()
491                 f.Write([]byte("foo"))
492         }
493
494         // mkdir fails if a file already exists with that name
495         err = s.fs.Mkdir("foo/bar", 0755)
496         c.Check(err, check.NotNil)
497
498         err = s.fs.Remove("foo/bar")
499         c.Check(err, check.IsNil)
500
501         // mkdir succeeds after the file is deleted
502         err = s.fs.Mkdir("foo/bar", 0755)
503         c.Check(err, check.IsNil)
504
505         // creating a file in a nonexistent subdir should still fail
506         f, err = s.fs.OpenFile("foo/bar/baz/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
507         c.Check(err, check.Equals, os.ErrNotExist)
508
509         f, err = s.fs.OpenFile("foo/bar/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
510         c.Check(err, check.IsNil)
511         if err == nil {
512                 defer f.Close()
513                 f.Write([]byte("foo"))
514         }
515
516         // creating foo/bar as a regular file should fail
517         f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, 0)
518         c.Check(err, check.NotNil)
519
520         // creating foo/bar as a directory should fail
521         f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, os.ModeDir)
522         c.Check(err, check.NotNil)
523         err = s.fs.Mkdir("foo/bar", 0755)
524         c.Check(err, check.NotNil)
525
526         m, err := s.fs.MarshalManifest(".")
527         c.Check(err, check.IsNil)
528         m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
529         c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 3:3:bar 0:3:foo\n./foo/bar acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n")
530 }
531
532 func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
533         if testing.Short() {
534                 c.Skip("slow")
535         }
536
537         maxBlockSize = 8
538         defer func() { maxBlockSize = 1 << 26 }()
539
540         var wg sync.WaitGroup
541         for n := 0; n < 128; n++ {
542                 wg.Add(1)
543                 go func() {
544                         defer wg.Done()
545                         f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
546                         c.Assert(err, check.IsNil)
547                         defer f.Close()
548                         for i := 0; i < 1024; i++ {
549                                 r := rand.Uint32()
550                                 switch {
551                                 case r%11 == 0:
552                                         _, err := s.fs.MarshalManifest(".")
553                                         c.Check(err, check.IsNil)
554                                 case r&3 == 0:
555                                         f.Truncate(int64(rand.Intn(64)))
556                                 case r&3 == 1:
557                                         f.Seek(int64(rand.Intn(64)), io.SeekStart)
558                                 case r&3 == 2:
559                                         _, err := f.Write([]byte("beep boop"))
560                                         c.Check(err, check.IsNil)
561                                 case r&3 == 3:
562                                         _, err := ioutil.ReadAll(f)
563                                         c.Check(err, check.IsNil)
564                                 }
565                         }
566                 }()
567         }
568         wg.Wait()
569
570         f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
571         c.Assert(err, check.IsNil)
572         defer f.Close()
573         buf, err := ioutil.ReadAll(f)
574         c.Check(err, check.IsNil)
575         c.Logf("after lots of random r/w/seek/trunc, buf is %q", buf)
576 }
577
578 func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
579         maxBlockSize = 40
580         defer func() { maxBlockSize = 2 << 26 }()
581
582         var err error
583         s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
584         c.Assert(err, check.IsNil)
585
586         const nfiles = 256
587         const ngoroutines = 256
588
589         var wg sync.WaitGroup
590         for n := 0; n < ngoroutines; n++ {
591                 wg.Add(1)
592                 go func(n int) {
593                         defer wg.Done()
594                         expect := make([]byte, 0, 64)
595                         wbytes := []byte("there's no simple explanation for anything important that any of us do")
596                         f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
597                         c.Assert(err, check.IsNil)
598                         defer f.Close()
599                         for i := 0; i < nfiles; i++ {
600                                 trunc := rand.Intn(65)
601                                 woff := rand.Intn(trunc + 1)
602                                 wbytes = wbytes[:rand.Intn(64-woff+1)]
603                                 for buf, i := expect[:cap(expect)], len(expect); i < trunc; i++ {
604                                         buf[i] = 0
605                                 }
606                                 expect = expect[:trunc]
607                                 if trunc < woff+len(wbytes) {
608                                         expect = expect[:woff+len(wbytes)]
609                                 }
610                                 copy(expect[woff:], wbytes)
611                                 f.Truncate(int64(trunc))
612                                 pos, err := f.Seek(int64(woff), io.SeekStart)
613                                 c.Check(pos, check.Equals, int64(woff))
614                                 c.Check(err, check.IsNil)
615                                 n, err := f.Write(wbytes)
616                                 c.Check(n, check.Equals, len(wbytes))
617                                 c.Check(err, check.IsNil)
618                                 pos, err = f.Seek(0, io.SeekStart)
619                                 c.Check(pos, check.Equals, int64(0))
620                                 c.Check(err, check.IsNil)
621                                 buf, err := ioutil.ReadAll(f)
622                                 c.Check(string(buf), check.Equals, string(expect))
623                                 c.Check(err, check.IsNil)
624                         }
625                 }(n)
626         }
627         wg.Wait()
628
629         for n := 0; n < ngoroutines; n++ {
630                 f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDONLY, 0)
631                 c.Assert(err, check.IsNil)
632                 f.(*filehandle).inode.(*filenode).waitPrune()
633                 s.checkMemSize(c, f)
634                 defer f.Close()
635         }
636
637         root, err := s.fs.Open("/")
638         c.Assert(err, check.IsNil)
639         defer root.Close()
640         fi, err := root.Readdir(-1)
641         c.Check(err, check.IsNil)
642         c.Check(len(fi), check.Equals, nfiles)
643
644         _, err = s.fs.MarshalManifest(".")
645         c.Check(err, check.IsNil)
646         // TODO: check manifest content
647 }
648
649 func (s *CollectionFSSuite) TestRemove(c *check.C) {
650         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
651         c.Assert(err, check.IsNil)
652         err = fs.Mkdir("dir0", 0755)
653         c.Assert(err, check.IsNil)
654         err = fs.Mkdir("dir1", 0755)
655         c.Assert(err, check.IsNil)
656         err = fs.Mkdir("dir1/dir2", 0755)
657         c.Assert(err, check.IsNil)
658         err = fs.Mkdir("dir1/dir3", 0755)
659         c.Assert(err, check.IsNil)
660
661         err = fs.Remove("dir0")
662         c.Check(err, check.IsNil)
663         err = fs.Remove("dir0")
664         c.Check(err, check.Equals, os.ErrNotExist)
665
666         err = fs.Remove("dir1/dir2/.")
667         c.Check(err, check.Equals, ErrInvalidArgument)
668         err = fs.Remove("dir1/dir2/..")
669         c.Check(err, check.Equals, ErrInvalidArgument)
670         err = fs.Remove("dir1")
671         c.Check(err, check.Equals, ErrDirectoryNotEmpty)
672         err = fs.Remove("dir1/dir2/../../../dir1")
673         c.Check(err, check.Equals, ErrDirectoryNotEmpty)
674         err = fs.Remove("dir1/dir3/")
675         c.Check(err, check.IsNil)
676         err = fs.RemoveAll("dir1")
677         c.Check(err, check.IsNil)
678         err = fs.RemoveAll("dir1")
679         c.Check(err, check.IsNil)
680 }
681
682 func (s *CollectionFSSuite) TestRenameError(c *check.C) {
683         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
684         c.Assert(err, check.IsNil)
685         err = fs.Mkdir("first", 0755)
686         c.Assert(err, check.IsNil)
687         err = fs.Mkdir("first/second", 0755)
688         c.Assert(err, check.IsNil)
689         f, err := fs.OpenFile("first/second/file", os.O_CREATE|os.O_WRONLY, 0755)
690         c.Assert(err, check.IsNil)
691         f.Write([]byte{1, 2, 3, 4, 5})
692         f.Close()
693         err = fs.Rename("first", "first/second/third")
694         c.Check(err, check.Equals, ErrInvalidArgument)
695         err = fs.Rename("first", "first/third")
696         c.Check(err, check.Equals, ErrInvalidArgument)
697         err = fs.Rename("first/second", "second")
698         c.Check(err, check.IsNil)
699         f, err = fs.OpenFile("second/file", 0, 0)
700         c.Assert(err, check.IsNil)
701         data, err := ioutil.ReadAll(f)
702         c.Check(err, check.IsNil)
703         c.Check(data, check.DeepEquals, []byte{1, 2, 3, 4, 5})
704 }
705
706 func (s *CollectionFSSuite) TestRenameDirectory(c *check.C) {
707         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
708         c.Assert(err, check.IsNil)
709         err = fs.Mkdir("foo", 0755)
710         c.Assert(err, check.IsNil)
711         err = fs.Mkdir("bar", 0755)
712         c.Assert(err, check.IsNil)
713         err = fs.Rename("bar", "baz")
714         c.Check(err, check.IsNil)
715         err = fs.Rename("foo", "baz")
716         c.Check(err, check.NotNil)
717         err = fs.Rename("foo", "baz/")
718         c.Check(err, check.IsNil)
719         err = fs.Rename("baz/foo", ".")
720         c.Check(err, check.Equals, ErrInvalidArgument)
721         err = fs.Rename("baz/foo/", ".")
722         c.Check(err, check.Equals, ErrInvalidArgument)
723 }
724
725 func (s *CollectionFSSuite) TestRename(c *check.C) {
726         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
727         c.Assert(err, check.IsNil)
728         const (
729                 outer = 16
730                 inner = 16
731         )
732         for i := 0; i < outer; i++ {
733                 err = fs.Mkdir(fmt.Sprintf("dir%d", i), 0755)
734                 c.Assert(err, check.IsNil)
735                 for j := 0; j < inner; j++ {
736                         err = fs.Mkdir(fmt.Sprintf("dir%d/dir%d", i, j), 0755)
737                         c.Assert(err, check.IsNil)
738                         for _, fnm := range []string{
739                                 fmt.Sprintf("dir%d/file%d", i, j),
740                                 fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
741                         } {
742                                 f, err := fs.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0755)
743                                 c.Assert(err, check.IsNil)
744                                 _, err = f.Write([]byte("beep"))
745                                 c.Assert(err, check.IsNil)
746                                 f.Close()
747                         }
748                 }
749         }
750         var wg sync.WaitGroup
751         for i := 0; i < outer; i++ {
752                 for j := 0; j < inner; j++ {
753                         wg.Add(1)
754                         go func(i, j int) {
755                                 defer wg.Done()
756                                 oldname := fmt.Sprintf("dir%d/dir%d/file%d", i, j, j)
757                                 newname := fmt.Sprintf("dir%d/newfile%d", i, inner-j-1)
758                                 _, err := fs.Open(newname)
759                                 c.Check(err, check.Equals, os.ErrNotExist)
760                                 err = fs.Rename(oldname, newname)
761                                 c.Check(err, check.IsNil)
762                                 f, err := fs.Open(newname)
763                                 c.Check(err, check.IsNil)
764                                 f.Close()
765                         }(i, j)
766
767                         wg.Add(1)
768                         go func(i, j int) {
769                                 defer wg.Done()
770                                 // oldname does not exist
771                                 err := fs.Rename(
772                                         fmt.Sprintf("dir%d/dir%d/missing", i, j),
773                                         fmt.Sprintf("dir%d/dir%d/file%d", outer-i-1, j, j))
774                                 c.Check(err, check.ErrorMatches, `.*does not exist`)
775
776                                 // newname parent dir does not exist
777                                 err = fs.Rename(
778                                         fmt.Sprintf("dir%d/dir%d", i, j),
779                                         fmt.Sprintf("dir%d/missing/irrelevant", outer-i-1))
780                                 c.Check(err, check.ErrorMatches, `.*does not exist`)
781
782                                 // oldname parent dir is a file
783                                 err = fs.Rename(
784                                         fmt.Sprintf("dir%d/file%d/patherror", i, j),
785                                         fmt.Sprintf("dir%d/irrelevant", i))
786                                 c.Check(err, check.ErrorMatches, `.*not a directory`)
787
788                                 // newname parent dir is a file
789                                 err = fs.Rename(
790                                         fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
791                                         fmt.Sprintf("dir%d/file%d/patherror", i, inner-j-1))
792                                 c.Check(err, check.ErrorMatches, `.*not a directory`)
793                         }(i, j)
794                 }
795         }
796         wg.Wait()
797
798         f, err := fs.OpenFile("dir1/newfile3", 0, 0)
799         c.Assert(err, check.IsNil)
800         c.Check(f.Size(), check.Equals, int64(4))
801         buf, err := ioutil.ReadAll(f)
802         c.Check(buf, check.DeepEquals, []byte("beep"))
803         c.Check(err, check.IsNil)
804         _, err = fs.Open("dir1/dir1/file1")
805         c.Check(err, check.Equals, os.ErrNotExist)
806 }
807
808 func (s *CollectionFSSuite) TestPersist(c *check.C) {
809         maxBlockSize = 1024
810         defer func() { maxBlockSize = 2 << 26 }()
811
812         var err error
813         s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
814         c.Assert(err, check.IsNil)
815         err = s.fs.Mkdir("d:r", 0755)
816         c.Assert(err, check.IsNil)
817
818         expect := map[string][]byte{}
819
820         var wg sync.WaitGroup
821         for _, name := range []string{"random 1", "random:2", "random\\3", "d:r/random4"} {
822                 buf := make([]byte, 500)
823                 rand.Read(buf)
824                 expect[name] = buf
825
826                 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
827                 c.Assert(err, check.IsNil)
828                 // Note: we don't close the file until after the test
829                 // is done. Writes to unclosed files should persist.
830                 defer f.Close()
831
832                 wg.Add(1)
833                 go func() {
834                         defer wg.Done()
835                         for i := 0; i < len(buf); i += 5 {
836                                 _, err := f.Write(buf[i : i+5])
837                                 c.Assert(err, check.IsNil)
838                         }
839                 }()
840         }
841         wg.Wait()
842
843         m, err := s.fs.MarshalManifest(".")
844         c.Check(err, check.IsNil)
845         c.Logf("%q", m)
846
847         root, err := s.fs.Open("/")
848         c.Assert(err, check.IsNil)
849         defer root.Close()
850         fi, err := root.Readdir(-1)
851         c.Check(err, check.IsNil)
852         c.Check(len(fi), check.Equals, 4)
853
854         persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
855         c.Assert(err, check.IsNil)
856
857         root, err = persisted.Open("/")
858         c.Assert(err, check.IsNil)
859         defer root.Close()
860         fi, err = root.Readdir(-1)
861         c.Check(err, check.IsNil)
862         c.Check(len(fi), check.Equals, 4)
863
864         for name, content := range expect {
865                 c.Logf("read %q", name)
866                 f, err := persisted.Open(name)
867                 c.Assert(err, check.IsNil)
868                 defer f.Close()
869                 buf, err := ioutil.ReadAll(f)
870                 c.Check(err, check.IsNil)
871                 c.Check(buf, check.DeepEquals, content)
872         }
873 }
874
875 func (s *CollectionFSSuite) TestPersistEmptyFilesAndDirs(c *check.C) {
876         var err error
877         s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
878         c.Assert(err, check.IsNil)
879         for _, name := range []string{"dir", "dir/zerodir", "empty", "not empty", "not empty/empty", "zero", "zero/zero"} {
880                 err = s.fs.Mkdir(name, 0755)
881                 c.Assert(err, check.IsNil)
882         }
883
884         expect := map[string][]byte{
885                 "0":                nil,
886                 "00":               {},
887                 "one":              {1},
888                 "dir/0":            nil,
889                 "dir/two":          {1, 2},
890                 "dir/zero":         nil,
891                 "dir/zerodir/zero": nil,
892                 "zero/zero/zero":   nil,
893         }
894         for name, data := range expect {
895                 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
896                 c.Assert(err, check.IsNil)
897                 if data != nil {
898                         _, err := f.Write(data)
899                         c.Assert(err, check.IsNil)
900                 }
901                 f.Close()
902         }
903
904         m, err := s.fs.MarshalManifest(".")
905         c.Check(err, check.IsNil)
906         c.Logf("%q", m)
907
908         persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
909         c.Assert(err, check.IsNil)
910
911         for name, data := range expect {
912                 _, err = persisted.Open("bogus-" + name)
913                 c.Check(err, check.NotNil)
914
915                 f, err := persisted.Open(name)
916                 c.Assert(err, check.IsNil)
917
918                 if data == nil {
919                         data = []byte{}
920                 }
921                 buf, err := ioutil.ReadAll(f)
922                 c.Check(err, check.IsNil)
923                 c.Check(buf, check.DeepEquals, data)
924         }
925
926         expectDir := map[string]int{
927                 "empty":           0,
928                 "not empty":       1,
929                 "not empty/empty": 0,
930         }
931         for name, expectLen := range expectDir {
932                 _, err := persisted.Open(name + "/bogus")
933                 c.Check(err, check.NotNil)
934
935                 d, err := persisted.Open(name)
936                 defer d.Close()
937                 c.Check(err, check.IsNil)
938                 fi, err := d.Readdir(-1)
939                 c.Check(err, check.IsNil)
940                 c.Check(fi, check.HasLen, expectLen)
941         }
942 }
943
944 func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
945         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
946         c.Assert(err, check.IsNil)
947
948         f, err := fs.OpenFile("missing", os.O_WRONLY, 0)
949         c.Check(f, check.IsNil)
950         c.Check(err, check.ErrorMatches, `file does not exist`)
951
952         f, err = fs.OpenFile("new", os.O_CREATE|os.O_RDONLY, 0)
953         c.Assert(err, check.IsNil)
954         defer f.Close()
955         n, err := f.Write([]byte{1, 2, 3})
956         c.Check(n, check.Equals, 0)
957         c.Check(err, check.ErrorMatches, `read-only file`)
958         n, err = f.Read(make([]byte, 1))
959         c.Check(n, check.Equals, 0)
960         c.Check(err, check.Equals, io.EOF)
961         f, err = fs.OpenFile("new", os.O_RDWR, 0)
962         c.Assert(err, check.IsNil)
963         defer f.Close()
964         _, err = f.Write([]byte{4, 5, 6})
965         c.Check(err, check.IsNil)
966         fi, err := f.Stat()
967         c.Assert(err, check.IsNil)
968         c.Check(fi.Size(), check.Equals, int64(3))
969
970         f, err = fs.OpenFile("new", os.O_TRUNC|os.O_RDWR, 0)
971         c.Assert(err, check.IsNil)
972         defer f.Close()
973         pos, err := f.Seek(0, io.SeekEnd)
974         c.Check(pos, check.Equals, int64(0))
975         c.Check(err, check.IsNil)
976         fi, err = f.Stat()
977         c.Assert(err, check.IsNil)
978         c.Check(fi.Size(), check.Equals, int64(0))
979         fs.Remove("new")
980
981         buf := make([]byte, 64)
982         f, err = fs.OpenFile("append", os.O_EXCL|os.O_CREATE|os.O_RDWR|os.O_APPEND, 0)
983         c.Assert(err, check.IsNil)
984         f.Write([]byte{1, 2, 3})
985         f.Seek(0, io.SeekStart)
986         n, _ = f.Read(buf[:1])
987         c.Check(n, check.Equals, 1)
988         c.Check(buf[:1], check.DeepEquals, []byte{1})
989         pos, err = f.Seek(0, io.SeekCurrent)
990         c.Assert(err, check.IsNil)
991         c.Check(pos, check.Equals, int64(1))
992         f.Write([]byte{4, 5, 6})
993         pos, err = f.Seek(0, io.SeekCurrent)
994         c.Assert(err, check.IsNil)
995         c.Check(pos, check.Equals, int64(6))
996         f.Seek(0, io.SeekStart)
997         n, err = f.Read(buf)
998         c.Check(buf[:n], check.DeepEquals, []byte{1, 2, 3, 4, 5, 6})
999         c.Check(err, check.Equals, io.EOF)
1000         f.Close()
1001
1002         f, err = fs.OpenFile("append", os.O_RDWR|os.O_APPEND, 0)
1003         c.Assert(err, check.IsNil)
1004         pos, err = f.Seek(0, io.SeekCurrent)
1005         c.Check(pos, check.Equals, int64(0))
1006         c.Check(err, check.IsNil)
1007         f.Read(buf[:3])
1008         pos, _ = f.Seek(0, io.SeekCurrent)
1009         c.Check(pos, check.Equals, int64(3))
1010         f.Write([]byte{7, 8, 9})
1011         pos, err = f.Seek(0, io.SeekCurrent)
1012         c.Check(err, check.IsNil)
1013         c.Check(pos, check.Equals, int64(9))
1014         f.Close()
1015
1016         f, err = fs.OpenFile("wronly", os.O_CREATE|os.O_WRONLY, 0)
1017         c.Assert(err, check.IsNil)
1018         n, err = f.Write([]byte{3, 2, 1})
1019         c.Check(n, check.Equals, 3)
1020         c.Check(err, check.IsNil)
1021         pos, _ = f.Seek(0, io.SeekCurrent)
1022         c.Check(pos, check.Equals, int64(3))
1023         pos, _ = f.Seek(0, io.SeekStart)
1024         c.Check(pos, check.Equals, int64(0))
1025         n, err = f.Read(buf)
1026         c.Check(n, check.Equals, 0)
1027         c.Check(err, check.ErrorMatches, `.*O_WRONLY.*`)
1028         f, err = fs.OpenFile("wronly", os.O_RDONLY, 0)
1029         c.Assert(err, check.IsNil)
1030         n, _ = f.Read(buf)
1031         c.Check(buf[:n], check.DeepEquals, []byte{3, 2, 1})
1032
1033         f, err = fs.OpenFile("unsupported", os.O_CREATE|os.O_SYNC, 0)
1034         c.Check(f, check.IsNil)
1035         c.Check(err, check.NotNil)
1036
1037         f, err = fs.OpenFile("append", os.O_RDWR|os.O_WRONLY, 0)
1038         c.Check(f, check.IsNil)
1039         c.Check(err, check.ErrorMatches, `invalid flag.*`)
1040 }
1041
// TestFlushFullBlocksWritingLongFile writes a 50000-byte file ("50K")
// as 100 sequential 500-byte writes. For the duration of the test,
// maxBlockSize is reduced to 1024 and concurrentWriters to 2; both are
// restored on return. The test checks three things:
//   - blocks are stored via PutB while the file is still being written,
//     so only one in-memory segment remains afterwards;
//   - no more than concurrentWriters PutB calls overlap;
//   - marshaling the manifest stores the remaining in-memory segment.
func (s *CollectionFSSuite) TestFlushFullBlocksWritingLongFile(c *check.C) {
	// The current values are passed as arguments, so the deferred
	// func restores exactly what was in effect before this test.
	defer func(cw, mbs int) {
		concurrentWriters = cw
		maxBlockSize = mbs
	}(concurrentWriters, maxBlockSize)
	concurrentWriters = 2
	maxBlockSize = 1024

	// Closed by the 3rd PutB to release the 1st.
	proceed := make(chan struct{})
	// started counts PutB calls so far; concurrent counts PutB calls
	// currently inside onPut.
	var started, concurrent int32
	// Set by the 2nd PutB and read by the 1st. It is not accessed
	// atomically. NOTE(review): this relies on the filesystem's
	// writer limit to order the write before the read. Confirm under
	// -race.
	blk2done := false
	s.kc.onPut = func([]byte) {
		atomic.AddInt32(&concurrent, 1)
		switch atomic.AddInt32(&started, 1) {
		case 1:
			// Wait until block 2 starts and finishes, and block 3 starts
			select {
			case <-proceed:
				c.Check(blk2done, check.Equals, true)
			case <-time.After(time.Second):
				c.Error("timed out")
			}
		case 2:
			time.Sleep(time.Millisecond)
			blk2done = true
		case 3:
			// Release the 1st PutB.
			close(proceed)
		default:
			time.Sleep(time.Millisecond)
		}
		// After this call leaves, fewer than concurrentWriters
		// calls may remain inside onPut. In other words, at most
		// concurrentWriters calls overlapped.
		c.Check(atomic.AddInt32(&concurrent, -1) < int32(concurrentWriters), check.Equals, true)
	}

	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	f, err := fs.OpenFile("50K", os.O_WRONLY|os.O_CREATE, 0)
	c.Assert(err, check.IsNil)
	defer f.Close()

	data := make([]byte, 500)
	rand.Read(data)

	// 100 x 500 bytes = 50000 bytes in total.
	for i := 0; i < 100; i++ {
		n, err := f.Write(data)
		c.Assert(n, check.Equals, len(data))
		c.Assert(err, check.IsNil)
	}

	// currentMemExtents returns the indexes of the file's segments
	// that are still held in memory (*memSegment).
	currentMemExtents := func() (memExtents []int) {
		for idx, e := range f.(*filehandle).inode.(*filenode).segments {
			switch e.(type) {
			case *memSegment:
				memExtents = append(memExtents, idx)
			}
		}
		return
	}
	// NOTE(review): waitPrune presumably blocks until the file's
	// background writes have settled. Confirm in filenode.
	f.(*filehandle).inode.(*filenode).waitPrune()
	c.Check(currentMemExtents(), check.HasLen, 1)

	// After MarshalManifest, no in-memory segments remain.
	m, err := fs.MarshalManifest(".")
	c.Check(m, check.Matches, `[^:]* 0:50000:50K\n`)
	c.Check(err, check.IsNil)
	c.Check(currentMemExtents(), check.HasLen, 0)
}
1107
1108 // Ensure blocks get flushed to disk if a lot of data is written to
1109 // small files/directories without calling sync().
1110 //
1111 // Write four 512KiB files into each of 256 top-level dirs (total
1112 // 512MiB), calling Flush() every 8 dirs. Ensure memory usage never
1113 // exceeds 24MiB (4 concurrentWriters * 2MiB + 8 unflushed dirs *
1114 // 2MiB).
1115 func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
1116         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1117         c.Assert(err, check.IsNil)
1118
1119         s.kc.onPut = func([]byte) {
1120                 // discard flushed data -- otherwise the stub will use
1121                 // unlimited memory
1122                 time.Sleep(time.Millisecond)
1123                 s.kc.Lock()
1124                 defer s.kc.Unlock()
1125                 s.kc.blocks = map[string][]byte{}
1126         }
1127         for i := 0; i < 256; i++ {
1128                 buf := bytes.NewBuffer(make([]byte, 524288))
1129                 fmt.Fprintf(buf, "test file in dir%d", i)
1130
1131                 dir := fmt.Sprintf("dir%d", i)
1132                 fs.Mkdir(dir, 0755)
1133                 for j := 0; j < 2; j++ {
1134                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1135                         c.Assert(err, check.IsNil)
1136                         defer f.Close()
1137                         _, err = io.Copy(f, buf)
1138                         c.Assert(err, check.IsNil)
1139                 }
1140
1141                 if i%8 == 0 {
1142                         fs.Flush("", true)
1143                 }
1144
1145                 size := fs.memorySize()
1146                 if !c.Check(size <= 1<<24, check.Equals, true) {
1147                         c.Logf("at dir%d fs.memorySize()=%d", i, size)
1148                         return
1149                 }
1150         }
1151 }
1152
1153 // Ensure short blocks at the end of a stream don't get flushed by
1154 // Flush(false).
1155 //
1156 // Write 67x 1MiB files to each of 8 dirs, and check that 8 full 64MiB
1157 // blocks have been flushed while 8x 3MiB is still buffered in memory.
1158 func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
1159         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1160         c.Assert(err, check.IsNil)
1161
1162         var flushed int64
1163         s.kc.onPut = func(p []byte) {
1164                 atomic.AddInt64(&flushed, int64(len(p)))
1165         }
1166
1167         nDirs := int64(8)
1168         megabyte := make([]byte, 1<<20)
1169         for i := int64(0); i < nDirs; i++ {
1170                 dir := fmt.Sprintf("dir%d", i)
1171                 fs.Mkdir(dir, 0755)
1172                 for j := 0; j < 67; j++ {
1173                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1174                         c.Assert(err, check.IsNil)
1175                         defer f.Close()
1176                         _, err = f.Write(megabyte)
1177                         c.Assert(err, check.IsNil)
1178                 }
1179         }
1180         c.Check(fs.memorySize(), check.Equals, int64(nDirs*67<<20))
1181         c.Check(flushed, check.Equals, int64(0))
1182
1183         waitForFlush := func(expectUnflushed, expectFlushed int64) {
1184                 for deadline := time.Now().Add(5 * time.Second); fs.memorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
1185                 }
1186                 c.Check(fs.memorySize(), check.Equals, expectUnflushed)
1187                 c.Check(flushed, check.Equals, expectFlushed)
1188         }
1189
1190         // Nothing flushed yet
1191         waitForFlush((nDirs*67)<<20, 0)
1192
1193         // Flushing a non-empty dir "/" is non-recursive and there are
1194         // no top-level files, so this has no effect
1195         fs.Flush("/", false)
1196         waitForFlush((nDirs*67)<<20, 0)
1197
1198         // Flush the full block in dir0
1199         fs.Flush("dir0", false)
1200         waitForFlush((nDirs*67-64)<<20, 64<<20)
1201
1202         err = fs.Flush("dir-does-not-exist", false)
1203         c.Check(err, check.NotNil)
1204
1205         // Flush full blocks in all dirs
1206         fs.Flush("", false)
1207         waitForFlush(nDirs*3<<20, nDirs*64<<20)
1208
1209         // Flush non-full blocks, too
1210         fs.Flush("", true)
1211         waitForFlush(0, nDirs*67<<20)
1212 }
1213
1214 // Even when writing lots of files/dirs from different goroutines, as
1215 // long as Flush(dir,false) is called after writing each file,
1216 // unflushed data should be limited to one full block per
1217 // concurrentWriter, plus one nearly-full block at the end of each
1218 // dir/stream.
1219 func (s *CollectionFSSuite) TestMaxUnflushed(c *check.C) {
1220         nDirs := int64(8)
1221         maxUnflushed := (int64(concurrentWriters) + nDirs) << 26
1222
1223         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1224         c.Assert(err, check.IsNil)
1225
1226         release := make(chan struct{})
1227         timeout := make(chan struct{})
1228         time.AfterFunc(10*time.Second, func() { close(timeout) })
1229         var putCount, concurrency int64
1230         var unflushed int64
1231         s.kc.onPut = func(p []byte) {
1232                 defer atomic.AddInt64(&unflushed, -int64(len(p)))
1233                 cur := atomic.AddInt64(&concurrency, 1)
1234                 defer atomic.AddInt64(&concurrency, -1)
1235                 pc := atomic.AddInt64(&putCount, 1)
1236                 if pc < int64(concurrentWriters) {
1237                         // Block until we reach concurrentWriters, to
1238                         // make sure we're really accepting concurrent
1239                         // writes.
1240                         select {
1241                         case <-release:
1242                         case <-timeout:
1243                                 c.Error("timeout")
1244                         }
1245                 } else if pc == int64(concurrentWriters) {
1246                         // Unblock the first N-1 PUT reqs.
1247                         close(release)
1248                 }
1249                 c.Assert(cur <= int64(concurrentWriters), check.Equals, true)
1250                 c.Assert(atomic.LoadInt64(&unflushed) <= maxUnflushed, check.Equals, true)
1251         }
1252
1253         var owg sync.WaitGroup
1254         megabyte := make([]byte, 1<<20)
1255         for i := int64(0); i < nDirs; i++ {
1256                 dir := fmt.Sprintf("dir%d", i)
1257                 fs.Mkdir(dir, 0755)
1258                 owg.Add(1)
1259                 go func() {
1260                         defer owg.Done()
1261                         defer fs.Flush(dir, true)
1262                         var iwg sync.WaitGroup
1263                         defer iwg.Wait()
1264                         for j := 0; j < 67; j++ {
1265                                 iwg.Add(1)
1266                                 go func(j int) {
1267                                         defer iwg.Done()
1268                                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1269                                         c.Assert(err, check.IsNil)
1270                                         defer f.Close()
1271                                         n, err := f.Write(megabyte)
1272                                         c.Assert(err, check.IsNil)
1273                                         atomic.AddInt64(&unflushed, int64(n))
1274                                         fs.Flush(dir, false)
1275                                 }(j)
1276                         }
1277                 }()
1278         }
1279         owg.Wait()
1280         fs.Flush("", true)
1281 }
1282
1283 func (s *CollectionFSSuite) TestFlushShort(c *check.C) {
1284         s.kc.onPut = func([]byte) {
1285                 s.kc.Lock()
1286                 s.kc.blocks = map[string][]byte{}
1287                 s.kc.Unlock()
1288         }
1289         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1290         c.Assert(err, check.IsNil)
1291         for _, blocksize := range []int{8, 1000000} {
1292                 dir := fmt.Sprintf("dir%d", blocksize)
1293                 err = fs.Mkdir(dir, 0755)
1294                 c.Assert(err, check.IsNil)
1295                 data := make([]byte, blocksize)
1296                 for i := 0; i < 100; i++ {
1297                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, i), os.O_WRONLY|os.O_CREATE, 0)
1298                         c.Assert(err, check.IsNil)
1299                         _, err = f.Write(data)
1300                         c.Assert(err, check.IsNil)
1301                         f.Close()
1302                         fs.Flush(dir, false)
1303                 }
1304         }
1305 }
1306
1307 func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
1308         for _, txt := range []string{
1309                 "\n",
1310                 ".\n",
1311                 ". \n",
1312                 ". d41d8cd98f00b204e9800998ecf8427e+0\n",
1313                 ". d41d8cd98f00b204e9800998ecf8427e+0 \n",
1314                 ". 0:0:foo\n",
1315                 ".  0:0:foo\n",
1316                 ". 0:0:foo 0:0:bar\n",
1317                 ". d41d8cd98f00b204e9800998ecf8427e 0:0:foo\n",
1318                 ". d41d8cd98f00b204e9800998ecf8427e+0 :0:0:foo\n",
1319                 ". d41d8cd98f00b204e9800998ecf8427e+0 foo:0:foo\n",
1320                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:foo:foo\n",
1321                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:foo 1:1:bar\n",
1322                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\n",
1323                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\\057\\056\n",
1324                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:.\n",
1325                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:..\n",
1326                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:..\n",
1327                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/..\n",
1328                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n",
1329                 "./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n. d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n",
1330         } {
1331                 c.Logf("<-%q", txt)
1332                 fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
1333                 c.Check(fs, check.IsNil)
1334                 c.Logf("-> %s", err)
1335                 c.Check(err, check.NotNil)
1336         }
1337 }
1338
1339 func (s *CollectionFSSuite) TestEdgeCaseManifests(c *check.C) {
1340         for _, txt := range []string{
1341                 "",
1342                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo\n",
1343                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:...\n",
1344                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:. 0:0:. 0:0:\\056 0:0:\\056\n",
1345                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/. 0:0:. 0:0:foo\\057bar\\057\\056\n",
1346                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
1347                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/bar\n./foo d41d8cd98f00b204e9800998ecf8427e+0 0:0:bar\n",
1348         } {
1349                 c.Logf("<-%q", txt)
1350                 fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
1351                 c.Check(err, check.IsNil)
1352                 c.Check(fs, check.NotNil)
1353         }
1354 }
1355
1356 func (s *CollectionFSSuite) checkMemSize(c *check.C, f File) {
1357         fn := f.(*filehandle).inode.(*filenode)
1358         var memsize int64
1359         for _, seg := range fn.segments {
1360                 if e, ok := seg.(*memSegment); ok {
1361                         memsize += int64(len(e.buf))
1362                 }
1363         }
1364         c.Check(fn.memsize, check.Equals, memsize)
1365 }
1366
1367 type CollectionFSUnitSuite struct{}
1368
1369 var _ = check.Suite(&CollectionFSUnitSuite{})
1370
1371 // expect ~2 seconds to load a manifest with 256K files
1372 func (s *CollectionFSUnitSuite) TestLargeManifest(c *check.C) {
1373         if testing.Short() {
1374                 c.Skip("slow")
1375         }
1376
1377         const (
1378                 dirCount  = 512
1379                 fileCount = 512
1380         )
1381
1382         mb := bytes.NewBuffer(make([]byte, 0, 40000000))
1383         for i := 0; i < dirCount; i++ {
1384                 fmt.Fprintf(mb, "./dir%d", i)
1385                 for j := 0; j <= fileCount; j++ {
1386                         fmt.Fprintf(mb, " %032x+42+A%040x@%08x", j, j, j)
1387                 }
1388                 for j := 0; j < fileCount; j++ {
1389                         fmt.Fprintf(mb, " %d:%d:dir%d/file%d", j*42+21, 42, j, j)
1390                 }
1391                 mb.Write([]byte{'\n'})
1392         }
1393         coll := Collection{ManifestText: mb.String()}
1394         c.Logf("%s built", time.Now())
1395
1396         var memstats runtime.MemStats
1397         runtime.ReadMemStats(&memstats)
1398         c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
1399
1400         f, err := coll.FileSystem(nil, nil)
1401         c.Check(err, check.IsNil)
1402         c.Logf("%s loaded", time.Now())
1403         c.Check(f.Size(), check.Equals, int64(42*dirCount*fileCount))
1404
1405         for i := 0; i < dirCount; i++ {
1406                 for j := 0; j < fileCount; j++ {
1407                         f.Stat(fmt.Sprintf("./dir%d/dir%d/file%d", i, j, j))
1408                 }
1409         }
1410         c.Logf("%s Stat() x %d", time.Now(), dirCount*fileCount)
1411
1412         runtime.ReadMemStats(&memstats)
1413         c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
1414 }
1415
1416 // Gocheck boilerplate
1417 func Test(t *testing.T) {
1418         check.TestingT(t)
1419 }