1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
27 check "gopkg.in/check.v1"
// Register a CollectionFSSuite instance with the gocheck test runner.
30 var _ = check.Suite(&CollectionFSSuite{})
// keepClientStub is a stub Keep client for the tests in this file: its
// methods read and write block data held in the in-memory blocks map.
32 type keepClientStub struct {
33 blocks map[string][]byte // block data, indexed by the first 32 characters of a locator
34 refreshable map[string]bool // hashes LocalLocator accepts for "+R" locators even when absent from blocks
35 reads []string // locators from ReadAt() calls
36 onWrite func(bufcopy []byte) // called from WriteBlock, before acquiring lock
37 authToken string // client's auth token (used for signing locators)
38 sigkey string // blob signing key
39 sigttl time.Duration // blob signing ttl
// errStub404 is the stub's "block not found" error value.
43 var errStub404 = errors.New("404 block not found")
// ReadAt records locator in kcs.reads, checks the locator's signature
// with VerifySignature using the stub's token, TTL and signing key,
// and copies the stored block content starting at offset off into p,
// returning the number of bytes copied.
45 func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error) {
47 kcs.reads = append(kcs.reads, locator)
51 if err := VerifySignature(locator, kcs.authToken, kcs.sigttl, []byte(kcs.sigkey)); err != nil {
// Blocks are indexed by the first 32 characters of the locator.
54 buf := kcs.blocks[locator[:32]]
58 return copy(p, buf[off:]), nil
// BlockWrite builds a signed locator from the MD5 hash and length of
// opts.Data, invokes the onWrite hook if one is set, and stores a
// buffer under the first 32 characters of the locator. On success it
// reports the locator with Replicas set to 1.
61 func (kcs *keepClientStub) BlockWrite(_ context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
63 panic("oops, stub is not made for this")
// The locator has the form "<md5 hex>+<size>" plus a signature that
// expires kcs.sigttl from now.
65 locator := SignLocator(fmt.Sprintf("%x+%d", md5.Sum(opts.Data), len(opts.Data)), kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
66 buf := make([]byte, len(opts.Data))
68 if kcs.onWrite != nil {
71 for _, sc := range opts.StorageClasses {
73 return BlockWriteResponse{}, fmt.Errorf("stub does not write storage class %q", sc)
78 kcs.blocks[locator[:32]] = buf
79 return BlockWriteResponse{Locator: locator, Replicas: 1}, nil
// reRemoteSignature matches a "+A" or "+R" hint in a locator, up to
// (but not including) the next "+".
82 var reRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
// LocalLocator inspects locator for "+A" and "+R" hints. For a "+R"
// locator it rejects locators shorter than 32 characters and blocks
// that are neither stored in kcs.blocks nor marked refreshable. It
// also removes existing "+A"/"+R" hints and re-signs the locator with
// the stub's token and signing key.
84 func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
85 if strings.Contains(locator, "+A") {
90 if strings.Contains(locator, "+R") {
91 if len(locator) < 32 {
92 return "", fmt.Errorf("bad locator: %q", locator)
94 if _, ok := kcs.blocks[locator[:32]]; !ok && !kcs.refreshable[locator[:32]] {
95 return "", fmt.Errorf("kcs.refreshable[%q]==false", locator)
// Drop the old hints, then add a fresh signature.
98 locator = reRemoteSignature.ReplaceAllLiteralString(locator, "")
99 locator = SignLocator(locator, kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
// CollectionFSSuite is the gocheck suite for the collection
// filesystem tests in this file.
103 type CollectionFSSuite struct {
106 fs CollectionFileSystem // filesystem under test, rebuilt by SetUpTest
// SetUpTest creates a client from the environment using the active
// fixture token, fetches the fixture collection identified by
// fixtureFooAndBarFilesInDirUUID into s.coll, installs a
// keepClientStub preloaded with a single "foobar" block, and builds
// s.fs from the collection.
110 func (s *CollectionFSSuite) SetUpTest(c *check.C) {
111 s.client = NewClientFromEnv()
112 s.client.AuthToken = fixtureActiveToken
113 err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil)
114 c.Assert(err, check.IsNil)
115 s.kc = &keepClientStub{
116 blocks: map[string][]byte{
117 "3858f62230ac3c915f300c664312c63f": []byte("foobar"),
119 sigkey: fixtureBlobSigningKey,
120 sigttl: fixtureBlobSigningTTL,
121 authToken: fixtureActiveToken,
123 s.fs, err = s.coll.FileSystem(s.client, s.kc)
124 c.Assert(err, check.IsNil)
127 func (s *CollectionFSSuite) TestHttpFileSystemInterface(c *check.C) {
128 _, ok := s.fs.(http.FileSystem)
129 c.Check(ok, check.Equals, true)
// TestUnattainableStorageClasses builds a filesystem for a collection
// whose StorageClassesDesired is "unobtainium", writes to a new file,
// and checks that MarshalManifest fails with the "stub does not write
// storage class" error.
132 func (s *CollectionFSSuite) TestUnattainableStorageClasses(c *check.C) {
133 fs, err := (&Collection{
134 StorageClassesDesired: []string{"unobtainium"},
135 }).FileSystem(s.client, s.kc)
136 c.Assert(err, check.IsNil)
138 f, err := fs.OpenFile("/foo", os.O_CREATE|os.O_WRONLY, 0777)
139 c.Assert(err, check.IsNil)
140 _, err = f.Write([]byte("food"))
141 c.Assert(err, check.IsNil)
143 c.Assert(err, check.IsNil)
144 _, err = fs.MarshalManifest(".")
145 c.Assert(err, check.ErrorMatches, `.*stub does not write storage class \"unobtainium\"`)
// TestColonInFilename loads a manifest whose stream name ("foo:foo")
// and file name ("bar:bar") both contain a colon, and checks that
// directory /foo:foo lists exactly one entry named "bar:bar".
148 func (s *CollectionFSSuite) TestColonInFilename(c *check.C) {
149 fs, err := (&Collection{
150 ManifestText: "./foo:foo 3858f62230ac3c915f300c664312c63f+3 0:3:bar:bar\n",
151 }).FileSystem(s.client, s.kc)
152 c.Assert(err, check.IsNil)
154 f, err := fs.Open("/foo:foo")
155 c.Assert(err, check.IsNil)
157 fis, err := f.Readdir(0)
158 c.Check(err, check.IsNil)
159 c.Check(len(fis), check.Equals, 1)
160 c.Check(fis[0].Name(), check.Equals, "bar:bar")
// TestReaddirFull opens /dir1, checks that st reports a directory of
// size 2, and checks that Readdir(0) returns two entries, the first
// of which has size 3.
163 func (s *CollectionFSSuite) TestReaddirFull(c *check.C) {
164 f, err := s.fs.Open("/dir1")
165 c.Assert(err, check.IsNil)
168 c.Assert(err, check.IsNil)
169 c.Check(st.Size(), check.Equals, int64(2))
170 c.Check(st.IsDir(), check.Equals, true)
172 fis, err := f.Readdir(0)
173 c.Check(err, check.IsNil)
174 c.Check(len(fis), check.Equals, 2)
176 c.Check(fis[0].Size(), check.Equals, int64(3))
// TestReaddirLimited reads ./dir1 with successive Readdir(1) calls,
// expecting two single-entry results (each entry of size 3) followed
// by an empty result with io.EOF. It then reopens dir1 and expects
// Readdir(1), Readdir(2), Readdir(2) to return 1, 1 and 0 entries,
// the last call returning io.EOF.
180 func (s *CollectionFSSuite) TestReaddirLimited(c *check.C) {
181 f, err := s.fs.Open("./dir1")
182 c.Assert(err, check.IsNil)
184 fis, err := f.Readdir(1)
185 c.Check(err, check.IsNil)
186 c.Check(len(fis), check.Equals, 1)
188 c.Check(fis[0].Size(), check.Equals, int64(3))
191 fis, err = f.Readdir(1)
192 c.Check(err, check.IsNil)
193 c.Check(len(fis), check.Equals, 1)
195 c.Check(fis[0].Size(), check.Equals, int64(3))
// Third read: nothing is left, so an empty slice and io.EOF are expected.
198 fis, err = f.Readdir(1)
199 c.Check(len(fis), check.Equals, 0)
200 c.Check(err, check.NotNil)
201 c.Check(err, check.Equals, io.EOF)
203 f, err = s.fs.Open("dir1")
204 c.Assert(err, check.IsNil)
205 fis, err = f.Readdir(1)
206 c.Check(len(fis), check.Equals, 1)
207 c.Assert(err, check.IsNil)
// Asking for 2 when only 1 entry remains returns that entry without error.
208 fis, err = f.Readdir(2)
209 c.Check(len(fis), check.Equals, 1)
210 c.Assert(err, check.IsNil)
211 fis, err = f.Readdir(2)
212 c.Check(len(fis), check.Equals, 0)
213 c.Assert(err, check.Equals, io.EOF)
// TestPathMunge opens several spellings of the root path and of dir1
// and checks that each one yields a directory, of size 1 for the root
// spellings and size 2 for the dir1 spellings.
216 func (s *CollectionFSSuite) TestPathMunge(c *check.C) {
217 for _, path := range []string{".", "/", "./", "///", "/../", "/./.."} {
218 f, err := s.fs.Open(path)
219 c.Assert(err, check.IsNil)
222 c.Assert(err, check.IsNil)
223 c.Check(st.Size(), check.Equals, int64(1))
224 c.Check(st.IsDir(), check.Equals, true)
226 for _, path := range []string{"/dir1", "dir1", "./dir1", "///dir1//.//", "../dir1/../dir1/"} {
228 f, err := s.fs.Open(path)
229 c.Assert(err, check.IsNil)
232 c.Assert(err, check.IsNil)
233 c.Check(st.Size(), check.Equals, int64(2))
234 c.Check(st.IsDir(), check.Equals, true)
238 func (s *CollectionFSSuite) TestNotExist(c *check.C) {
239 for _, path := range []string{"/no", "no", "./no", "n/o", "/n/o"} {
240 f, err := s.fs.Open(path)
241 c.Assert(f, check.IsNil)
242 c.Assert(err, check.NotNil)
243 c.Assert(os.IsNotExist(err), check.Equals, true)
// TestReadOnlyFile opens /dir1/foo with O_RDONLY, checks that st
// reports size 3, and checks that Write returns 0 bytes and
// ErrReadOnlyFile.
247 func (s *CollectionFSSuite) TestReadOnlyFile(c *check.C) {
248 f, err := s.fs.OpenFile("/dir1/foo", os.O_RDONLY, 0)
249 c.Assert(err, check.IsNil)
251 c.Assert(err, check.IsNil)
252 c.Check(st.Size(), check.Equals, int64(3))
253 n, err := f.Write([]byte("bar"))
254 c.Check(n, check.Equals, 0)
255 c.Check(err, check.Equals, ErrReadOnlyFile)
// TestCreateFile creates "/new-file 1", writes three bytes to it and
// closes it, checks that re-creating it with O_EXCL fails, checks
// that reopening it reports size 3, and checks that the marshaled
// manifest lists the new file with its space escaped as \040.
258 func (s *CollectionFSSuite) TestCreateFile(c *check.C) {
259 f, err := s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE, 0)
260 c.Assert(err, check.IsNil)
262 c.Assert(err, check.IsNil)
263 c.Check(st.Size(), check.Equals, int64(0))
265 n, err := f.Write([]byte("bar"))
266 c.Check(n, check.Equals, 3)
267 c.Check(err, check.IsNil)
269 c.Check(f.Close(), check.IsNil)
// O_EXCL must fail now that the file exists.
271 f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
272 c.Check(f, check.IsNil)
273 c.Assert(err, check.NotNil)
275 f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR, 0)
276 c.Assert(err, check.IsNil)
278 c.Assert(err, check.IsNil)
279 c.Check(st.Size(), check.Equals, int64(3))
281 c.Check(f.Close(), check.IsNil)
283 m, err := s.fs.MarshalManifest(".")
284 c.Assert(err, check.IsNil)
285 c.Check(m, check.Matches, `. 37b51d194a7513e45b56f6524f2d51f2\+3\+\S+ 0:3:new-file\\0401\n./dir1 .* 3:3:bar 0:3:foo\n`)
// TestReadWriteFile exercises reads, seeks, overwrites and size
// changes through two handles (f and f2) opened on /dir1/foo. Along
// the way it checks the file content seen through each handle, the
// number of segments in the underlying filenode, and finally the
// marshaled manifest and the filesystem size.
288 func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
// Set maxBlockSize to 2 << 26 when the test returns.
290 defer func() { maxBlockSize = 2 << 26 }()
292 f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
293 c.Assert(err, check.IsNil)
296 c.Assert(err, check.IsNil)
297 c.Check(st.Size(), check.Equals, int64(3))
299 f2, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
300 c.Assert(err, check.IsNil)
303 buf := make([]byte, 64)
304 n, err := f.Read(buf)
305 c.Check(n, check.Equals, 3)
306 c.Check(err, check.Equals, io.EOF)
307 c.Check(string(buf[:3]), check.DeepEquals, "foo")
309 pos, err := f.Seek(-2, io.SeekCurrent)
310 c.Check(pos, check.Equals, int64(1))
311 c.Check(err, check.IsNil)
313 // Split a storedExtent in two, and insert a memExtent
314 n, err = f.Write([]byte("*"))
315 c.Check(n, check.Equals, 1)
316 c.Check(err, check.IsNil)
318 pos, err = f.Seek(0, io.SeekCurrent)
319 c.Check(pos, check.Equals, int64(2))
320 c.Check(err, check.IsNil)
322 pos, err = f.Seek(0, io.SeekStart)
323 c.Check(pos, check.Equals, int64(0))
324 c.Check(err, check.IsNil)
326 rbuf, err := ioutil.ReadAll(f)
327 c.Check(len(rbuf), check.Equals, 3)
328 c.Check(err, check.IsNil)
329 c.Check(string(rbuf), check.Equals, "f*o")
331 // Write multiple blocks in one call
332 f.Seek(1, io.SeekStart)
333 n, err = f.Write([]byte("0123456789abcdefg"))
334 c.Check(n, check.Equals, 17)
335 c.Check(err, check.IsNil)
336 pos, err = f.Seek(0, io.SeekCurrent)
337 c.Check(pos, check.Equals, int64(18))
338 c.Check(err, check.IsNil)
339 pos, err = f.Seek(-18, io.SeekCurrent)
340 c.Check(pos, check.Equals, int64(0))
341 c.Check(err, check.IsNil)
// buf is 64 bytes but the file holds only 18, so io.ReadFull reports
// io.ErrUnexpectedEOF.
342 n, err = io.ReadFull(f, buf)
343 c.Check(n, check.Equals, 18)
344 c.Check(err, check.Equals, io.ErrUnexpectedEOF)
345 c.Check(string(buf[:n]), check.Equals, "f0123456789abcdefg")
// The second handle sees the data written through the first.
347 buf2, err := ioutil.ReadAll(f2)
348 c.Check(err, check.IsNil)
349 c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
351 // truncate to current size
353 c.Check(err, check.IsNil)
354 f2.Seek(0, io.SeekStart)
355 buf2, err = ioutil.ReadAll(f2)
356 c.Check(err, check.IsNil)
357 c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
359 // shrink to zero some data
361 f2.Seek(0, io.SeekStart)
362 buf2, err = ioutil.ReadAll(f2)
363 c.Check(err, check.IsNil)
364 c.Check(string(buf2), check.Equals, "f0123456789abcd")
366 // grow to partial block/extent
368 f2.Seek(0, io.SeekStart)
369 buf2, err = ioutil.ReadAll(f2)
370 c.Check(err, check.IsNil)
371 c.Check(string(buf2), check.Equals, "f0123456789abcd\x00\x00\x00\x00\x00")
374 f2.Seek(0, io.SeekStart)
375 f2.Write([]byte("12345678abcdefghijkl"))
377 // grow to block/extent boundary
379 f2.Seek(0, io.SeekStart)
380 buf2, err = ioutil.ReadAll(f2)
381 c.Check(err, check.IsNil)
382 c.Check(len(buf2), check.Equals, 64)
383 c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 8)
385 // shrink to block/extent boundary
387 c.Check(err, check.IsNil)
388 f2.Seek(0, io.SeekStart)
389 buf2, err = ioutil.ReadAll(f2)
390 c.Check(err, check.IsNil)
391 c.Check(len(buf2), check.Equals, 32)
392 c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 4)
394 // shrink to partial block/extent
396 c.Check(err, check.IsNil)
397 f2.Seek(0, io.SeekStart)
398 buf2, err = ioutil.ReadAll(f2)
399 c.Check(err, check.IsNil)
400 c.Check(string(buf2), check.Equals, "12345678abcdefg")
401 c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 2)
403 // Force flush to ensure the block "12345678" gets stored, so
404 // we know what to expect in the final manifest below.
405 _, err = s.fs.MarshalManifest(".")
406 c.Check(err, check.IsNil)
408 // Truncate to size=3 while f2's ptr is at 15
410 c.Check(err, check.IsNil)
411 buf2, err = ioutil.ReadAll(f2)
412 c.Check(err, check.IsNil)
413 c.Check(string(buf2), check.Equals, "")
414 f2.Seek(0, io.SeekStart)
415 buf2, err = ioutil.ReadAll(f2)
416 c.Check(err, check.IsNil)
417 c.Check(string(buf2), check.Equals, "123")
418 c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 1)
420 m, err := s.fs.MarshalManifest(".")
421 c.Check(err, check.IsNil)
// Strip +A signature hints so the manifest can be compared literally.
422 m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
423 c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 25d55ad283aa400af464c76d713c07ad+8 3:3:bar 6:3:foo\n")
424 c.Check(s.fs.Size(), check.Equals, int64(6))
// TestSeekSparse creates an empty file "test" in a new collection
// filesystem and seeks relative to its end, its current position and
// its start. The checkSize helper compares a reported file size with
// an expected value, both directly and after reopening "test", where
// it also checks the position returned by seeking to the end.
427 func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
428 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
429 c.Assert(err, check.IsNil)
430 f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
431 c.Assert(err, check.IsNil)
434 checkSize := func(size int64) {
436 c.Assert(err, check.IsNil)
437 c.Check(fi.Size(), check.Equals, size)
// This f shadows the outer handle: a second handle on the same file.
439 f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
440 c.Assert(err, check.IsNil)
443 c.Check(err, check.IsNil)
444 c.Check(fi.Size(), check.Equals, size)
445 pos, err := f.Seek(0, io.SeekEnd)
446 c.Check(err, check.IsNil)
447 c.Check(pos, check.Equals, size)
450 f.Seek(2, io.SeekEnd)
455 f.Seek(2, io.SeekCurrent)
460 f.Seek(8, io.SeekStart)
// A read at this position is expected to return no data and io.EOF.
462 n, err := f.Read(make([]byte, 1))
463 c.Check(n, check.Equals, 0)
464 c.Check(err, check.Equals, io.EOF)
466 f.Write([]byte{1, 2, 3})
// TestMarshalCopiesRemoteBlocks first checks that MarshalManifest
// returns an empty manifest and an error for a manifest that
// references hash[foo] with a "+R" hint. After marking hash[bar] as
// refreshable in the stub, it checks that manifests referencing
// hash[bar] with either a "+R" or a "+A" hint marshal successfully,
// with "+A" signatures on both blocks and no "+R" hint.
470 func (s *CollectionFSSuite) TestMarshalCopiesRemoteBlocks(c *check.C) {
// hash maps each test string to the hex MD5 of its content.
473 hash := map[string]string{
474 foo: fmt.Sprintf("%x", md5.Sum([]byte(foo))),
475 bar: fmt.Sprintf("%x", md5.Sum([]byte(bar))),
478 fs, err := (&Collection{
479 ManifestText: ". " + hash[foo] + "+3+Rzaaaa-foo@bab " + hash[bar] + "+3+A12345@ffffff 0:2:fo.txt 2:4:obar.txt\n",
480 }).FileSystem(s.client, s.kc)
481 c.Assert(err, check.IsNil)
482 manifest, err := fs.MarshalManifest(".")
483 c.Check(manifest, check.Equals, "")
484 c.Check(err, check.NotNil)
486 s.kc.refreshable = map[string]bool{hash[bar]: true}
488 for _, sigIn := range []string{"Rzaaaa-foo@bab", "A12345@abcde"} {
489 fs, err = (&Collection{
490 ManifestText: ". " + hash[foo] + "+3+A12345@fffff " + hash[bar] + "+3+" + sigIn + " 0:2:fo.txt 2:4:obar.txt\n",
491 }).FileSystem(s.client, s.kc)
492 c.Assert(err, check.IsNil)
493 manifest, err := fs.MarshalManifest(".")
494 c.Check(err, check.IsNil)
495 // Both blocks should now have +A signatures.
496 c.Check(manifest, check.Matches, `.*\+A.* .*\+A.*\n`)
497 c.Check(manifest, check.Not(check.Matches), `.*\+R.*\n`)
// TestMarshalSmallBlocks writes three 3-byte files ("foo", "bar",
// "baz", each containing its own name) into an empty collection and
// checks that the marshaled manifest, with signatures removed,
// references a 6-byte block and a 3-byte block.
501 func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
// Set maxBlockSize to 2 << 26 when the test returns.
503 defer func() { maxBlockSize = 2 << 26 }()
506 s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
507 c.Assert(err, check.IsNil)
508 for _, name := range []string{"foo", "bar", "baz"} {
509 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
510 c.Assert(err, check.IsNil)
511 f.Write([]byte(name))
515 m, err := s.fs.MarshalManifest(".")
516 c.Check(err, check.IsNil)
// Strip +A signature hints so the manifest can be compared literally.
517 m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
518 c.Check(m, check.Equals, ". c3c23db5285662ef7172373df0003206+6 acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:bar 3:3:baz 6:3:foo\n")
// TestMkdir checks Mkdir and file creation against missing parents,
// existing files and existing directories: both fail with
// os.ErrNotExist when the parent directory is missing, Mkdir fails
// while a file or directory of that name exists, and it succeeds once
// the conflicting file is removed. The final manifest must contain
// the fixture's dir1 stream plus ./foo/bar with foo.txt.
521 func (s *CollectionFSSuite) TestMkdir(c *check.C) {
// Parent "foo" does not exist yet, so both calls must fail.
522 err := s.fs.Mkdir("foo/bar", 0755)
523 c.Check(err, check.Equals, os.ErrNotExist)
525 f, err := s.fs.OpenFile("foo/bar", os.O_CREATE, 0)
526 c.Check(err, check.Equals, os.ErrNotExist)
528 err = s.fs.Mkdir("foo", 0755)
529 c.Check(err, check.IsNil)
531 f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_WRONLY, 0)
532 c.Check(err, check.IsNil)
535 f.Write([]byte("foo"))
538 // mkdir fails if a file already exists with that name
539 err = s.fs.Mkdir("foo/bar", 0755)
540 c.Check(err, check.NotNil)
542 err = s.fs.Remove("foo/bar")
543 c.Check(err, check.IsNil)
545 // mkdir succeeds after the file is deleted
546 err = s.fs.Mkdir("foo/bar", 0755)
547 c.Check(err, check.IsNil)
549 // creating a file in a nonexistent subdir should still fail
550 f, err = s.fs.OpenFile("foo/bar/baz/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
551 c.Check(err, check.Equals, os.ErrNotExist)
553 f, err = s.fs.OpenFile("foo/bar/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
554 c.Check(err, check.IsNil)
557 f.Write([]byte("foo"))
560 // creating foo/bar as a regular file should fail
561 f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, 0)
562 c.Check(err, check.NotNil)
564 // creating foo/bar as a directory should fail
565 f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, os.ModeDir)
566 c.Check(err, check.NotNil)
567 err = s.fs.Mkdir("foo/bar", 0755)
568 c.Check(err, check.NotNil)
570 m, err := s.fs.MarshalManifest(".")
571 c.Check(err, check.IsNil)
// Strip +A signature hints so the manifest can be compared literally.
572 m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
573 c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 3:3:bar 0:3:foo\n./foo/bar acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n")
// TestConcurrentWriters runs 128 outer iterations, each opening its
// own handle on /dir1/foo and performing 1024 steps drawn from
// MarshalManifest, Truncate, Seek, Write and ReadAll with random
// sizes and offsets below 64. It checks that none of the checked
// operations returns an error, then reads and logs the final content.
576 func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
// Set maxBlockSize to 1 << 26 when the test returns.
582 defer func() { maxBlockSize = 1 << 26 }()
584 var wg sync.WaitGroup
585 for n := 0; n < 128; n++ {
589 f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
590 c.Assert(err, check.IsNil)
592 for i := 0; i < 1024; i++ {
596 _, err := s.fs.MarshalManifest(".")
597 c.Check(err, check.IsNil)
599 f.Truncate(int64(rand.Intn(64)))
601 f.Seek(int64(rand.Intn(64)), io.SeekStart)
603 _, err := f.Write([]byte("beep boop"))
604 c.Check(err, check.IsNil)
606 _, err := ioutil.ReadAll(f)
607 c.Check(err, check.IsNil)
614 f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
615 c.Assert(err, check.IsNil)
617 buf, err := ioutil.ReadAll(f)
618 c.Check(err, check.IsNil)
619 c.Logf("after lots of random r/w/seek/trunc, buf is %q", buf)
// TestRandomWrites creates files named random-<n> in an empty
// collection and applies random truncate-then-write steps to each
// one, keeping a model of the content in expect and comparing it
// with what is read back after every step. Afterwards it waits for
// pruning on each file, checks the root directory entry count, and
// checks that MarshalManifest succeeds.
622 func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
// Set maxBlockSize to 2 << 26 when the test returns.
624 defer func() { maxBlockSize = 2 << 26 }()
627 s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
628 c.Assert(err, check.IsNil)
631 const ngoroutines = 256
633 var wg sync.WaitGroup
634 for n := 0; n < ngoroutines; n++ {
638 expect := make([]byte, 0, 64)
639 wbytes := []byte("there's no simple explanation for anything important that any of us do")
640 f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
641 c.Assert(err, check.IsNil)
643 for i := 0; i < nfiles; i++ {
// Pick a new size (0..64), a write offset within it, and a write
// length such that the write ends at or before byte 64.
644 trunc := rand.Intn(65)
645 woff := rand.Intn(trunc + 1)
646 wbytes = wbytes[:rand.Intn(64-woff+1)]
647 for buf, i := expect[:cap(expect)], len(expect); i < trunc; i++ {
650 expect = expect[:trunc]
// A write that runs past the truncated size extends the expected content.
651 if trunc < woff+len(wbytes) {
652 expect = expect[:woff+len(wbytes)]
654 copy(expect[woff:], wbytes)
655 f.Truncate(int64(trunc))
656 pos, err := f.Seek(int64(woff), io.SeekStart)
657 c.Check(pos, check.Equals, int64(woff))
658 c.Check(err, check.IsNil)
659 n, err := f.Write(wbytes)
660 c.Check(n, check.Equals, len(wbytes))
661 c.Check(err, check.IsNil)
662 pos, err = f.Seek(0, io.SeekStart)
663 c.Check(pos, check.Equals, int64(0))
664 c.Check(err, check.IsNil)
665 buf, err := ioutil.ReadAll(f)
666 c.Check(string(buf), check.Equals, string(expect))
667 c.Check(err, check.IsNil)
673 for n := 0; n < ngoroutines; n++ {
674 f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDONLY, 0)
675 c.Assert(err, check.IsNil)
676 f.(*filehandle).inode.(*filenode).waitPrune()
681 root, err := s.fs.Open("/")
682 c.Assert(err, check.IsNil)
684 fi, err := root.Readdir(-1)
685 c.Check(err, check.IsNil)
686 c.Check(len(fi), check.Equals, nfiles)
688 _, err = s.fs.MarshalManifest(".")
689 c.Check(err, check.IsNil)
690 // TODO: check manifest content
// TestRemove builds a small directory tree in an empty collection and
// checks the results of Remove and RemoveAll: removing a missing
// entry yields os.ErrNotExist, paths ending in "." or ".." yield
// ErrInvalidArgument, a non-empty directory yields
// ErrDirectoryNotEmpty, and RemoveAll succeeds even when the target
// is already gone.
693 func (s *CollectionFSSuite) TestRemove(c *check.C) {
694 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
695 c.Assert(err, check.IsNil)
696 err = fs.Mkdir("dir0", 0755)
697 c.Assert(err, check.IsNil)
698 err = fs.Mkdir("dir1", 0755)
699 c.Assert(err, check.IsNil)
700 err = fs.Mkdir("dir1/dir2", 0755)
701 c.Assert(err, check.IsNil)
702 err = fs.Mkdir("dir1/dir3", 0755)
703 c.Assert(err, check.IsNil)
// The second Remove of dir0 must report that it no longer exists.
705 err = fs.Remove("dir0")
706 c.Check(err, check.IsNil)
707 err = fs.Remove("dir0")
708 c.Check(err, check.Equals, os.ErrNotExist)
710 err = fs.Remove("dir1/dir2/.")
711 c.Check(err, check.Equals, ErrInvalidArgument)
712 err = fs.Remove("dir1/dir2/..")
713 c.Check(err, check.Equals, ErrInvalidArgument)
714 err = fs.Remove("dir1")
715 c.Check(err, check.Equals, ErrDirectoryNotEmpty)
716 err = fs.Remove("dir1/dir2/../../../dir1")
717 c.Check(err, check.Equals, ErrDirectoryNotEmpty)
718 err = fs.Remove("dir1/dir3/")
719 c.Check(err, check.IsNil)
// RemoveAll is expected to succeed both times, including when dir1 is gone.
720 err = fs.RemoveAll("dir1")
721 c.Check(err, check.IsNil)
722 err = fs.RemoveAll("dir1")
723 c.Check(err, check.IsNil)
// TestRenameError checks that renaming a directory to a path inside
// itself fails with ErrInvalidArgument, and that after a valid rename
// of first/second to second the file it contains is still readable
// with its original content.
726 func (s *CollectionFSSuite) TestRenameError(c *check.C) {
727 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
728 c.Assert(err, check.IsNil)
729 err = fs.Mkdir("first", 0755)
730 c.Assert(err, check.IsNil)
731 err = fs.Mkdir("first/second", 0755)
732 c.Assert(err, check.IsNil)
733 f, err := fs.OpenFile("first/second/file", os.O_CREATE|os.O_WRONLY, 0755)
734 c.Assert(err, check.IsNil)
735 f.Write([]byte{1, 2, 3, 4, 5})
// Both targets lie inside "first" itself.
737 err = fs.Rename("first", "first/second/third")
738 c.Check(err, check.Equals, ErrInvalidArgument)
739 err = fs.Rename("first", "first/third")
740 c.Check(err, check.Equals, ErrInvalidArgument)
741 err = fs.Rename("first/second", "second")
742 c.Check(err, check.IsNil)
743 f, err = fs.OpenFile("second/file", 0, 0)
744 c.Assert(err, check.IsNil)
745 data, err := ioutil.ReadAll(f)
746 c.Check(err, check.IsNil)
747 c.Check(data, check.DeepEquals, []byte{1, 2, 3, 4, 5})
750 func (s *CollectionFSSuite) TestRenameDirectory(c *check.C) {
751 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
752 c.Assert(err, check.IsNil)
753 err = fs.Mkdir("foo", 0755)
754 c.Assert(err, check.IsNil)
755 err = fs.Mkdir("bar", 0755)
756 c.Assert(err, check.IsNil)
757 err = fs.Rename("bar", "baz")
758 c.Check(err, check.IsNil)
759 err = fs.Rename("foo", "baz")
760 c.Check(err, check.NotNil)
761 err = fs.Rename("foo", "baz/")
762 c.Check(err, check.IsNil)
763 err = fs.Rename("baz/foo", ".")
764 c.Check(err, check.Equals, ErrInvalidArgument)
765 err = fs.Rename("baz/foo/", ".")
766 c.Check(err, check.Equals, ErrInvalidArgument)
// TestRename populates an empty collection with dir<i>/dir<j>
// directories, each level holding files that contain "beep". It then
// renames dir<i>/dir<j>/file<j> to dir<i>/newfile<inner-j-1>, checks
// several error cases (missing source, missing or non-directory
// parents), and finally verifies the content of dir1/newfile3 and
// that dir1/dir1/file1 no longer exists.
769 func (s *CollectionFSSuite) TestRename(c *check.C) {
770 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
771 c.Assert(err, check.IsNil)
776 for i := 0; i < outer; i++ {
777 err = fs.Mkdir(fmt.Sprintf("dir%d", i), 0755)
778 c.Assert(err, check.IsNil)
779 for j := 0; j < inner; j++ {
780 err = fs.Mkdir(fmt.Sprintf("dir%d/dir%d", i, j), 0755)
781 c.Assert(err, check.IsNil)
// One file next to the subdirectory and one inside it.
782 for _, fnm := range []string{
783 fmt.Sprintf("dir%d/file%d", i, j),
784 fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
786 f, err := fs.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0755)
787 c.Assert(err, check.IsNil)
788 _, err = f.Write([]byte("beep"))
789 c.Assert(err, check.IsNil)
794 var wg sync.WaitGroup
795 for i := 0; i < outer; i++ {
796 for j := 0; j < inner; j++ {
800 oldname := fmt.Sprintf("dir%d/dir%d/file%d", i, j, j)
801 newname := fmt.Sprintf("dir%d/newfile%d", i, inner-j-1)
// The target must not exist before the rename and must open afterwards.
802 _, err := fs.Open(newname)
803 c.Check(err, check.Equals, os.ErrNotExist)
804 err = fs.Rename(oldname, newname)
805 c.Check(err, check.IsNil)
806 f, err := fs.Open(newname)
807 c.Check(err, check.IsNil)
814 // oldname does not exist
816 fmt.Sprintf("dir%d/dir%d/missing", i, j),
817 fmt.Sprintf("dir%d/dir%d/file%d", outer-i-1, j, j))
818 c.Check(err, check.ErrorMatches, `.*does not exist`)
820 // newname parent dir does not exist
822 fmt.Sprintf("dir%d/dir%d", i, j),
823 fmt.Sprintf("dir%d/missing/irrelevant", outer-i-1))
824 c.Check(err, check.ErrorMatches, `.*does not exist`)
826 // oldname parent dir is a file
828 fmt.Sprintf("dir%d/file%d/patherror", i, j),
829 fmt.Sprintf("dir%d/irrelevant", i))
830 c.Check(err, check.ErrorMatches, `.*not a directory`)
832 // newname parent dir is a file
834 fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
835 fmt.Sprintf("dir%d/file%d/patherror", i, inner-j-1))
836 c.Check(err, check.ErrorMatches, `.*not a directory`)
842 f, err := fs.OpenFile("dir1/newfile3", 0, 0)
843 c.Assert(err, check.IsNil)
844 c.Check(f.Size(), check.Equals, int64(4))
845 buf, err := ioutil.ReadAll(f)
846 c.Check(buf, check.DeepEquals, []byte("beep"))
847 c.Check(err, check.IsNil)
848 _, err = fs.Open("dir1/dir1/file1")
849 c.Check(err, check.Equals, os.ErrNotExist)
// TestPersist writes 500-byte buffers, five bytes per Write call, to
// files whose names contain a space, a colon, a backslash, or a
// directory with a colon. It marshals the manifest, loads a second
// filesystem from that manifest, and checks that both root
// directories list four entries and that every file recorded in
// expect reads back with the expected content.
852 func (s *CollectionFSSuite) TestPersist(c *check.C) {
// Set maxBlockSize to 2 << 26 when the test returns.
854 defer func() { maxBlockSize = 2 << 26 }()
857 s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
858 c.Assert(err, check.IsNil)
859 err = s.fs.Mkdir("d:r", 0755)
860 c.Assert(err, check.IsNil)
862 expect := map[string][]byte{}
864 var wg sync.WaitGroup
865 for _, name := range []string{"random 1", "random:2", "random\\3", "d:r/random4"} {
866 buf := make([]byte, 500)
870 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
871 c.Assert(err, check.IsNil)
872 // Note: we don't close the file until after the test
873 // is done. Writes to unclosed files should persist.
879 for i := 0; i < len(buf); i += 5 {
880 _, err := f.Write(buf[i : i+5])
881 c.Assert(err, check.IsNil)
887 m, err := s.fs.MarshalManifest(".")
888 c.Check(err, check.IsNil)
891 root, err := s.fs.Open("/")
892 c.Assert(err, check.IsNil)
894 fi, err := root.Readdir(-1)
895 c.Check(err, check.IsNil)
896 c.Check(len(fi), check.Equals, 4)
// Reload from the marshaled manifest and repeat the checks.
898 persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
899 c.Assert(err, check.IsNil)
901 root, err = persisted.Open("/")
902 c.Assert(err, check.IsNil)
904 fi, err = root.Readdir(-1)
905 c.Check(err, check.IsNil)
906 c.Check(len(fi), check.Equals, 4)
908 for name, content := range expect {
909 c.Logf("read %q", name)
910 f, err := persisted.Open(name)
911 c.Assert(err, check.IsNil)
913 buf, err := ioutil.ReadAll(f)
914 c.Check(err, check.IsNil)
915 c.Check(buf, check.DeepEquals, content)
// TestPersistEmptyFilesAndDirs creates a set of directories and the
// files listed in expect (including entries with nil content),
// marshals the manifest, and loads a second filesystem from it. It
// then checks that each expected file opens and reads back its data,
// that "bogus" names do not open, and that each directory in
// expectDir lists the expected number of entries.
919 func (s *CollectionFSSuite) TestPersistEmptyFilesAndDirs(c *check.C) {
921 s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
922 c.Assert(err, check.IsNil)
923 for _, name := range []string{"dir", "dir/zerodir", "empty", "not empty", "not empty/empty", "zero", "zero/zero"} {
924 err = s.fs.Mkdir(name, 0755)
925 c.Assert(err, check.IsNil)
// expect maps file path to file content.
928 expect := map[string][]byte{
935 "dir/zerodir/zero": nil,
936 "zero/zero/zero": nil,
938 for name, data := range expect {
939 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
940 c.Assert(err, check.IsNil)
942 _, err := f.Write(data)
943 c.Assert(err, check.IsNil)
948 m, err := s.fs.MarshalManifest(".")
949 c.Check(err, check.IsNil)
952 persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
953 c.Assert(err, check.IsNil)
955 for name, data := range expect {
956 _, err = persisted.Open("bogus-" + name)
957 c.Check(err, check.NotNil)
959 f, err := persisted.Open(name)
960 c.Assert(err, check.IsNil)
965 buf, err := ioutil.ReadAll(f)
966 c.Check(err, check.IsNil)
967 c.Check(buf, check.DeepEquals, data)
// expectDir maps directory path to the expected number of entries.
970 expectDir := map[string]int{
973 "not empty/empty": 0,
975 for name, expectLen := range expectDir {
976 _, err := persisted.Open(name + "/bogus")
977 c.Check(err, check.NotNil)
979 d, err := persisted.Open(name)
981 c.Check(err, check.IsNil)
982 fi, err := d.Readdir(-1)
983 c.Check(err, check.IsNil)
984 c.Check(fi, check.HasLen, expectLen)
// TestOpenFileFlags checks how OpenFile and the resulting handles
// behave for different flag combinations: opening a missing file
// without O_CREATE, writing through an O_RDONLY handle, O_TRUNC,
// O_APPEND write positions, reading through an O_WRONLY handle, an
// unsupported flag (O_SYNC), and the invalid combination
// O_RDWR|O_WRONLY.
988 func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
989 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
990 c.Assert(err, check.IsNil)
992 f, err := fs.OpenFile("missing", os.O_WRONLY, 0)
993 c.Check(f, check.IsNil)
994 c.Check(err, check.ErrorMatches, `file does not exist`)
996 f, err = fs.OpenFile("new", os.O_CREATE|os.O_RDONLY, 0)
997 c.Assert(err, check.IsNil)
// Writes through the read-only handle must be refused.
999 n, err := f.Write([]byte{1, 2, 3})
1000 c.Check(n, check.Equals, 0)
1001 c.Check(err, check.ErrorMatches, `read-only file`)
1002 n, err = f.Read(make([]byte, 1))
1003 c.Check(n, check.Equals, 0)
1004 c.Check(err, check.Equals, io.EOF)
1005 f, err = fs.OpenFile("new", os.O_RDWR, 0)
1006 c.Assert(err, check.IsNil)
1008 _, err = f.Write([]byte{4, 5, 6})
1009 c.Check(err, check.IsNil)
1011 c.Assert(err, check.IsNil)
1012 c.Check(fi.Size(), check.Equals, int64(3))
// After opening with O_TRUNC the end position and the size are both 0.
1014 f, err = fs.OpenFile("new", os.O_TRUNC|os.O_RDWR, 0)
1015 c.Assert(err, check.IsNil)
1017 pos, err := f.Seek(0, io.SeekEnd)
1018 c.Check(pos, check.Equals, int64(0))
1019 c.Check(err, check.IsNil)
1021 c.Assert(err, check.IsNil)
1022 c.Check(fi.Size(), check.Equals, int64(0))
1025 buf := make([]byte, 64)
1026 f, err = fs.OpenFile("append", os.O_EXCL|os.O_CREATE|os.O_RDWR|os.O_APPEND, 0)
1027 c.Assert(err, check.IsNil)
1028 f.Write([]byte{1, 2, 3})
1029 f.Seek(0, io.SeekStart)
1030 n, _ = f.Read(buf[:1])
1031 c.Check(n, check.Equals, 1)
1032 c.Check(buf[:1], check.DeepEquals, []byte{1})
1033 pos, err = f.Seek(0, io.SeekCurrent)
1034 c.Assert(err, check.IsNil)
1035 c.Check(pos, check.Equals, int64(1))
// With O_APPEND the write lands at the end (position 6 afterwards),
// even though the handle was positioned at 1.
1036 f.Write([]byte{4, 5, 6})
1037 pos, err = f.Seek(0, io.SeekCurrent)
1038 c.Assert(err, check.IsNil)
1039 c.Check(pos, check.Equals, int64(6))
1040 f.Seek(0, io.SeekStart)
1041 n, err = f.Read(buf)
1042 c.Check(buf[:n], check.DeepEquals, []byte{1, 2, 3, 4, 5, 6})
1043 c.Check(err, check.Equals, io.EOF)
1046 f, err = fs.OpenFile("append", os.O_RDWR|os.O_APPEND, 0)
1047 c.Assert(err, check.IsNil)
1048 pos, err = f.Seek(0, io.SeekCurrent)
1049 c.Check(pos, check.Equals, int64(0))
1050 c.Check(err, check.IsNil)
1052 pos, _ = f.Seek(0, io.SeekCurrent)
1053 c.Check(pos, check.Equals, int64(3))
1054 f.Write([]byte{7, 8, 9})
1055 pos, err = f.Seek(0, io.SeekCurrent)
1056 c.Check(err, check.IsNil)
1057 c.Check(pos, check.Equals, int64(9))
1060 f, err = fs.OpenFile("wronly", os.O_CREATE|os.O_WRONLY, 0)
1061 c.Assert(err, check.IsNil)
1062 n, err = f.Write([]byte{3, 2, 1})
1063 c.Check(n, check.Equals, 3)
1064 c.Check(err, check.IsNil)
1065 pos, _ = f.Seek(0, io.SeekCurrent)
1066 c.Check(pos, check.Equals, int64(3))
1067 pos, _ = f.Seek(0, io.SeekStart)
1068 c.Check(pos, check.Equals, int64(0))
// Reads through the write-only handle must fail with an error
// mentioning O_WRONLY.
1069 n, err = f.Read(buf)
1070 c.Check(n, check.Equals, 0)
1071 c.Check(err, check.ErrorMatches, `.*O_WRONLY.*`)
1072 f, err = fs.OpenFile("wronly", os.O_RDONLY, 0)
1073 c.Assert(err, check.IsNil)
1075 c.Check(buf[:n], check.DeepEquals, []byte{3, 2, 1})
1077 f, err = fs.OpenFile("unsupported", os.O_CREATE|os.O_SYNC, 0)
1078 c.Check(f, check.IsNil)
1079 c.Check(err, check.NotNil)
1081 f, err = fs.OpenFile("append", os.O_RDWR|os.O_WRONLY, 0)
1082 c.Check(f, check.IsNil)
1083 c.Check(err, check.ErrorMatches, `invalid flag.*`)
// TestFlushFullBlocksWritingLongFile writes 100 chunks of 500 bytes
// to a new file "50K" with concurrentWriters set to 2. The stub's
// onWrite hook counts block writes that have started and that are in
// progress. After waitPrune the test expects currentMemExtents to
// report one segment, and after MarshalManifest none, with the
// manifest describing a single 50000-byte file.
1086 func (s *CollectionFSSuite) TestFlushFullBlocksWritingLongFile(c *check.C) {
// Restore the saved package-level settings when the test returns.
1087 defer func(cw, mbs int) {
1088 concurrentWriters = cw
1090 }(concurrentWriters, maxBlockSize)
1091 concurrentWriters = 2
1094 proceed := make(chan struct{})
1095 var started, concurrent int32
1097 s.kc.onWrite = func([]byte) {
1098 atomic.AddInt32(&concurrent, 1)
1099 switch atomic.AddInt32(&started, 1) {
1101 // Wait until block 2 starts and finishes, and block 3 starts
1104 c.Check(blk2done, check.Equals, true)
1105 case <-time.After(time.Second):
1106 c.Error("timed out")
1109 time.Sleep(time.Millisecond)
1114 time.Sleep(time.Millisecond)
// After this write is counted out, the number still in progress must
// be below concurrentWriters.
1116 c.Check(atomic.AddInt32(&concurrent, -1) < int32(concurrentWriters), check.Equals, true)
1119 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1120 c.Assert(err, check.IsNil)
1121 f, err := fs.OpenFile("50K", os.O_WRONLY|os.O_CREATE, 0)
1122 c.Assert(err, check.IsNil)
1125 data := make([]byte, 500)
1128 for i := 0; i < 100; i++ {
1129 n, err := f.Write(data)
1130 c.Assert(n, check.Equals, len(data))
1131 c.Assert(err, check.IsNil)
// currentMemExtents collects indexes into the file's segment list.
1134 currentMemExtents := func() (memExtents []int) {
1135 for idx, e := range f.(*filehandle).inode.(*filenode).segments {
1138 memExtents = append(memExtents, idx)
1143 f.(*filehandle).inode.(*filenode).waitPrune()
1144 c.Check(currentMemExtents(), check.HasLen, 1)
1146 m, err := fs.MarshalManifest(".")
1147 c.Check(m, check.Matches, `[^:]* 0:50000:50K\n`)
1148 c.Check(err, check.IsNil)
1149 c.Check(currentMemExtents(), check.HasLen, 0)
1152 // Ensure blocks get flushed to disk if a lot of data is written to
1153 // small files/directories without calling sync().
1155 // Write four 512KiB files into each of 256 top-level dirs (total
1156 // 512MiB), calling Flush() every 8 dirs. Ensure memory usage never
1157 // exceeds 24MiB (4 concurrentWriters * 2MiB + 8 unflushed dirs *
1159 func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
1160 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1161 c.Assert(err, check.IsNil)
1163 s.kc.onWrite = func([]byte) {
1164 // discard flushed data -- otherwise the stub will use
1166 time.Sleep(time.Millisecond)
1169 s.kc.blocks = map[string][]byte{}
1171 for i := 0; i < 256; i++ {
// Each directory's data starts as 524288 zero bytes followed by a
// short label naming the directory.
1172 buf := bytes.NewBuffer(make([]byte, 524288))
1173 fmt.Fprintf(buf, "test file in dir%d", i)
1175 dir := fmt.Sprintf("dir%d", i)
// NOTE(review): this loop creates file0 and file1 only, while the
// comment above the function mentions four files per dir — confirm.
1177 for j := 0; j < 2; j++ {
1178 f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1179 c.Assert(err, check.IsNil)
1181 _, err = io.Copy(f, buf)
1182 c.Assert(err, check.IsNil)
// NOTE(review): 1<<24 is 16MiB, whereas the comment above the
// function mentions 24MiB — confirm which limit is intended.
1189 size := fs.MemorySize()
1190 if !c.Check(size <= 1<<24, check.Equals, true) {
1191 c.Logf("at dir%d fs.MemorySize()=%d", i, size)
1197 // Ensure short blocks at the end of a stream don't get flushed by
1200 // Write 67x 1MiB files to each of 8 dirs, and check that 8 full 64MiB
1201 // blocks have been flushed while 8x 3MiB is still buffered in memory.
1202 func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
1203 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1204 c.Assert(err, check.IsNil)
// Write hook: add the size of every buffer passed to the hook to the
// flushed counter.
1207 s.kc.onWrite = func(p []byte) {
1208 atomic.AddInt64(&flushed, int64(len(p)))
// Write the same 1 MiB buffer to nFiles files in each of nDirs
// directories.
1213 megabyte := make([]byte, 1<<20)
1214 for i := int64(0); i < nDirs; i++ {
1215 dir := fmt.Sprintf("dir%d", i)
1217 for j := int64(0); j < nFiles; j++ {
1218 f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1219 c.Assert(err, check.IsNil)
1221 _, err = f.Write(megabyte)
1222 c.Assert(err, check.IsNil)
// Expected baseline: 64 bytes for each of nDirs*nFiles files, nDirs
// directories, and one more inode (presumably the root -- confirm),
// plus 1 MiB + 64 bytes of buffered data per file. Nothing has been
// passed to the write hook yet.
1225 inodebytes := int64((nDirs*(nFiles+1) + 1) * 64)
1226 c.Check(fs.MemorySize(), check.Equals, nDirs*nFiles*(1<<20+64)+inodebytes)
1227 c.Check(flushed, check.Equals, int64(0))
// waitForFlush polls every 10ms, for at most 5 seconds, until
// fs.MemorySize() is no longer above expectUnflushed, then checks
// that the memory size and the flushed byte count match the expected
// values exactly.
1229 waitForFlush := func(expectUnflushed, expectFlushed int64) {
1230 for deadline := time.Now().Add(5 * time.Second); fs.MemorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
1232 c.Check(fs.MemorySize(), check.Equals, expectUnflushed)
1233 c.Check(flushed, check.Equals, expectFlushed)
1236 // Nothing flushed yet
1237 waitForFlush(nDirs*nFiles*(1<<20+64)+inodebytes, 0)
1239 // Flushing a non-empty dir "/" is non-recursive and there are
1240 // no top-level files, so this has no effect
1241 fs.Flush("/", false)
1242 waitForFlush(nDirs*nFiles*(1<<20+64)+inodebytes, 0)
1244 // Flush the full block in dir0
1245 fs.Flush("dir0", false)
1246 bigloclen := int64(32 + 9 + 51 + 64) // md5 + "+" + "67xxxxxx" + "+Axxxxxx..." + 64 (see (storedSegment)memorySize)
// After flushing dir0, 64 files' worth of buffered data is replaced
// by 64 segments of bigloclen bytes each, and 64 MiB has been written.
1247 waitForFlush((nDirs*nFiles-64)*(1<<20+64)+inodebytes+bigloclen*64, 64<<20)
// Flushing a path that does not exist must return an error.
1249 err = fs.Flush("dir-does-not-exist", false)
1250 c.Check(err, check.NotNil)
1252 // Flush full blocks in all dirs
// Expected afterwards: 3 files per directory still buffered, 64
// stored segments per directory, and 64 MiB written per directory.
1254 waitForFlush(nDirs*3*(1<<20+64)+inodebytes+bigloclen*64*nDirs, nDirs*64<<20)
1256 // Flush non-full blocks, too
1258 smallloclen := int64(32 + 8 + 51 + 64) // md5 + "+" + "3xxxxxx" + "+Axxxxxx..." + 64 (see (storedSegment)memorySize)
// Expected afterwards: no buffered file data remains; each directory
// has 64 segments of bigloclen bytes and 3 of smallloclen bytes, and
// 67 MiB has been written per directory.
1259 waitForFlush(inodebytes+bigloclen*64*nDirs+smallloclen*3*nDirs, nDirs*67<<20)
1262 // Even when writing lots of files/dirs from different goroutines, as
1263 // long as Flush(dir,false) is called after writing each file,
1264 // unflushed data should be limited to one full block per
1265 // concurrentWriter, plus one nearly-full block at the end of each
1267 func (s *CollectionFSSuite) TestMaxUnflushed(c *check.C) {
// Upper bound on unflushed bytes: (concurrentWriters + nDirs) blocks
// of 1<<26 bytes (64 MiB) each.
1269 maxUnflushed := (int64(concurrentWriters) + nDirs) << 26
1271 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1272 c.Assert(err, check.IsNil)
// timeout is closed by a timer 10 seconds from now.
1274 release := make(chan struct{})
1275 timeout := make(chan struct{})
1276 time.AfterFunc(10*time.Second, func() { close(timeout) })
1277 var putCount, concurrency int64
// Write hook: counts calls in putCount, tracks the number of calls
// currently in progress in concurrency, and subtracts the buffer
// length from unflushed when the hook returns.
1279 s.kc.onWrite = func(p []byte) {
1280 defer atomic.AddInt64(&unflushed, -int64(len(p)))
1281 cur := atomic.AddInt64(&concurrency, 1)
1282 defer atomic.AddInt64(&concurrency, -1)
1283 pc := atomic.AddInt64(&putCount, 1)
1284 if pc < int64(concurrentWriters) {
1285 // Block until we reach concurrentWriters, to
1286 // make sure we're really accepting concurrent
1293 } else if pc == int64(concurrentWriters) {
1294 // Unblock the first N-1 PUT reqs.
// Neither the number of in-progress writes nor the unflushed byte
// count may exceed its limit.
1297 c.Assert(cur <= int64(concurrentWriters), check.Equals, true)
1298 c.Assert(atomic.LoadInt64(&unflushed) <= maxUnflushed, check.Equals, true)
1301 var owg sync.WaitGroup
1302 megabyte := make([]byte, 1<<20)
1303 for i := int64(0); i < nDirs; i++ {
1304 dir := fmt.Sprintf("dir%d", i)
// A recursive flush of the directory is deferred; 67 files of 1 MiB
// are written per directory.
1309 defer fs.Flush(dir, true)
1310 var iwg sync.WaitGroup
1312 for j := 0; j < 67; j++ {
1316 f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1317 c.Assert(err, check.IsNil)
// Bytes written here are added to unflushed; the write hook above
// subtracts them again when the data is written out.
1319 n, err := f.Write(megabyte)
1320 c.Assert(err, check.IsNil)
1321 atomic.AddInt64(&unflushed, int64(n))
1322 fs.Flush(dir, false)
// TestFlushStress writes 200 files of 1 MiB into each of 3
// directories, interleaved with non-recursive Flush calls on the
// directory, and checks that MarshalManifest(".") succeeds.
1331 func (s *CollectionFSSuite) TestFlushStress(c *check.C) {
// done is set when the test function returns. A timer is armed for
// 10 seconds; its callback writes the goroutine profile to stderr.
// NOTE(review): presumably the dump only happens if the test is still
// running -- confirm.
1333 defer func() { done = true }()
1334 time.AfterFunc(10*time.Second, func() {
1336 pprof.Lookup("goroutine").WriteTo(os.Stderr, 1)
// Write hook: resets the stub's block map, logs the block number and
// size when the hook returns, and sleeps 20ms to slow each write.
1342 s.kc.onWrite = func(p []byte) {
1344 s.kc.blocks = map[string][]byte{}
1346 defer c.Logf("wrote block %d, %d bytes", wrote, len(p))
1348 time.Sleep(20 * time.Millisecond)
1351 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1352 c.Assert(err, check.IsNil)
1354 data := make([]byte, 1<<20)
1355 for i := 0; i < 3; i++ {
1356 dir := fmt.Sprintf("dir%d", i)
1358 for j := 0; j < 200; j++ {
1360 f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1361 c.Assert(err, check.IsNil)
1362 _, err = f.Write(data)
1363 c.Assert(err, check.IsNil)
// Non-recursive flush of the directory just written to.
1365 fs.Flush(dir, false)
// Marshalling the whole collection must succeed.
1367 _, err := fs.MarshalManifest(".")
1368 c.Check(err, check.IsNil)
// TestFlushShort writes 100 files for each of two payload sizes (8
// bytes and 1000000 bytes), each size in its own directory, with
// non-recursive Flush calls on that directory, and checks that
// MarshalManifest(".") succeeds.
1372 func (s *CollectionFSSuite) TestFlushShort(c *check.C) {
// Write hook: resets the stub's block map.
1373 s.kc.onWrite = func([]byte) {
1375 s.kc.blocks = map[string][]byte{}
1378 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1379 c.Assert(err, check.IsNil)
1380 for _, blocksize := range []int{8, 1000000} {
// The directory is named after the payload size ("dir8",
// "dir1000000").
1381 dir := fmt.Sprintf("dir%d", blocksize)
1382 err = fs.Mkdir(dir, 0755)
1383 c.Assert(err, check.IsNil)
1384 data := make([]byte, blocksize)
1385 for i := 0; i < 100; i++ {
1386 f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, i), os.O_WRONLY|os.O_CREATE, 0)
1387 c.Assert(err, check.IsNil)
1388 _, err = f.Write(data)
1389 c.Assert(err, check.IsNil)
1391 fs.Flush(dir, false)
1394 _, err := fs.MarshalManifest(".")
1395 c.Check(err, check.IsNil)
// TestBrokenManifests checks that each manifest text in the list
// below is rejected: FileSystem must return a nil filesystem and a
// non-nil error. The error is logged for reference.
1399 func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
1400 for _, txt := range []string{
1404 ". d41d8cd98f00b204e9800998ecf8427e+0\n",
1405 ". d41d8cd98f00b204e9800998ecf8427e+0 \n",
1408 ". 0:0:foo 0:0:bar\n",
1409 ". d41d8cd98f00b204e9800998ecf8427e 0:0:foo\n",
1410 ". d41d8cd98f00b204e9800998ecf8427e+0 :0:0:foo\n",
1411 ". d41d8cd98f00b204e9800998ecf8427e+0 foo:0:foo\n",
1412 ". d41d8cd98f00b204e9800998ecf8427e+0 0:foo:foo\n",
1413 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:foo 1:1:bar\n",
1414 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\n",
1415 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\\057\\056\n",
1416 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:.\n",
1417 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:..\n",
1418 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:..\n",
1419 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/..\n",
1420 ". d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n",
1421 "./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n. d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n",
// Loading must fail outright: no filesystem, and an error.
1424 fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
1425 c.Check(fs, check.IsNil)
1426 c.Logf("-> %s", err)
1427 c.Check(err, check.NotNil)
// TestEdgeCaseManifests checks that each unusual manifest text in the
// list below is accepted: FileSystem must return a non-nil filesystem
// and no error.
1431 func (s *CollectionFSSuite) TestEdgeCaseManifests(c *check.C) {
1432 for _, txt := range []string{
1434 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo\n",
1435 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:...\n",
1436 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:. 0:0:. 0:0:\\056 0:0:\\056\n",
1437 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/. 0:0:. 0:0:foo\\057bar\\057\\056\n",
1438 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
1439 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/bar\n./foo d41d8cd98f00b204e9800998ecf8427e+0 0:0:bar\n",
// Loading must succeed.
1442 fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
1443 c.Check(err, check.IsNil)
1444 c.Check(fs, check.NotNil)
// TestSnapshotSplice writes a file at the top level of an empty
// collection, takes a Snapshot of "/", Splices that snapshot in at
// "dir1", and checks that dir1/file1 reads back the original content.
1448 func (s *CollectionFSSuite) TestSnapshotSplice(c *check.C) {
1449 filedata1 := "hello snapshot+splice world\n"
1450 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1451 c.Assert(err, check.IsNil)
// Create file1 and write the test data to it.
1453 f, err := fs.OpenFile("file1", os.O_CREATE|os.O_RDWR, 0700)
1454 c.Assert(err, check.IsNil)
1455 _, err = f.Write([]byte(filedata1))
1456 c.Assert(err, check.IsNil)
1458 c.Assert(err, check.IsNil)
// Snapshot the root and splice it into the same filesystem at dir1.
1461 snap, err := Snapshot(fs, "/")
1462 c.Assert(err, check.IsNil)
1463 err = Splice(fs, "dir1", snap)
1464 c.Assert(err, check.IsNil)
// The spliced copy must contain file1 with identical content.
1465 f, err := fs.Open("dir1/file1")
1466 c.Assert(err, check.IsNil)
1467 buf, err := io.ReadAll(f)
1468 c.Assert(err, check.IsNil)
1469 c.Check(string(buf), check.Equals, filedata1)
// TestRefreshSignatures builds a collection with one file in each of
// two directories, saves it via the API client, then reloads it with
// re-signed manifests to check that reads still succeed when the
// signatures are already expired, and that signatures nearing expiry
// get replaced by ones that expire much later.
1472 func (s *CollectionFSSuite) TestRefreshSignatures(c *check.C) {
1473 filedata1 := "hello refresh signatures world\n"
1474 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1475 c.Assert(err, check.IsNil)
// First file: d1/file1.
1476 fs.Mkdir("d1", 0700)
1477 f, err := fs.OpenFile("d1/file1", os.O_CREATE|os.O_RDWR, 0700)
1478 c.Assert(err, check.IsNil)
1479 _, err = f.Write([]byte(filedata1))
1480 c.Assert(err, check.IsNil)
1482 c.Assert(err, check.IsNil)
// Second file: d2/file2, with different content.
1484 filedata2 := "hello refresh signatures universe\n"
1485 fs.Mkdir("d2", 0700)
1486 f, err = fs.OpenFile("d2/file2", os.O_CREATE|os.O_RDWR, 0700)
1487 c.Assert(err, check.IsNil)
1488 _, err = f.Write([]byte(filedata2))
1489 c.Assert(err, check.IsNil)
1491 c.Assert(err, check.IsNil)
// Marshal the manifest and create a collection record from it with a
// POST to arvados/v1/collections, decoding the response into saved.
1492 txt, err := fs.MarshalManifest(".")
1493 c.Assert(err, check.IsNil)
1494 var saved Collection
1495 err = s.client.RequestAndDecode(&saved, "POST", "arvados/v1/collections", nil, map[string]interface{}{
1496 "select": []string{"manifest_text", "uuid", "portable_data_hash"},
1497 "collection": map[string]interface{}{
1498 "manifest_text": txt,
1501 c.Assert(err, check.IsNil)
1503 // Update signatures synchronously if they are already expired
1504 // when Read() is called.
// Re-sign the saved manifest with an expiry time 2 seconds in the
// past; reading d1/file1 must still return the original content.
1506 saved.ManifestText = SignManifest(saved.ManifestText, s.kc.authToken, time.Now().Add(-2*time.Second), s.kc.sigttl, []byte(s.kc.sigkey))
1507 fs, err := saved.FileSystem(s.client, s.kc)
1508 c.Assert(err, check.IsNil)
1509 f, err := fs.OpenFile("d1/file1", os.O_RDONLY, 0)
1510 c.Assert(err, check.IsNil)
1511 buf, err := ioutil.ReadAll(f)
1512 c.Check(err, check.IsNil)
1513 c.Check(string(buf), check.Equals, filedata1)
1516 // Update signatures asynchronously if we're more than half
1517 // way to TTL when Read() is called.
// Re-sign the saved manifest with an expiry time 2 minutes in the
// future, open both files, and read the first one.
1519 exp := time.Now().Add(2 * time.Minute)
1520 saved.ManifestText = SignManifest(saved.ManifestText, s.kc.authToken, exp, s.kc.sigttl, []byte(s.kc.sigkey))
1521 fs, err := saved.FileSystem(s.client, s.kc)
1522 c.Assert(err, check.IsNil)
1523 f1, err := fs.OpenFile("d1/file1", os.O_RDONLY, 0)
1524 c.Assert(err, check.IsNil)
1525 f2, err := fs.OpenFile("d2/file2", os.O_RDONLY, 0)
1526 c.Assert(err, check.IsNil)
1527 buf, err := ioutil.ReadAll(f1)
1528 c.Check(err, check.IsNil)
1529 c.Check(string(buf), check.Equals, filedata1)
1531 // Ensure fs treats the 2-minute TTL as less than half
1532 // the server's signing TTL. If we don't do this,
1533 // collectionfs will guess the signature is fresh,
1534 // i.e., signing TTL is 2 minutes, and won't do an
1536 fs.(*collectionFileSystem).guessSignatureTTL = time.Hour
// Re-read f1 every 100ms for up to 10 seconds, until refreshed is
// true. Each pass checks the content and looks at the expiry time of
// the locator used by the most recent ReadAt call on the stub.
// NOTE(review): refreshed is presumably set when that expiry is more
// than an hour away -- confirm.
1539 for deadline := time.Now().Add(time.Second * 10); time.Now().Before(deadline) && !refreshed; time.Sleep(time.Second / 10) {
1540 _, err = f1.Seek(0, io.SeekStart)
1541 c.Assert(err, check.IsNil)
1542 buf, err = ioutil.ReadAll(f1)
1543 c.Assert(err, check.IsNil)
1544 c.Assert(string(buf), check.Equals, filedata1)
1545 loc := s.kc.reads[len(s.kc.reads)-1]
1546 t, err := signatureExpiryTime(loc)
1547 c.Assert(err, check.IsNil)
1548 c.Logf("last read block %s had signature expiry time %v", loc, t)
1549 if t.Sub(time.Now()) > time.Hour {
1553 c.Check(refreshed, check.Equals, true)
1555 // Second locator should have been updated at the same
// Reading f2 must use a different locator from the previous read, and
// that locator's signature must expire more than an hour from now.
1557 buf, err = ioutil.ReadAll(f2)
1558 c.Assert(err, check.IsNil)
1559 c.Assert(string(buf), check.Equals, filedata2)
1560 loc := s.kc.reads[len(s.kc.reads)-1]
1561 c.Check(loc, check.Not(check.Equals), s.kc.reads[len(s.kc.reads)-2])
1562 t, err := signatureExpiryTime(s.kc.reads[len(s.kc.reads)-1])
1563 c.Assert(err, check.IsNil)
1564 c.Logf("last read block %s had signature expiry time %v", loc, t)
1565 c.Check(t.Sub(time.Now()) > time.Hour, check.Equals, true)
// bigmanifest is a large synthetic manifest text, built once at
// package initialization and used as benchmark input. It has 2000
// stream lines ("./dir0" onward); each line is written with 100
// copies of the same block locator followed by 2000 file segments.
1569 var bigmanifest = func() string {
1570 var buf bytes.Buffer
1571 for i := 0; i < 2000; i++ {
1572 fmt.Fprintf(&buf, "./dir%d", i)
// The inner loops declare their own i, shadowing the stream index.
1573 for i := 0; i < 100; i++ {
1574 fmt.Fprintf(&buf, " d41d8cd98f00b204e9800998ecf8427e+99999")
// Every file segment uses the same offset and length (1200000:300000);
// only the file name varies.
1576 for i := 0; i < 2000; i++ {
1577 fmt.Fprintf(&buf, " 1200000:300000:file%d", i)
1579 fmt.Fprintf(&buf, "\n")
// BenchmarkParseManifest loads bigmanifest into a collection
// filesystem c.N times, checking on each iteration that loading
// returns a non-nil filesystem and no error.
1584 func (s *CollectionFSSuite) BenchmarkParseManifest(c *check.C) {
// NOTE(review): DebugLocksPanicMode is a package-level variable and is
// not restored in the lines visible here -- confirm that is intended.
1585 DebugLocksPanicMode = false
1586 c.Logf("test manifest is %d bytes", len(bigmanifest))
1587 for i := 0; i < c.N; i++ {
1588 fs, err := (&Collection{ManifestText: bigmanifest}).FileSystem(s.client, s.kc)
1589 c.Check(err, check.IsNil)
1590 c.Check(fs, check.NotNil)
// checkMemSize verifies that the filenode's memsize field equals the
// combined buffer length of its in-memory segments.
//
// f must be a *filehandle whose inode is a *filenode; otherwise the
// type assertions panic.
1594 func (s *CollectionFSSuite) checkMemSize(c *check.C, f File) {
1595 fn := f.(*filehandle).inode.(*filenode)
// Only *memSegment entries contribute; other segment types are
// skipped.
1597 for _, seg := range fn.segments {
1598 if e, ok := seg.(*memSegment); ok {
1599 memsize += int64(len(e.buf))
1602 c.Check(fn.memsize, check.Equals, memsize)
// CollectionFSUnitSuite is a gocheck suite with no fields of its own.
1605 type CollectionFSUnitSuite struct{}
// Register the suite with gocheck.
1607 var _ = check.Suite(&CollectionFSUnitSuite{})
1609 // expect ~2 seconds to load a manifest with 256K files
//
// TestLargeManifest builds a manifest with dirCount stream lines of
// fileCount files each, loads it with nil client and keep client
// arguments, checks the total size, and calls Stat on every file.
// Timestamps and memory statistics are logged along the way.
1610 func (s *CollectionFSUnitSuite) TestLargeManifest(c *check.C) {
// NOTE(review): presumably the test is skipped in short mode -- confirm.
1611 if testing.Short() {
// Reserve 40000000 bytes of buffer capacity for the manifest text up
// front.
1620 mb := bytes.NewBuffer(make([]byte, 0, 40000000))
1621 for i := 0; i < dirCount; i++ {
1622 fmt.Fprintf(mb, "./dir%d", i)
// fileCount+1 block locators of 42 bytes each (note j <= fileCount).
1623 for j := 0; j <= fileCount; j++ {
1624 fmt.Fprintf(mb, " %032x+42+A%040x@%08x", j, j, j)
// Each file is 42 bytes long and starts at offset j*42+21, so it
// spans two consecutive blocks.
1626 for j := 0; j < fileCount; j++ {
1627 fmt.Fprintf(mb, " %d:%d:dir%d/file%d", j*42+21, 42, j, j)
1629 mb.Write([]byte{'\n'})
1631 coll := Collection{ManifestText: mb.String()}
1632 c.Logf("%s built", time.Now())
// Log memory statistics before loading the manifest.
1634 var memstats runtime.MemStats
1635 runtime.ReadMemStats(&memstats)
1636 c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
1638 f, err := coll.FileSystem(nil, nil)
1639 c.Check(err, check.IsNil)
1640 c.Logf("%s loaded", time.Now())
// Total size must be 42 bytes per file.
1641 c.Check(f.Size(), check.Equals, int64(42*dirCount*fileCount))
// Stat every file; the results are discarded.
1643 for i := 0; i < dirCount; i++ {
1644 for j := 0; j < fileCount; j++ {
1645 f.Stat(fmt.Sprintf("./dir%d/dir%d/file%d", i, j, j))
1648 c.Logf("%s Stat() x %d", time.Now(), dirCount*fileCount)
// Log memory statistics again after loading and stat-ing.
1650 runtime.ReadMemStats(&memstats)
1651 c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
1654 // Gocheck boilerplate
1655 func Test(t *testing.T) {