1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
27 check "gopkg.in/check.v1"
// Register CollectionFSSuite with the gocheck runner.
30 var _ = check.Suite(&CollectionFSSuite{})
// keepClientStub is an in-memory stand-in for a Keep client. The tests
// in this file pass it to FileSystem() as the block store.
32 type keepClientStub struct {
// block data, keyed by the first 32 characters of the locator
33 blocks map[string][]byte
// hashes that LocalLocator accepts for "+R" locators even when they
// are not present in blocks
34 refreshable map[string]bool
35 reads []string // locators from ReadAt() calls
36 onWrite func(bufcopy []byte) // called from BlockWrite, before acquiring lock
37 authToken string // client's auth token (used for signing locators)
38 sigkey string // blob signing key
39 sigttl time.Duration // blob signing ttl
// errStub404 is the stub's "block not found" error value.
43 var errStub404 = errors.New("404 block not found")
// ReadAt appends locator to kcs.reads, checks the locator's signature
// with VerifySignature (using the stub's token, TTL and signing key),
// then copies the stored block's bytes starting at offset off into p
// and returns the number of bytes copied. The block is looked up by
// the first 32 characters of locator.
45 func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error) {
47 kcs.reads = append(kcs.reads, locator)
51 if err := VerifySignature(locator, kcs.authToken, kcs.sigttl, []byte(kcs.sigkey)); err != nil {
54 buf := kcs.blocks[locator[:32]]
58 return copy(p, buf[off:]), nil
// BlockWrite stores a block in kcs.blocks and returns its signed
// locator with Replicas: 1.
//
// The locator is built from the MD5 of opts.Data and its length, and
// is signed with the stub's token and signing key; the map key is the
// first 32 characters of that locator. The stub panics for requests it
// is not made for, and can reject entries of opts.StorageClasses with
// a "stub does not write storage class" error.
61 func (kcs *keepClientStub) BlockWrite(_ context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
63 panic("oops, stub is not made for this")
65 locator := SignLocator(fmt.Sprintf("%x+%d", md5.Sum(opts.Data), len(opts.Data)), kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
// buf is a separate buffer the same size as opts.Data; it is what
// ends up in kcs.blocks.
66 buf := make([]byte, len(opts.Data))
68 if kcs.onWrite != nil {
71 for _, sc := range opts.StorageClasses {
73 return BlockWriteResponse{}, fmt.Errorf("stub does not write storage class %q", sc)
78 kcs.blocks[locator[:32]] = buf
79 return BlockWriteResponse{Locator: locator, Replicas: 1}, nil
// reRemoteSignature matches a "+A" or "+R" locator hint, up to but not
// including the next "+".
82 var reRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
// LocalLocator inspects the hints on locator: it tests for "+A" first,
// then for "+R". A "+R" locator is rejected if it is shorter than 32
// characters, or if its hash is neither present in kcs.blocks nor
// marked in kcs.refreshable. The function also strips existing +A/+R
// hints and signs the locator with the stub's token and signing key.
84 func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
85 if strings.Contains(locator, "+A") {
90 if strings.Contains(locator, "+R") {
91 if len(locator) < 32 {
92 return "", fmt.Errorf("bad locator: %q", locator)
94 if _, ok := kcs.blocks[locator[:32]]; !ok && !kcs.refreshable[locator[:32]] {
95 return "", fmt.Errorf("kcs.refreshable[%q]==false", locator)
98 locator = reRemoteSignature.ReplaceAllLiteralString(locator, "")
99 locator = SignLocator(locator, kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
// CollectionFSSuite is the gocheck suite holding the collection
// filesystem tests in this file. fs is the filesystem that SetUpTest
// builds before each test.
103 type CollectionFSSuite struct {
106 fs CollectionFileSystem
// SetUpTest builds the per-test fixtures: an API client from the
// environment using the active-user fixture token, the "foo and bar
// files in dir" fixture collection fetched from the API, a
// keepClientStub preloaded with the "foobar" block, and s.fs, a
// filesystem for that collection backed by the stub.
110 func (s *CollectionFSSuite) SetUpTest(c *check.C) {
111 s.client = NewClientFromEnv()
112 s.client.AuthToken = fixtureActiveToken
113 err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil)
114 c.Assert(err, check.IsNil)
115 s.kc = &keepClientStub{
116 blocks: map[string][]byte{
117 "3858f62230ac3c915f300c664312c63f": []byte("foobar"),
119 sigkey: fixtureBlobSigningKey,
120 sigttl: fixtureBlobSigningTTL,
121 authToken: fixtureActiveToken,
123 s.fs, err = s.coll.FileSystem(s.client, s.kc)
124 c.Assert(err, check.IsNil)
// TestSyncNonCanonicalManifest saves a copy of the fixture collection
// whose manifest lists foo before bar (non-canonical order), opens a
// filesystem on it, and then checks that the collection stored on the
// API server still has the manually chosen file order, i.e. nothing
// wrote a re-ordered manifest back.
127 func (s *CollectionFSSuite) TestSyncNonCanonicalManifest(c *check.C) {
129 err := s.client.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil)
130 c.Assert(err, check.IsNil)
// Swap the two file tokens so the manifest is valid but not in
// canonical order; the Assert below makes sure the swap took effect.
131 mtxt := strings.Replace(coll.ManifestText, "3:3:bar 0:3:foo", "0:3:foo 3:3:bar", -1)
132 c.Assert(mtxt, check.Not(check.Equals), coll.ManifestText)
133 err = s.client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
134 "collection": map[string]interface{}{
135 "manifest_text": mtxt}})
136 c.Assert(err, check.IsNil)
137 // In order for the rest of the test to work as intended, the API server
138 // needs to retain the file ordering we set manually. We check that here.
139 // We can't check `mtxt == coll.ManifestText` because the API server
140 // might've returned new block signatures if the GET and POST happened in
141 // different seconds.
142 expectPattern := `\./dir1 \S+ 0:3:foo 3:3:bar\n`
143 c.Assert(coll.ManifestText, check.Matches, expectPattern)
145 fs, err := coll.FileSystem(s.client, s.kc)
146 c.Assert(err, check.IsNil)
148 c.Check(err, check.IsNil)
150 // fs had no local changes, so Sync should not have saved
151 // anything back to the API/database. (If it did, we would see
152 // the manifest rewritten in canonical order.)
154 err = s.client.RequestAndDecode(&saved, "GET", "arvados/v1/collections/"+coll.UUID, nil, nil)
155 c.Assert(err, check.IsNil)
156 c.Check(saved.ManifestText, check.Matches, expectPattern)
// TestHttpFileSystemInterface checks that s.fs satisfies the
// http.FileSystem interface.
159 func (s *CollectionFSSuite) TestHttpFileSystemInterface(c *check.C) {
160 _, ok := s.fs.(http.FileSystem)
161 c.Check(ok, check.Equals, true)
// TestUnattainableStorageClasses writes to a file in a collection whose
// StorageClassesDesired is "unobtainium" and expects MarshalManifest to
// fail with the stub's "does not write storage class" error.
164 func (s *CollectionFSSuite) TestUnattainableStorageClasses(c *check.C) {
165 fs, err := (&Collection{
166 StorageClassesDesired: []string{"unobtainium"},
167 }).FileSystem(s.client, s.kc)
168 c.Assert(err, check.IsNil)
170 f, err := fs.OpenFile("/foo", os.O_CREATE|os.O_WRONLY, 0777)
171 c.Assert(err, check.IsNil)
// The write itself is expected to succeed; the failure only shows up
// when the manifest is marshalled below.
172 _, err = f.Write([]byte("food"))
173 c.Assert(err, check.IsNil)
175 c.Assert(err, check.IsNil)
176 _, err = fs.MarshalManifest(".")
177 c.Assert(err, check.ErrorMatches, `.*stub does not write storage class \"unobtainium\"`)
// TestColonInFilename loads a manifest whose stream name (foo:foo) and
// file name (bar:bar) both contain a colon, and checks that directory
// /foo:foo lists exactly one entry named bar:bar.
180 func (s *CollectionFSSuite) TestColonInFilename(c *check.C) {
181 fs, err := (&Collection{
182 ManifestText: "./foo:foo 3858f62230ac3c915f300c664312c63f+3 0:3:bar:bar\n",
183 }).FileSystem(s.client, s.kc)
184 c.Assert(err, check.IsNil)
186 f, err := fs.Open("/foo:foo")
187 c.Assert(err, check.IsNil)
189 fis, err := f.Readdir(0)
190 c.Check(err, check.IsNil)
191 c.Check(len(fis), check.Equals, 1)
192 c.Check(fis[0].Name(), check.Equals, "bar:bar")
// TestReaddirFull opens /dir1 and checks that it is reported as a
// directory of size 2, and that Readdir(0) returns two entries, the
// first of which has size 3.
195 func (s *CollectionFSSuite) TestReaddirFull(c *check.C) {
196 f, err := s.fs.Open("/dir1")
197 c.Assert(err, check.IsNil)
200 c.Assert(err, check.IsNil)
201 c.Check(st.Size(), check.Equals, int64(2))
202 c.Check(st.IsDir(), check.Equals, true)
204 fis, err := f.Readdir(0)
205 c.Check(err, check.IsNil)
206 c.Check(len(fis), check.Equals, 2)
208 c.Check(fis[0].Size(), check.Equals, int64(3))
// TestReaddirLimited checks Readdir with a positive count. Reading
// ./dir1 one entry at a time yields two single-entry results (each
// entry of size 3) and then zero entries with io.EOF. After reopening
// dir1, Readdir(1) followed by Readdir(2) returns the one remaining
// entry, and the next Readdir(2) returns zero entries with io.EOF.
212 func (s *CollectionFSSuite) TestReaddirLimited(c *check.C) {
213 f, err := s.fs.Open("./dir1")
214 c.Assert(err, check.IsNil)
216 fis, err := f.Readdir(1)
217 c.Check(err, check.IsNil)
218 c.Check(len(fis), check.Equals, 1)
220 c.Check(fis[0].Size(), check.Equals, int64(3))
223 fis, err = f.Readdir(1)
224 c.Check(err, check.IsNil)
225 c.Check(len(fis), check.Equals, 1)
227 c.Check(fis[0].Size(), check.Equals, int64(3))
// Directory is exhausted: expect no entries and io.EOF.
230 fis, err = f.Readdir(1)
231 c.Check(len(fis), check.Equals, 0)
232 c.Check(err, check.NotNil)
233 c.Check(err, check.Equals, io.EOF)
// Fresh handle: asking for more entries than remain returns what is
// left without an error; only the following call reports io.EOF.
235 f, err = s.fs.Open("dir1")
236 c.Assert(err, check.IsNil)
237 fis, err = f.Readdir(1)
238 c.Check(len(fis), check.Equals, 1)
239 c.Assert(err, check.IsNil)
240 fis, err = f.Readdir(2)
241 c.Check(len(fis), check.Equals, 1)
242 c.Assert(err, check.IsNil)
243 fis, err = f.Readdir(2)
244 c.Check(len(fis), check.Equals, 0)
245 c.Assert(err, check.Equals, io.EOF)
// TestPathMunge checks path normalization in Open: several spellings
// of the root path must open a directory of size 1, and several
// spellings of dir1 (with extra slashes, "." and ".." components) must
// open a directory of size 2.
248 func (s *CollectionFSSuite) TestPathMunge(c *check.C) {
249 for _, path := range []string{".", "/", "./", "///", "/../", "/./.."} {
250 f, err := s.fs.Open(path)
251 c.Assert(err, check.IsNil)
254 c.Assert(err, check.IsNil)
255 c.Check(st.Size(), check.Equals, int64(1))
256 c.Check(st.IsDir(), check.Equals, true)
258 for _, path := range []string{"/dir1", "dir1", "./dir1", "///dir1//.//", "../dir1/../dir1/"} {
260 f, err := s.fs.Open(path)
261 c.Assert(err, check.IsNil)
264 c.Assert(err, check.IsNil)
265 c.Check(st.Size(), check.Equals, int64(2))
266 c.Check(st.IsDir(), check.Equals, true)
// TestNotExist checks that opening nonexistent paths (absolute,
// relative, and inside a nonexistent directory) returns a nil file and
// an error for which os.IsNotExist is true.
270 func (s *CollectionFSSuite) TestNotExist(c *check.C) {
271 for _, path := range []string{"/no", "no", "./no", "n/o", "/n/o"} {
272 f, err := s.fs.Open(path)
273 c.Assert(f, check.IsNil)
274 c.Assert(err, check.NotNil)
275 c.Assert(os.IsNotExist(err), check.Equals, true)
// TestReadOnlyFile opens /dir1/foo with O_RDONLY, checks that its size
// is 3, and checks that Write on that handle writes nothing and
// returns ErrReadOnlyFile.
279 func (s *CollectionFSSuite) TestReadOnlyFile(c *check.C) {
280 f, err := s.fs.OpenFile("/dir1/foo", os.O_RDONLY, 0)
281 c.Assert(err, check.IsNil)
283 c.Assert(err, check.IsNil)
284 c.Check(st.Size(), check.Equals, int64(3))
285 n, err := f.Write([]byte("bar"))
286 c.Check(n, check.Equals, 0)
287 c.Check(err, check.Equals, ErrReadOnlyFile)
// TestCreateFile creates "/new-file 1" with O_CREATE, writes three
// bytes and closes it. It then checks that creating the same name with
// O_EXCL fails, that reopening the file shows size 3, and that the
// marshalled manifest lists the new file with its space escaped as
// \040.
290 func (s *CollectionFSSuite) TestCreateFile(c *check.C) {
291 f, err := s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE, 0)
292 c.Assert(err, check.IsNil)
294 c.Assert(err, check.IsNil)
295 c.Check(st.Size(), check.Equals, int64(0))
297 n, err := f.Write([]byte("bar"))
298 c.Check(n, check.Equals, 3)
299 c.Check(err, check.IsNil)
301 c.Check(f.Close(), check.IsNil)
// O_EXCL must refuse to create a file that already exists.
303 f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
304 c.Check(f, check.IsNil)
305 c.Assert(err, check.NotNil)
307 f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR, 0)
308 c.Assert(err, check.IsNil)
310 c.Assert(err, check.IsNil)
311 c.Check(st.Size(), check.Equals, int64(3))
313 c.Check(f.Close(), check.IsNil)
315 m, err := s.fs.MarshalManifest(".")
316 c.Assert(err, check.IsNil)
317 c.Check(m, check.Matches, `. 37b51d194a7513e45b56f6524f2d51f2\+3\+\S+ 0:3:new-file\\0401\n./dir1 .* 3:3:bar 0:3:foo\n`)
// TestReadWriteFile exercises reads, seeks, overwrites and truncation
// through two handles (f and f2) opened on /dir1/foo. After each step
// it checks the file content seen through the handles, and in several
// places the number of segments in the underlying filenode. It ends by
// checking the marshalled manifest and the filesystem size.
320 func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
// NOTE(review): this restores a hard-coded 2 << 26 (128MiB), while
// TestConcurrentWriters restores 1 << 26 and
// TestFlushFullBlocksWritingLongFile restores the previously saved
// value -- confirm which default is intended.
322 defer func() { maxBlockSize = 2 << 26 }()
324 f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
325 c.Assert(err, check.IsNil)
328 c.Assert(err, check.IsNil)
329 c.Check(st.Size(), check.Equals, int64(3))
331 f2, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
332 c.Assert(err, check.IsNil)
335 buf := make([]byte, 64)
336 n, err := f.Read(buf)
337 c.Check(n, check.Equals, 3)
338 c.Check(err, check.Equals, io.EOF)
339 c.Check(string(buf[:3]), check.DeepEquals, "foo")
341 pos, err := f.Seek(-2, io.SeekCurrent)
342 c.Check(pos, check.Equals, int64(1))
343 c.Check(err, check.IsNil)
345 // Split a storedExtent in two, and insert a memExtent
346 n, err = f.Write([]byte("*"))
347 c.Check(n, check.Equals, 1)
348 c.Check(err, check.IsNil)
350 pos, err = f.Seek(0, io.SeekCurrent)
351 c.Check(pos, check.Equals, int64(2))
352 c.Check(err, check.IsNil)
354 pos, err = f.Seek(0, io.SeekStart)
355 c.Check(pos, check.Equals, int64(0))
356 c.Check(err, check.IsNil)
358 rbuf, err := ioutil.ReadAll(f)
359 c.Check(len(rbuf), check.Equals, 3)
360 c.Check(err, check.IsNil)
361 c.Check(string(rbuf), check.Equals, "f*o")
363 // Write multiple blocks in one call
364 f.Seek(1, io.SeekStart)
365 n, err = f.Write([]byte("0123456789abcdefg"))
366 c.Check(n, check.Equals, 17)
367 c.Check(err, check.IsNil)
368 pos, err = f.Seek(0, io.SeekCurrent)
369 c.Check(pos, check.Equals, int64(18))
370 c.Check(err, check.IsNil)
371 pos, err = f.Seek(-18, io.SeekCurrent)
372 c.Check(pos, check.Equals, int64(0))
373 c.Check(err, check.IsNil)
// buf is 64 bytes but the file only holds 18, so ReadFull is expected
// to return io.ErrUnexpectedEOF.
374 n, err = io.ReadFull(f, buf)
375 c.Check(n, check.Equals, 18)
376 c.Check(err, check.Equals, io.ErrUnexpectedEOF)
377 c.Check(string(buf[:n]), check.Equals, "f0123456789abcdefg")
// The second handle must see what was written through the first.
379 buf2, err := ioutil.ReadAll(f2)
380 c.Check(err, check.IsNil)
381 c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
383 // truncate to current size
385 c.Check(err, check.IsNil)
386 f2.Seek(0, io.SeekStart)
387 buf2, err = ioutil.ReadAll(f2)
388 c.Check(err, check.IsNil)
389 c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
391 // shrink, discarding some data at the end
393 f2.Seek(0, io.SeekStart)
394 buf2, err = ioutil.ReadAll(f2)
395 c.Check(err, check.IsNil)
396 c.Check(string(buf2), check.Equals, "f0123456789abcd")
398 // grow to partial block/extent
400 f2.Seek(0, io.SeekStart)
401 buf2, err = ioutil.ReadAll(f2)
402 c.Check(err, check.IsNil)
403 c.Check(string(buf2), check.Equals, "f0123456789abcd\x00\x00\x00\x00\x00")
406 f2.Seek(0, io.SeekStart)
407 f2.Write([]byte("12345678abcdefghijkl"))
409 // grow to block/extent boundary
411 f2.Seek(0, io.SeekStart)
412 buf2, err = ioutil.ReadAll(f2)
413 c.Check(err, check.IsNil)
414 c.Check(len(buf2), check.Equals, 64)
415 c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 8)
417 // shrink to block/extent boundary
419 c.Check(err, check.IsNil)
420 f2.Seek(0, io.SeekStart)
421 buf2, err = ioutil.ReadAll(f2)
422 c.Check(err, check.IsNil)
423 c.Check(len(buf2), check.Equals, 32)
424 c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 4)
426 // shrink to partial block/extent
428 c.Check(err, check.IsNil)
429 f2.Seek(0, io.SeekStart)
430 buf2, err = ioutil.ReadAll(f2)
431 c.Check(err, check.IsNil)
432 c.Check(string(buf2), check.Equals, "12345678abcdefg")
433 c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 2)
435 // Force flush to ensure the block "12345678" gets stored, so
436 // we know what to expect in the final manifest below.
437 _, err = s.fs.MarshalManifest(".")
438 c.Check(err, check.IsNil)
440 // Truncate to size=3 while f2's ptr is at 15
442 c.Check(err, check.IsNil)
// f2's offset is now past the end of the file, so reading from it
// yields nothing until it is rewound.
443 buf2, err = ioutil.ReadAll(f2)
444 c.Check(err, check.IsNil)
445 c.Check(string(buf2), check.Equals, "")
446 f2.Seek(0, io.SeekStart)
447 buf2, err = ioutil.ReadAll(f2)
448 c.Check(err, check.IsNil)
449 c.Check(string(buf2), check.Equals, "123")
450 c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 1)
452 m, err := s.fs.MarshalManifest(".")
453 c.Check(err, check.IsNil)
// Strip the +A signature hints so the manifest can be compared with a
// fixed string.
454 m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
455 c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 25d55ad283aa400af464c76d713c07ad+8 3:3:bar 6:3:foo\n")
456 c.Check(s.fs.Size(), check.Equals, int64(6))
// TestSeekSparse works on a newly created file "test" in an empty
// collection. It seeks relative to the end, the current position and
// the start of the file, checks that a one-byte Read then returns 0
// bytes and io.EOF, and writes three bytes. The checkSize helper
// verifies a given size through file info, through a freshly opened
// handle, and through the position returned by Seek(0, io.SeekEnd) on
// that handle.
459 func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
460 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
461 c.Assert(err, check.IsNil)
462 f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
463 c.Assert(err, check.IsNil)
466 checkSize := func(size int64) {
468 c.Assert(err, check.IsNil)
469 c.Check(fi.Size(), check.Equals, size)
// This f shadows the outer handle: the size is re-checked through a
// second, freshly opened handle.
471 f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
472 c.Assert(err, check.IsNil)
475 c.Check(err, check.IsNil)
476 c.Check(fi.Size(), check.Equals, size)
477 pos, err := f.Seek(0, io.SeekEnd)
478 c.Check(err, check.IsNil)
479 c.Check(pos, check.Equals, size)
482 f.Seek(2, io.SeekEnd)
487 f.Seek(2, io.SeekCurrent)
492 f.Seek(8, io.SeekStart)
494 n, err := f.Read(make([]byte, 1))
495 c.Check(n, check.Equals, 0)
496 c.Check(err, check.Equals, io.EOF)
498 f.Write([]byte{1, 2, 3})
// TestMarshalCopiesRemoteBlocks checks how MarshalManifest treats
// locators carrying a +R hint. With a manifest whose first block has a
// +R hint and nothing marked refreshable in the stub, it expects an
// empty manifest and a non-nil error. After marking the bar hash as
// refreshable, it expects the marshalled manifest to carry +A hints on
// both blocks and no +R hint, whether the second block came in with a
// +R or a +A hint.
502 func (s *CollectionFSSuite) TestMarshalCopiesRemoteBlocks(c *check.C) {
// hash maps each test string to the hex MD5 of its content.
505 hash := map[string]string{
506 foo: fmt.Sprintf("%x", md5.Sum([]byte(foo))),
507 bar: fmt.Sprintf("%x", md5.Sum([]byte(bar))),
510 fs, err := (&Collection{
511 ManifestText: ". " + hash[foo] + "+3+Rzaaaa-foo@bab " + hash[bar] + "+3+A12345@ffffff 0:2:fo.txt 2:4:obar.txt\n",
512 }).FileSystem(s.client, s.kc)
513 c.Assert(err, check.IsNil)
514 manifest, err := fs.MarshalManifest(".")
515 c.Check(manifest, check.Equals, "")
516 c.Check(err, check.NotNil)
518 s.kc.refreshable = map[string]bool{hash[bar]: true}
520 for _, sigIn := range []string{"Rzaaaa-foo@bab", "A12345@abcde"} {
521 fs, err = (&Collection{
522 ManifestText: ". " + hash[foo] + "+3+A12345@fffff " + hash[bar] + "+3+" + sigIn + " 0:2:fo.txt 2:4:obar.txt\n",
523 }).FileSystem(s.client, s.kc)
524 c.Assert(err, check.IsNil)
525 manifest, err := fs.MarshalManifest(".")
526 c.Check(err, check.IsNil)
527 // Both blocks should now have +A signatures.
528 c.Check(manifest, check.Matches, `.*\+A.* .*\+A.*\n`)
529 c.Check(manifest, check.Not(check.Matches), `.*\+R.*\n`)
// TestMarshalSmallBlocks writes three 3-byte files (foo, bar, baz)
// into an empty collection. With the +A hints stripped, the marshalled
// manifest is expected to hold one 6-byte block and one 3-byte block,
// with the files listed as bar, baz, foo.
533 func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
// NOTE(review): this restores a hard-coded 2 << 26 (128MiB), while
// TestConcurrentWriters restores 1 << 26 and
// TestFlushFullBlocksWritingLongFile restores the previously saved
// value -- confirm which default is intended.
535 defer func() { maxBlockSize = 2 << 26 }()
538 s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
539 c.Assert(err, check.IsNil)
540 for _, name := range []string{"foo", "bar", "baz"} {
541 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
542 c.Assert(err, check.IsNil)
// Each file's content is its own name.
543 f.Write([]byte(name))
547 m, err := s.fs.MarshalManifest(".")
548 c.Check(err, check.IsNil)
549 m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
550 c.Check(m, check.Equals, ". c3c23db5285662ef7172373df0003206+6 acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:bar 3:3:baz 6:3:foo\n")
// TestMkdir checks how Mkdir and OpenFile(O_CREATE) behave around
// missing parent directories and name collisions. Creating anything
// under a nonexistent parent returns os.ErrNotExist. Mkdir fails where
// a file or directory already exists and succeeds once the file is
// removed. O_CREATE|O_EXCL fails on an existing directory name. The
// test ends by comparing the marshalled manifest, with +A hints
// stripped, against a fixed string.
553 func (s *CollectionFSSuite) TestMkdir(c *check.C) {
554 err := s.fs.Mkdir("foo/bar", 0755)
555 c.Check(err, check.Equals, os.ErrNotExist)
557 f, err := s.fs.OpenFile("foo/bar", os.O_CREATE, 0)
558 c.Check(err, check.Equals, os.ErrNotExist)
560 err = s.fs.Mkdir("foo", 0755)
561 c.Check(err, check.IsNil)
563 f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_WRONLY, 0)
564 c.Check(err, check.IsNil)
567 f.Write([]byte("foo"))
570 // mkdir fails if a file already exists with that name
571 err = s.fs.Mkdir("foo/bar", 0755)
572 c.Check(err, check.NotNil)
574 err = s.fs.Remove("foo/bar")
575 c.Check(err, check.IsNil)
577 // mkdir succeeds after the file is deleted
578 err = s.fs.Mkdir("foo/bar", 0755)
579 c.Check(err, check.IsNil)
581 // creating a file in a nonexistent subdir should still fail
582 f, err = s.fs.OpenFile("foo/bar/baz/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
583 c.Check(err, check.Equals, os.ErrNotExist)
585 f, err = s.fs.OpenFile("foo/bar/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
586 c.Check(err, check.IsNil)
589 f.Write([]byte("foo"))
592 // creating foo/bar as a regular file should fail
593 f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, 0)
594 c.Check(err, check.NotNil)
596 // creating foo/bar as a directory should fail
597 f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, os.ModeDir)
598 c.Check(err, check.NotNil)
599 err = s.fs.Mkdir("foo/bar", 0755)
600 c.Check(err, check.NotNil)
602 m, err := s.fs.MarshalManifest(".")
603 c.Check(err, check.IsNil)
604 m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
605 c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 3:3:bar 0:3:foo\n./foo/bar acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n")
// TestConcurrentWriters stresses /dir1/foo with MarshalManifest,
// Truncate, Seek, Write and ReadAll calls at random sizes and offsets.
// The calls sit inside nested loops (128 outer iterations, 1024 inner)
// coordinated through a sync.WaitGroup -- presumably one goroutine per
// outer iteration. Afterwards it reads the file once more and logs its
// final content. Only the returned errors are checked, not the
// content.
608 func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
// NOTE(review): this restores 1 << 26 (64MiB), whereas
// TestReadWriteFile, TestMarshalSmallBlocks, TestRandomWrites and
// TestPersist restore 2 << 26 -- confirm which default is intended.
614 defer func() { maxBlockSize = 1 << 26 }()
616 var wg sync.WaitGroup
617 for n := 0; n < 128; n++ {
621 f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
622 c.Assert(err, check.IsNil)
624 for i := 0; i < 1024; i++ {
628 _, err := s.fs.MarshalManifest(".")
629 c.Check(err, check.IsNil)
631 f.Truncate(int64(rand.Intn(64)))
633 f.Seek(int64(rand.Intn(64)), io.SeekStart)
635 _, err := f.Write([]byte("beep boop"))
636 c.Check(err, check.IsNil)
638 _, err := ioutil.ReadAll(f)
639 c.Check(err, check.IsNil)
646 f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
647 c.Assert(err, check.IsNil)
649 buf, err := ioutil.ReadAll(f)
650 c.Check(err, check.IsNil)
651 c.Logf("after lots of random r/w/seek/trunc, buf is %q", buf)
// TestRandomWrites creates files named random-N (N below ngoroutines)
// in an empty collection. For each file it repeatedly truncates to a
// random length, writes a random-length prefix of wbytes at a random
// offset, and checks that reading the whole file back matches a
// locally maintained expected buffer. Afterwards it calls waitPrune on
// every file, checks that the root directory has nfiles entries, and
// checks that MarshalManifest succeeds.
654 func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
// NOTE(review): this restores a hard-coded 2 << 26 (128MiB), while
// TestConcurrentWriters restores 1 << 26 and
// TestFlushFullBlocksWritingLongFile restores the previously saved
// value -- confirm which default is intended.
656 defer func() { maxBlockSize = 2 << 26 }()
659 s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
660 c.Assert(err, check.IsNil)
663 const ngoroutines = 256
665 var wg sync.WaitGroup
666 for n := 0; n < ngoroutines; n++ {
// expect mirrors what the file should contain after each step.
670 expect := make([]byte, 0, 64)
671 wbytes := []byte("there's no simple explanation for anything important that any of us do")
672 f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
673 c.Assert(err, check.IsNil)
675 for i := 0; i < nfiles; i++ {
// trunc is in [0, 64], woff in [0, trunc], and the write length is
// capped so woff+len(wbytes) never exceeds 64 (the capacity of
// expect).
676 trunc := rand.Intn(65)
677 woff := rand.Intn(trunc + 1)
678 wbytes = wbytes[:rand.Intn(64-woff+1)]
679 for buf, i := expect[:cap(expect)], len(expect); i < trunc; i++ {
682 expect = expect[:trunc]
683 if trunc < woff+len(wbytes) {
684 expect = expect[:woff+len(wbytes)]
686 copy(expect[woff:], wbytes)
687 f.Truncate(int64(trunc))
688 pos, err := f.Seek(int64(woff), io.SeekStart)
689 c.Check(pos, check.Equals, int64(woff))
690 c.Check(err, check.IsNil)
691 n, err := f.Write(wbytes)
692 c.Check(n, check.Equals, len(wbytes))
693 c.Check(err, check.IsNil)
694 pos, err = f.Seek(0, io.SeekStart)
695 c.Check(pos, check.Equals, int64(0))
696 c.Check(err, check.IsNil)
697 buf, err := ioutil.ReadAll(f)
698 c.Check(string(buf), check.Equals, string(expect))
699 c.Check(err, check.IsNil)
705 for n := 0; n < ngoroutines; n++ {
706 f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDONLY, 0)
707 c.Assert(err, check.IsNil)
708 f.(*filehandle).inode.(*filenode).waitPrune()
713 root, err := s.fs.Open("/")
714 c.Assert(err, check.IsNil)
716 fi, err := root.Readdir(-1)
717 c.Check(err, check.IsNil)
718 c.Check(len(fi), check.Equals, nfiles)
720 _, err = s.fs.MarshalManifest(".")
721 c.Check(err, check.IsNil)
722 // TODO: check manifest content
// TestRemove checks the results of Remove and RemoveAll on a small
// directory tree. Removing an empty directory succeeds, and removing
// it again returns os.ErrNotExist. Paths ending in "." or ".." return
// ErrInvalidArgument. A non-empty directory returns
// ErrDirectoryNotEmpty, also when it is reached through ".."
// components. RemoveAll succeeds on an existing tree and again once
// the tree is gone.
725 func (s *CollectionFSSuite) TestRemove(c *check.C) {
726 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
727 c.Assert(err, check.IsNil)
728 err = fs.Mkdir("dir0", 0755)
729 c.Assert(err, check.IsNil)
730 err = fs.Mkdir("dir1", 0755)
731 c.Assert(err, check.IsNil)
732 err = fs.Mkdir("dir1/dir2", 0755)
733 c.Assert(err, check.IsNil)
734 err = fs.Mkdir("dir1/dir3", 0755)
735 c.Assert(err, check.IsNil)
737 err = fs.Remove("dir0")
738 c.Check(err, check.IsNil)
739 err = fs.Remove("dir0")
740 c.Check(err, check.Equals, os.ErrNotExist)
742 err = fs.Remove("dir1/dir2/.")
743 c.Check(err, check.Equals, ErrInvalidArgument)
744 err = fs.Remove("dir1/dir2/..")
745 c.Check(err, check.Equals, ErrInvalidArgument)
746 err = fs.Remove("dir1")
747 c.Check(err, check.Equals, ErrDirectoryNotEmpty)
748 err = fs.Remove("dir1/dir2/../../../dir1")
749 c.Check(err, check.Equals, ErrDirectoryNotEmpty)
// A trailing slash on an empty directory is accepted.
750 err = fs.Remove("dir1/dir3/")
751 c.Check(err, check.IsNil)
752 err = fs.RemoveAll("dir1")
753 c.Check(err, check.IsNil)
754 err = fs.RemoveAll("dir1")
755 c.Check(err, check.IsNil)
// TestRenameError checks that renaming a directory to a path inside
// its own subtree returns ErrInvalidArgument. It also checks that
// after first/second is renamed to second, the file inside it can be
// opened at the new path and still holds its original five bytes.
758 func (s *CollectionFSSuite) TestRenameError(c *check.C) {
759 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
760 c.Assert(err, check.IsNil)
761 err = fs.Mkdir("first", 0755)
762 c.Assert(err, check.IsNil)
763 err = fs.Mkdir("first/second", 0755)
764 c.Assert(err, check.IsNil)
765 f, err := fs.OpenFile("first/second/file", os.O_CREATE|os.O_WRONLY, 0755)
766 c.Assert(err, check.IsNil)
767 f.Write([]byte{1, 2, 3, 4, 5})
769 err = fs.Rename("first", "first/second/third")
770 c.Check(err, check.Equals, ErrInvalidArgument)
771 err = fs.Rename("first", "first/third")
772 c.Check(err, check.Equals, ErrInvalidArgument)
773 err = fs.Rename("first/second", "second")
774 c.Check(err, check.IsNil)
775 f, err = fs.OpenFile("second/file", 0, 0)
776 c.Assert(err, check.IsNil)
777 data, err := ioutil.ReadAll(f)
778 c.Check(err, check.IsNil)
779 c.Check(data, check.DeepEquals, []byte{1, 2, 3, 4, 5})
// TestRenameDirectory checks directory renames in an empty collection.
// Renaming bar to the unused name baz succeeds. Renaming foo onto the
// existing directory baz fails. Renaming foo to "baz/" (with a
// trailing slash) succeeds. Renaming baz/foo, with or without a
// trailing slash, to "." returns ErrInvalidArgument.
782 func (s *CollectionFSSuite) TestRenameDirectory(c *check.C) {
783 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
784 c.Assert(err, check.IsNil)
785 err = fs.Mkdir("foo", 0755)
786 c.Assert(err, check.IsNil)
787 err = fs.Mkdir("bar", 0755)
788 c.Assert(err, check.IsNil)
789 err = fs.Rename("bar", "baz")
790 c.Check(err, check.IsNil)
791 err = fs.Rename("foo", "baz")
792 c.Check(err, check.NotNil)
793 err = fs.Rename("foo", "baz/")
794 c.Check(err, check.IsNil)
795 err = fs.Rename("baz/foo", ".")
796 c.Check(err, check.Equals, ErrInvalidArgument)
797 err = fs.Rename("baz/foo/", ".")
798 c.Check(err, check.Equals, ErrInvalidArgument)
// TestRename builds an outer x inner tree of directories, each holding
// files with the content "beep", and then exercises Rename.
//
// Successful case: dirI/dirJ/fileJ is renamed to
// dirI/newfile(inner-j-1); the new name must not exist beforehand and
// must open afterwards.
//
// Failing cases: a missing source, a missing target parent directory,
// a source whose parent is a file, and a target whose parent is a
// file.
//
// Finally it checks that dir1/newfile3 holds the 4 bytes "beep" and
// that dir1/dir1/file1 no longer exists.
801 func (s *CollectionFSSuite) TestRename(c *check.C) {
802 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
803 c.Assert(err, check.IsNil)
808 for i := 0; i < outer; i++ {
809 err = fs.Mkdir(fmt.Sprintf("dir%d", i), 0755)
810 c.Assert(err, check.IsNil)
811 for j := 0; j < inner; j++ {
812 err = fs.Mkdir(fmt.Sprintf("dir%d/dir%d", i, j), 0755)
813 c.Assert(err, check.IsNil)
// One file next to the subdirectory and one inside it.
814 for _, fnm := range []string{
815 fmt.Sprintf("dir%d/file%d", i, j),
816 fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
818 f, err := fs.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0755)
819 c.Assert(err, check.IsNil)
820 _, err = f.Write([]byte("beep"))
821 c.Assert(err, check.IsNil)
826 var wg sync.WaitGroup
827 for i := 0; i < outer; i++ {
828 for j := 0; j < inner; j++ {
832 oldname := fmt.Sprintf("dir%d/dir%d/file%d", i, j, j)
833 newname := fmt.Sprintf("dir%d/newfile%d", i, inner-j-1)
834 _, err := fs.Open(newname)
835 c.Check(err, check.Equals, os.ErrNotExist)
836 err = fs.Rename(oldname, newname)
837 c.Check(err, check.IsNil)
838 f, err := fs.Open(newname)
839 c.Check(err, check.IsNil)
846 // oldname does not exist
848 fmt.Sprintf("dir%d/dir%d/missing", i, j),
849 fmt.Sprintf("dir%d/dir%d/file%d", outer-i-1, j, j))
850 c.Check(err, check.ErrorMatches, `.*does not exist`)
852 // newname parent dir does not exist
854 fmt.Sprintf("dir%d/dir%d", i, j),
855 fmt.Sprintf("dir%d/missing/irrelevant", outer-i-1))
856 c.Check(err, check.ErrorMatches, `.*does not exist`)
858 // oldname parent dir is a file
860 fmt.Sprintf("dir%d/file%d/patherror", i, j),
861 fmt.Sprintf("dir%d/irrelevant", i))
862 c.Check(err, check.ErrorMatches, `.*not a directory`)
864 // newname parent dir is a file
866 fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
867 fmt.Sprintf("dir%d/file%d/patherror", i, inner-j-1))
868 c.Check(err, check.ErrorMatches, `.*not a directory`)
874 f, err := fs.OpenFile("dir1/newfile3", 0, 0)
875 c.Assert(err, check.IsNil)
876 c.Check(f.Size(), check.Equals, int64(4))
877 buf, err := ioutil.ReadAll(f)
878 c.Check(buf, check.DeepEquals, []byte("beep"))
879 c.Check(err, check.IsNil)
880 _, err = fs.Open("dir1/dir1/file1")
881 c.Check(err, check.Equals, os.ErrNotExist)
// TestPersist writes 500-byte buffers in 5-byte chunks to four files
// without closing them. The names include a space, a colon and a
// backslash, and one file sits in the directory "d:r". It then
// marshals the manifest and loads it into a second filesystem. Both
// root directories must list 4 entries, and every file recorded in
// expect must read back from the second filesystem with the expected
// content.
884 func (s *CollectionFSSuite) TestPersist(c *check.C) {
// NOTE(review): this restores a hard-coded 2 << 26 (128MiB), while
// TestConcurrentWriters restores 1 << 26 and
// TestFlushFullBlocksWritingLongFile restores the previously saved
// value -- confirm which default is intended.
886 defer func() { maxBlockSize = 2 << 26 }()
889 s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
890 c.Assert(err, check.IsNil)
891 err = s.fs.Mkdir("d:r", 0755)
892 c.Assert(err, check.IsNil)
894 expect := map[string][]byte{}
896 var wg sync.WaitGroup
897 for _, name := range []string{"random 1", "random:2", "random\\3", "d:r/random4"} {
898 buf := make([]byte, 500)
902 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
903 c.Assert(err, check.IsNil)
904 // Note: we don't close the file until after the test
905 // is done. Writes to unclosed files should persist.
911 for i := 0; i < len(buf); i += 5 {
912 _, err := f.Write(buf[i : i+5])
913 c.Assert(err, check.IsNil)
919 m, err := s.fs.MarshalManifest(".")
920 c.Check(err, check.IsNil)
923 root, err := s.fs.Open("/")
924 c.Assert(err, check.IsNil)
926 fi, err := root.Readdir(-1)
927 c.Check(err, check.IsNil)
928 c.Check(len(fi), check.Equals, 4)
// Reload from the marshalled manifest and verify the same view.
930 persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
931 c.Assert(err, check.IsNil)
933 root, err = persisted.Open("/")
934 c.Assert(err, check.IsNil)
936 fi, err = root.Readdir(-1)
937 c.Check(err, check.IsNil)
938 c.Check(len(fi), check.Equals, 4)
940 for name, content := range expect {
941 c.Logf("read %q", name)
942 f, err := persisted.Open(name)
943 c.Assert(err, check.IsNil)
945 buf, err := ioutil.ReadAll(f)
946 c.Check(err, check.IsNil)
947 c.Check(buf, check.DeepEquals, content)
// TestPersistEmptyFilesAndDirs creates a set of directories and the
// files listed in expect (some with nil data) in an empty collection,
// marshals the manifest, and loads it into a second filesystem. There
// it checks that each expected file opens and reads back its data
// while a "bogus-" prefixed name fails to open, and that each
// directory in expectDir lists the expected number of entries while
// "<dir>/bogus" fails to open.
951 func (s *CollectionFSSuite) TestPersistEmptyFilesAndDirs(c *check.C) {
953 s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
954 c.Assert(err, check.IsNil)
955 for _, name := range []string{"dir", "dir/zerodir", "empty", "not empty", "not empty/empty", "zero", "zero/zero"} {
956 err = s.fs.Mkdir(name, 0755)
957 c.Assert(err, check.IsNil)
960 expect := map[string][]byte{
967 "dir/zerodir/zero": nil,
968 "zero/zero/zero": nil,
970 for name, data := range expect {
971 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
972 c.Assert(err, check.IsNil)
974 _, err := f.Write(data)
975 c.Assert(err, check.IsNil)
980 m, err := s.fs.MarshalManifest(".")
981 c.Check(err, check.IsNil)
984 persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
985 c.Assert(err, check.IsNil)
987 for name, data := range expect {
// Negative control: a name that was never created must not open.
988 _, err = persisted.Open("bogus-" + name)
989 c.Check(err, check.NotNil)
991 f, err := persisted.Open(name)
992 c.Assert(err, check.IsNil)
997 buf, err := ioutil.ReadAll(f)
998 c.Check(err, check.IsNil)
999 c.Check(buf, check.DeepEquals, data)
1002 expectDir := map[string]int{
1005 "not empty/empty": 0,
1007 for name, expectLen := range expectDir {
1008 _, err := persisted.Open(name + "/bogus")
1009 c.Check(err, check.NotNil)
1011 d, err := persisted.Open(name)
1013 c.Check(err, check.IsNil)
1014 fi, err := d.Readdir(-1)
1015 c.Check(err, check.IsNil)
1016 c.Check(fi, check.HasLen, expectLen)
// TestOpenFileFlags exercises OpenFile flag handling in an empty
// collection:
//
//   - O_WRONLY on a missing file fails with "file does not exist"
//   - O_CREATE|O_RDONLY yields a handle whose Write fails as read-only
//   - O_RDWR allows writing, and O_TRUNC resets the size to 0
//   - O_APPEND: a write after reading one byte leaves the offset at
//     the end of the file (6)
//   - O_WRONLY: Read fails with an error mentioning O_WRONLY
//   - O_CREATE|O_SYNC is expected to fail
//   - O_RDWR|O_WRONLY is rejected as an invalid flag
1020 func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
1021 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1022 c.Assert(err, check.IsNil)
1024 f, err := fs.OpenFile("missing", os.O_WRONLY, 0)
1025 c.Check(f, check.IsNil)
1026 c.Check(err, check.ErrorMatches, `file does not exist`)
1028 f, err = fs.OpenFile("new", os.O_CREATE|os.O_RDONLY, 0)
1029 c.Assert(err, check.IsNil)
1031 n, err := f.Write([]byte{1, 2, 3})
1032 c.Check(n, check.Equals, 0)
1033 c.Check(err, check.ErrorMatches, `read-only file`)
1034 n, err = f.Read(make([]byte, 1))
1035 c.Check(n, check.Equals, 0)
1036 c.Check(err, check.Equals, io.EOF)
1037 f, err = fs.OpenFile("new", os.O_RDWR, 0)
1038 c.Assert(err, check.IsNil)
1040 _, err = f.Write([]byte{4, 5, 6})
1041 c.Check(err, check.IsNil)
1043 c.Assert(err, check.IsNil)
1044 c.Check(fi.Size(), check.Equals, int64(3))
1046 f, err = fs.OpenFile("new", os.O_TRUNC|os.O_RDWR, 0)
1047 c.Assert(err, check.IsNil)
1049 pos, err := f.Seek(0, io.SeekEnd)
1050 c.Check(pos, check.Equals, int64(0))
1051 c.Check(err, check.IsNil)
1053 c.Assert(err, check.IsNil)
1054 c.Check(fi.Size(), check.Equals, int64(0))
1057 buf := make([]byte, 64)
1058 f, err = fs.OpenFile("append", os.O_EXCL|os.O_CREATE|os.O_RDWR|os.O_APPEND, 0)
1059 c.Assert(err, check.IsNil)
1060 f.Write([]byte{1, 2, 3})
1061 f.Seek(0, io.SeekStart)
1062 n, _ = f.Read(buf[:1])
1063 c.Check(n, check.Equals, 1)
1064 c.Check(buf[:1], check.DeepEquals, []byte{1})
1065 pos, err = f.Seek(0, io.SeekCurrent)
1066 c.Assert(err, check.IsNil)
1067 c.Check(pos, check.Equals, int64(1))
// Although the offset is 1, the O_APPEND write lands at the end: the
// offset afterwards is 6 and the content is 1..6.
1068 f.Write([]byte{4, 5, 6})
1069 pos, err = f.Seek(0, io.SeekCurrent)
1070 c.Assert(err, check.IsNil)
1071 c.Check(pos, check.Equals, int64(6))
1072 f.Seek(0, io.SeekStart)
1073 n, err = f.Read(buf)
1074 c.Check(buf[:n], check.DeepEquals, []byte{1, 2, 3, 4, 5, 6})
1075 c.Check(err, check.Equals, io.EOF)
// A freshly opened O_APPEND handle starts at offset 0.
1078 f, err = fs.OpenFile("append", os.O_RDWR|os.O_APPEND, 0)
1079 c.Assert(err, check.IsNil)
1080 pos, err = f.Seek(0, io.SeekCurrent)
1081 c.Check(pos, check.Equals, int64(0))
1082 c.Check(err, check.IsNil)
1084 pos, _ = f.Seek(0, io.SeekCurrent)
1085 c.Check(pos, check.Equals, int64(3))
1086 f.Write([]byte{7, 8, 9})
1087 pos, err = f.Seek(0, io.SeekCurrent)
1088 c.Check(err, check.IsNil)
1089 c.Check(pos, check.Equals, int64(9))
1092 f, err = fs.OpenFile("wronly", os.O_CREATE|os.O_WRONLY, 0)
1093 c.Assert(err, check.IsNil)
1094 n, err = f.Write([]byte{3, 2, 1})
1095 c.Check(n, check.Equals, 3)
1096 c.Check(err, check.IsNil)
1097 pos, _ = f.Seek(0, io.SeekCurrent)
1098 c.Check(pos, check.Equals, int64(3))
1099 pos, _ = f.Seek(0, io.SeekStart)
1100 c.Check(pos, check.Equals, int64(0))
1101 n, err = f.Read(buf)
1102 c.Check(n, check.Equals, 0)
1103 c.Check(err, check.ErrorMatches, `.*O_WRONLY.*`)
1104 f, err = fs.OpenFile("wronly", os.O_RDONLY, 0)
1105 c.Assert(err, check.IsNil)
1107 c.Check(buf[:n], check.DeepEquals, []byte{3, 2, 1})
1109 f, err = fs.OpenFile("unsupported", os.O_CREATE|os.O_SYNC, 0)
1110 c.Check(f, check.IsNil)
1111 c.Check(err, check.NotNil)
1113 f, err = fs.OpenFile("append", os.O_RDWR|os.O_WRONLY, 0)
1114 c.Check(f, check.IsNil)
1115 c.Check(err, check.ErrorMatches, `invalid flag.*`)
// TestFlushFullBlocksWritingLongFile writes 100 chunks of 500 bytes to
// the file "50K" with concurrentWriters set to 2. An onWrite hook on
// the stub counts started and in-flight block writes. After waitPrune,
// currentMemExtents must report exactly one segment. After
// MarshalManifest, the manifest must describe a single 50000-byte file
// and currentMemExtents must report none.
1118 func (s *CollectionFSSuite) TestFlushFullBlocksWritingLongFile(c *check.C) {
// Capture the package-level settings up front so the deferred function
// can put concurrentWriters back to its previous value.
1119 defer func(cw, mbs int) {
1120 concurrentWriters = cw
1122 }(concurrentWriters, maxBlockSize)
1123 concurrentWriters = 2
1126 proceed := make(chan struct{})
1127 var started, concurrent int32
1129 s.kc.onWrite = func([]byte) {
1130 atomic.AddInt32(&concurrent, 1)
1131 switch atomic.AddInt32(&started, 1) {
1133 // Wait until block 2 starts and finishes, and block 3 starts
1136 c.Check(blk2done, check.Equals, true)
1137 case <-time.After(time.Second):
1138 c.Error("timed out")
1141 time.Sleep(time.Millisecond)
1146 time.Sleep(time.Millisecond)
// After this write is counted out, the number still in flight must be
// below concurrentWriters.
1148 c.Check(atomic.AddInt32(&concurrent, -1) < int32(concurrentWriters), check.Equals, true)
1151 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1152 c.Assert(err, check.IsNil)
1153 f, err := fs.OpenFile("50K", os.O_WRONLY|os.O_CREATE, 0)
1154 c.Assert(err, check.IsNil)
1157 data := make([]byte, 500)
1160 for i := 0; i < 100; i++ {
1161 n, err := f.Write(data)
1162 c.Assert(n, check.Equals, len(data))
1163 c.Assert(err, check.IsNil)
// currentMemExtents collects indices of segments in f's filenode.
1166 currentMemExtents := func() (memExtents []int) {
1167 for idx, e := range f.(*filehandle).inode.(*filenode).segments {
1170 memExtents = append(memExtents, idx)
1175 f.(*filehandle).inode.(*filenode).waitPrune()
1176 c.Check(currentMemExtents(), check.HasLen, 1)
1178 m, err := fs.MarshalManifest(".")
1179 c.Check(m, check.Matches, `[^:]* 0:50000:50K\n`)
1180 c.Check(err, check.IsNil)
1181 c.Check(currentMemExtents(), check.HasLen, 0)
1184 // Ensure blocks get flushed to disk if a lot of data is written to
1185 // small files/directories without calling sync().
1187 // Write four 512KiB files into each of 256 top-level dirs (total
1188 // 512MiB), calling Flush() every 8 dirs. Ensure memory usage never
1189 // exceeds 24MiB (4 concurrentWriters * 2MiB + 8 unflushed dirs *
//
// NOTE(review): the figures above do not match the visible code: the
// inner loop opens two files per dir (j < 2), and the asserted limit
// on fs.MemorySize() is 1<<24 bytes (16MiB). Confirm which is
// intended and update the comment or the code accordingly.
1191 func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
1192 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1193 c.Assert(err, check.IsNil)
// The onWrite hook sleeps for 1ms and appears to replace the stub's
// block map with an empty one each time a block is written.
1195 s.kc.onWrite = func([]byte) {
1196 // discard flushed data -- otherwise the stub will use
1198 time.Sleep(time.Millisecond)
1201 s.kc.blocks = map[string][]byte{}
1203 for i := 0; i < 256; i++ {
// A fresh buffer per dir: 524288 zero bytes followed by a short text
// tag naming the dir.
1204 buf := bytes.NewBuffer(make([]byte, 524288))
1205 fmt.Fprintf(buf, "test file in dir%d", i)
1207 dir := fmt.Sprintf("dir%d", i)
1209 for j := 0; j < 2; j++ {
1210 f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1211 c.Assert(err, check.IsNil)
// NOTE(review): the same buf is passed to io.Copy for every j.
// Reading drains a bytes.Buffer, so copies after the first one
// presumably write nothing -- confirm this is intended.
1213 _, err = io.Copy(f, buf)
1214 c.Assert(err, check.IsNil)
// If the in-memory size ever exceeds 1<<24 bytes, the check fails and
// the dir index and size are logged.
1221 size := fs.MemorySize()
1222 if !c.Check(size <= 1<<24, check.Equals, true) {
1223 c.Logf("at dir%d fs.MemorySize()=%d", i, size)
1229 // Ensure short blocks at the end of a stream don't get flushed by
1232 // Write 67x 1MiB files to each of 8 dirs, and check that 8 full 64MiB
1233 // blocks have been flushed while 8x 3MiB is still buffered in memory.
//
// NOTE(review): nDirs, nFiles and flushed are declared on lines that
// are not visible in this view; per the comment above they are
// presumably 8, 67 and an int64 byte counter -- TODO confirm.
1234 func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
1235 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1236 c.Assert(err, check.IsNil)
// Add the length of every block passed to the onWrite hook to flushed.
1239 s.kc.onWrite = func(p []byte) {
1240 atomic.AddInt64(&flushed, int64(len(p)))
// Write one 1MiB (1<<20 bytes) buffer to each of nFiles files in each
// of nDirs dirs.
1245 megabyte := make([]byte, 1<<20)
1246 for i := int64(0); i < nDirs; i++ {
1247 dir := fmt.Sprintf("dir%d", i)
1249 for j := int64(0); j < nFiles; j++ {
1250 f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1251 c.Assert(err, check.IsNil)
1253 _, err = f.Write(megabyte)
1254 c.Assert(err, check.IsNil)
// 64 bytes are budgeted for each of nDirs*nFiles + nDirs + 1 inodes
// (files, dirs, and one more -- presumably the root).
1257 inodebytes := int64((nDirs*(nFiles+1) + 1) * 64)
// Before any flush, each file is expected to account for 1<<20+64
// bytes of memory, and the hook must not have seen any data.
// NOTE(review): flushed is updated with atomic.AddInt64 in the hook
// but read directly here and in waitForFlush -- confirm this cannot
// race with an in-progress write.
1258 c.Check(fs.MemorySize(), check.Equals, nDirs*nFiles*(1<<20+64)+inodebytes)
1259 c.Check(flushed, check.Equals, int64(0))
// waitForFlush polls fs.MemorySize() every 10ms, for at most 5s, while
// it is still above expectUnflushed; it checks fs.MemorySize() and
// the flushed byte count against the expected values.
1261 waitForFlush := func(expectUnflushed, expectFlushed int64) {
1262 for deadline := time.Now().Add(5 * time.Second); fs.MemorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
1264 c.Check(fs.MemorySize(), check.Equals, expectUnflushed)
1265 c.Check(flushed, check.Equals, expectFlushed)
1268 // Nothing flushed yet
1269 waitForFlush(nDirs*nFiles*(1<<20+64)+inodebytes, 0)
1271 // Flushing a non-empty dir "/" is non-recursive and there are
1272 // no top-level files, so this has no effect
1273 fs.Flush("/", false)
1274 waitForFlush(nDirs*nFiles*(1<<20+64)+inodebytes, 0)
1276 // Flush the full block in dir0
1277 fs.Flush("dir0", false)
// Expected result: 64 files no longer hold 1MiB buffers and are
// charged bigloclen bytes each instead, and 64<<20 bytes have been
// flushed.
1278 bigloclen := int64(32 + 9 + 51 + 64) // md5 + "+" + "67xxxxxx" + "+Axxxxxx..." + 64 (see (storedSegment)memorySize)
1279 waitForFlush((nDirs*nFiles-64)*(1<<20+64)+inodebytes+bigloclen*64, 64<<20)
// Flushing a path that does not exist must return an error.
1281 err = fs.Flush("dir-does-not-exist", false)
1282 c.Check(err, check.NotNil)
1284 // Flush full blocks in all dirs
// Expected result: 3 buffered files remain per dir, and 64 files per
// dir are charged bigloclen bytes each.
1286 waitForFlush(nDirs*3*(1<<20+64)+inodebytes+bigloclen*64*nDirs, nDirs*64<<20)
1288 // Flush non-full blocks, too
// Expected result: no 1MiB buffers remain; the last 3 files per dir
// are charged smallloclen bytes each, and nDirs*67<<20 bytes have
// been flushed in total.
1290 smallloclen := int64(32 + 8 + 51 + 64) // md5 + "+" + "3xxxxxx" + "+Axxxxxx..." + 64 (see (storedSegment)memorySize)
1291 waitForFlush(inodebytes+bigloclen*64*nDirs+smallloclen*3*nDirs, nDirs*67<<20)
1294 // Even when writing lots of files/dirs from different goroutines, as
1295 // long as Flush(dir,false) is called after writing each file,
1296 // unflushed data should be limited to one full block per
1297 // concurrentWriter, plus one nearly-full block at the end of each
//
// NOTE(review): nDirs and unflushed are declared on lines that are
// not visible in this view, as are the goroutine and channel
// operations that use owg, iwg, release and timeout.
1299 func (s *CollectionFSSuite) TestMaxUnflushed(c *check.C) {
// Upper bound on unflushed bytes: (concurrentWriters + nDirs) units of
// 1<<26 bytes (64MiB) each.
1301 maxUnflushed := (int64(concurrentWriters) + nDirs) << 26
1303 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1304 c.Assert(err, check.IsNil)
1306 release := make(chan struct{})
// timeout is closed by a timer 10 seconds after this point.
1307 timeout := make(chan struct{})
1308 time.AfterFunc(10*time.Second, func() { close(timeout) })
1309 var putCount, concurrency int64
1311 s.kc.onWrite = func(p []byte) {
// When the hook returns, subtract this block's length from unflushed
// and decrement concurrency. cur is the concurrency counter after
// this call's increment; pc is the value of putCount after this
// call's increment.
1312 defer atomic.AddInt64(&unflushed, -int64(len(p)))
1313 cur := atomic.AddInt64(&concurrency, 1)
1314 defer atomic.AddInt64(&concurrency, -1)
1315 pc := atomic.AddInt64(&putCount, 1)
1316 if pc < int64(concurrentWriters) {
1317 // Block until we reach concurrentWriters, to
1318 // make sure we're really accepting concurrent
1325 } else if pc == int64(concurrentWriters) {
1326 // Unblock the first N-1 PUT reqs.
// The concurrency counter must not exceed concurrentWriters, and the
// unflushed byte count must not exceed maxUnflushed.
1329 c.Assert(cur <= int64(concurrentWriters), check.Equals, true)
1330 c.Assert(atomic.LoadInt64(&unflushed) <= maxUnflushed, check.Equals, true)
1333 var owg sync.WaitGroup
1334 megabyte := make([]byte, 1<<20)
1335 for i := int64(0); i < nDirs; i++ {
1336 dir := fmt.Sprintf("dir%d", i)
// Flush dir with the second argument set to true (presumably a
// recursive flush -- TODO confirm) when the enclosing function returns.
1341 defer fs.Flush(dir, true)
1342 var iwg sync.WaitGroup
// 67 files of 1MiB each per dir.
1344 for j := 0; j < 67; j++ {
1348 f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1349 c.Assert(err, check.IsNil)
1351 n, err := f.Write(megabyte)
1352 c.Assert(err, check.IsNil)
// Count the bytes just written as unflushed, then call
// Flush(dir, false) on the dir.
1353 atomic.AddInt64(&unflushed, int64(n))
1354 fs.Flush(dir, false)
// TestFlushStress writes 200 files of 1MiB each into each of 3 dirs,
// calling Flush(dir, false) and MarshalManifest(".") as it goes, while
// the onWrite hook sleeps 20ms and clears the stub's block map.
//
// NOTE(review): the declarations of done and wrote, and some of the
// surrounding statements, are not visible in this view.
1363 func (s *CollectionFSSuite) TestFlushStress(c *check.C) {
// done is set to true when the test function returns.
1365 defer func() { done = true }()
// 10 seconds after this point, a timer callback writes the goroutine
// profile to stderr -- presumably only if the test has not finished
// by then (the condition is not visible here).
1366 time.AfterFunc(10*time.Second, func() {
1368 pprof.Lookup("goroutine").WriteTo(os.Stderr, 1)
1374 s.kc.onWrite = func(p []byte) {
1376 s.kc.blocks = map[string][]byte{}
// Log the block count and block size when the hook returns. The
// arguments of a deferred call are evaluated at the defer statement.
1378 defer c.Logf("wrote block %d, %d bytes", wrote, len(p))
1380 time.Sleep(20 * time.Millisecond)
1383 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1384 c.Assert(err, check.IsNil)
1386 data := make([]byte, 1<<20)
1387 for i := 0; i < 3; i++ {
1388 dir := fmt.Sprintf("dir%d", i)
1390 for j := 0; j < 200; j++ {
1392 f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1393 c.Assert(err, check.IsNil)
1394 _, err = f.Write(data)
1395 c.Assert(err, check.IsNil)
1397 fs.Flush(dir, false)
// Marshalling the manifest must succeed; the manifest text itself is
// discarded.
1399 _, err := fs.MarshalManifest(".")
1400 c.Check(err, check.IsNil)
// TestFlushShort creates one dir for each of two write sizes (8 bytes
// and 1000000 bytes), writes 100 files of that size into it, and
// calls Flush(dir, false) and MarshalManifest("."), checking that the
// visible operations return no error.
//
// NOTE(review): some lines of this function are not visible in this
// view, so the exact nesting of the Flush and MarshalManifest calls
// cannot be confirmed from here.
1404 func (s *CollectionFSSuite) TestFlushShort(c *check.C) {
// The onWrite hook replaces the stub's block map with an empty one.
1405 s.kc.onWrite = func([]byte) {
1407 s.kc.blocks = map[string][]byte{}
1410 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1411 c.Assert(err, check.IsNil)
1412 for _, blocksize := range []int{8, 1000000} {
// The dir is named after the write size, e.g. "dir8".
1413 dir := fmt.Sprintf("dir%d", blocksize)
1414 err = fs.Mkdir(dir, 0755)
1415 c.Assert(err, check.IsNil)
1416 data := make([]byte, blocksize)
1417 for i := 0; i < 100; i++ {
1418 f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, i), os.O_WRONLY|os.O_CREATE, 0)
1419 c.Assert(err, check.IsNil)
1420 _, err = f.Write(data)
1421 c.Assert(err, check.IsNil)
1423 fs.Flush(dir, false)
// Only the error from MarshalManifest is checked; the manifest text
// is discarded.
1426 _, err := fs.MarshalManifest(".")
1427 c.Check(err, check.IsNil)
// TestBrokenManifests checks that, for each manifest text in the
// list below, (*Collection).FileSystem returns a nil filesystem and a
// non-nil error. The error is logged for each case.
//
// NOTE(review): some entries of the list are not visible in this
// view.
1431 func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
1432 for _, txt := range []string{
1436 ". d41d8cd98f00b204e9800998ecf8427e+0\n",
1437 ". d41d8cd98f00b204e9800998ecf8427e+0 \n",
1440 ". 0:0:foo 0:0:bar\n",
1441 ". d41d8cd98f00b204e9800998ecf8427e 0:0:foo\n",
1442 ". d41d8cd98f00b204e9800998ecf8427e+0 :0:0:foo\n",
1443 ". d41d8cd98f00b204e9800998ecf8427e+0 foo:0:foo\n",
1444 ". d41d8cd98f00b204e9800998ecf8427e+0 0:foo:foo\n",
1445 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:foo 1:1:bar\n",
1446 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\n",
1447 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\\057\\056\n",
1448 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:.\n",
1449 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:..\n",
1450 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:..\n",
1451 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/..\n",
1452 ". d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n",
1453 "./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n. d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n",
1456 fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
1457 c.Check(fs, check.IsNil)
1458 c.Logf("-> %s", err)
1459 c.Check(err, check.NotNil)
// TestEdgeCaseManifests checks that, for each manifest text in the
// list below, (*Collection).FileSystem returns a non-nil filesystem
// and a nil error.
//
// NOTE(review): some entries of the list are not visible in this
// view.
1463 func (s *CollectionFSSuite) TestEdgeCaseManifests(c *check.C) {
1464 for _, txt := range []string{
1466 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo\n",
1467 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:...\n",
1468 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:. 0:0:. 0:0:\\056 0:0:\\056\n",
1469 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/. 0:0:. 0:0:foo\\057bar\\057\\056\n",
1470 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
1471 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/bar\n./foo d41d8cd98f00b204e9800998ecf8427e+0 0:0:bar\n",
1474 fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
1475 c.Check(err, check.IsNil)
1476 c.Check(fs, check.NotNil)
// TestSnapshotSplice writes a file "file1" into a new filesystem,
// takes a Snapshot of "/", splices the snapshot in at "dir1", and
// checks that "dir1/file1" reads back with the content originally
// written.
//
// NOTE(review): some lines of this function are not visible in this
// view (e.g. the statement whose error is asserted right after the
// Write check).
1480 func (s *CollectionFSSuite) TestSnapshotSplice(c *check.C) {
1481 filedata1 := "hello snapshot+splice world\n"
1482 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1483 c.Assert(err, check.IsNil)
1485 f, err := fs.OpenFile("file1", os.O_CREATE|os.O_RDWR, 0700)
1486 c.Assert(err, check.IsNil)
1487 _, err = f.Write([]byte(filedata1))
1488 c.Assert(err, check.IsNil)
1490 c.Assert(err, check.IsNil)
// Snapshot the root of the filesystem and splice it back into the
// same filesystem under "dir1".
1493 snap, err := Snapshot(fs, "/")
1494 c.Assert(err, check.IsNil)
1495 err = Splice(fs, "dir1", snap)
1496 c.Assert(err, check.IsNil)
// The spliced copy must contain the same data as the original file.
1497 f, err := fs.Open("dir1/file1")
1498 c.Assert(err, check.IsNil)
1499 buf, err := io.ReadAll(f)
1500 c.Assert(err, check.IsNil)
1501 c.Check(string(buf), check.Equals, filedata1)
// TestRefreshSignatures writes two files in separate dirs, saves the
// resulting manifest as a collection via the API client, then reopens
// the collection with re-signed manifest text (first with an expiry
// in the past, then with an expiry two minutes ahead) and checks that
// the files can still be read and that the locators used for reads
// end up with a later signature expiry time.
//
// NOTE(review): some lines of this function are not visible in this
// view (e.g. the declaration of refreshed and the statements whose
// errors are asserted after each Write check).
1504 func (s *CollectionFSSuite) TestRefreshSignatures(c *check.C) {
1505 filedata1 := "hello refresh signatures world\n"
1506 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1507 c.Assert(err, check.IsNil)
1508 fs.Mkdir("d1", 0700)
1509 f, err := fs.OpenFile("d1/file1", os.O_CREATE|os.O_RDWR, 0700)
1510 c.Assert(err, check.IsNil)
1511 _, err = f.Write([]byte(filedata1))
1512 c.Assert(err, check.IsNil)
1514 c.Assert(err, check.IsNil)
1516 filedata2 := "hello refresh signatures universe\n"
1517 fs.Mkdir("d2", 0700)
1518 f, err = fs.OpenFile("d2/file2", os.O_CREATE|os.O_RDWR, 0700)
1519 c.Assert(err, check.IsNil)
1520 _, err = f.Write([]byte(filedata2))
1521 c.Assert(err, check.IsNil)
1523 c.Assert(err, check.IsNil)
// Save the manifest as a new collection through the API client; the
// response is decoded into saved.
1524 txt, err := fs.MarshalManifest(".")
1525 c.Assert(err, check.IsNil)
1526 var saved Collection
1527 err = s.client.RequestAndDecode(&saved, "POST", "arvados/v1/collections", nil, map[string]interface{}{
1528 "select": []string{"manifest_text", "uuid", "portable_data_hash"},
1529 "collection": map[string]interface{}{
1530 "manifest_text": txt,
1533 c.Assert(err, check.IsNil)
1535 // Update signatures synchronously if they are already expired
1536 // when Read() is called.
// Re-sign the manifest with an expiry time 2 seconds in the past.
1538 saved.ManifestText = SignManifest(saved.ManifestText, s.kc.authToken, time.Now().Add(-2*time.Second), s.kc.sigttl, []byte(s.kc.sigkey))
1539 fs, err := saved.FileSystem(s.client, s.kc)
1540 c.Assert(err, check.IsNil)
1541 f, err := fs.OpenFile("d1/file1", os.O_RDONLY, 0)
1542 c.Assert(err, check.IsNil)
1543 buf, err := ioutil.ReadAll(f)
1544 c.Check(err, check.IsNil)
1545 c.Check(string(buf), check.Equals, filedata1)
1548 // Update signatures asynchronously if we're more than half
1549 // way to TTL when Read() is called.
// Re-sign the manifest with an expiry time 2 minutes from now.
1551 exp := time.Now().Add(2 * time.Minute)
1552 saved.ManifestText = SignManifest(saved.ManifestText, s.kc.authToken, exp, s.kc.sigttl, []byte(s.kc.sigkey))
1553 fs, err := saved.FileSystem(s.client, s.kc)
1554 c.Assert(err, check.IsNil)
1555 f1, err := fs.OpenFile("d1/file1", os.O_RDONLY, 0)
1556 c.Assert(err, check.IsNil)
1557 f2, err := fs.OpenFile("d2/file2", os.O_RDONLY, 0)
1558 c.Assert(err, check.IsNil)
1559 buf, err := ioutil.ReadAll(f1)
1560 c.Check(err, check.IsNil)
1561 c.Check(string(buf), check.Equals, filedata1)
1563 // Ensure fs treats the 2-minute TTL as less than half
1564 // the server's signing TTL. If we don't do this,
1565 // collectionfs will guess the signature is fresh,
1566 // i.e., signing TTL is 2 minutes, and won't do an
1568 fs.(*collectionFileSystem).guessSignatureTTL = time.Hour
// Re-read f1 every 100ms, for up to 10 seconds or until refreshed is
// true, inspecting the expiry time of the locator used by the most
// recent stub read.
1571 for deadline := time.Now().Add(time.Second * 10); time.Now().Before(deadline) && !refreshed; time.Sleep(time.Second / 10) {
1572 _, err = f1.Seek(0, io.SeekStart)
1573 c.Assert(err, check.IsNil)
1574 buf, err = ioutil.ReadAll(f1)
1575 c.Assert(err, check.IsNil)
1576 c.Assert(string(buf), check.Equals, filedata1)
1577 loc := s.kc.reads[len(s.kc.reads)-1]
1578 t, err := signatureExpiryTime(loc)
1579 c.Assert(err, check.IsNil)
1580 c.Logf("last read block %s had signature expiry time %v", loc, t)
// An expiry more than an hour away cannot come from the 2-minute
// signature made above; presumably this branch sets refreshed (the
// body is not visible here) -- TODO confirm.
1581 if t.Sub(time.Now()) > time.Hour {
1585 c.Check(refreshed, check.Equals, true)
1587 // Second locator should have been updated at the same
1589 buf, err = ioutil.ReadAll(f2)
1590 c.Assert(err, check.IsNil)
1591 c.Assert(string(buf), check.Equals, filedata2)
// The locator of the latest read (for f2) must differ from the one
// before it, and its signature must also expire more than an hour
// from now.
1592 loc := s.kc.reads[len(s.kc.reads)-1]
1593 c.Check(loc, check.Not(check.Equals), s.kc.reads[len(s.kc.reads)-2])
1594 t, err := signatureExpiryTime(s.kc.reads[len(s.kc.reads)-1])
1595 c.Assert(err, check.IsNil)
1596 c.Logf("last read block %s had signature expiry time %v", loc, t)
1597 c.Check(t.Sub(time.Now()) > time.Hour, check.Equals, true)
// bigmanifest is manifest text built by the func literal below and
// used as benchmark input by BenchmarkParseManifest. For each of 2000
// stream names ("./dir0", "./dir1", ...) it appears to write 100
// copies of the same block locator followed by 2000 file tokens and a
// newline.
//
// NOTE(review): the closing lines of the func literal are not visible
// in this view; the loop nesting described above is inferred from the
// order of the visible statements -- TODO confirm.
1601 var bigmanifest = func() string {
1602 var buf bytes.Buffer
1603 for i := 0; i < 2000; i++ {
1604 fmt.Fprintf(&buf, "./dir%d", i)
// The inner loops declare their own i, shadowing the outer one.
1605 for i := 0; i < 100; i++ {
1606 fmt.Fprintf(&buf, " d41d8cd98f00b204e9800998ecf8427e+99999")
1608 for i := 0; i < 2000; i++ {
1609 fmt.Fprintf(&buf, " 1200000:300000:file%d", i)
1611 fmt.Fprintf(&buf, "\n")
// BenchmarkParseManifest builds a filesystem from bigmanifest c.N
// times, checking on each iteration that FileSystem returns a non-nil
// filesystem and no error. It sets the package-level
// DebugLocksPanicMode variable to false first and logs the manifest
// size in bytes.
1616 func (s *CollectionFSSuite) BenchmarkParseManifest(c *check.C) {
1617 DebugLocksPanicMode = false
1618 c.Logf("test manifest is %d bytes", len(bigmanifest))
1619 for i := 0; i < c.N; i++ {
1620 fs, err := (&Collection{ManifestText: bigmanifest}).FileSystem(s.client, s.kc)
1621 c.Check(err, check.IsNil)
1622 c.Check(fs, check.NotNil)
// checkMemSize checks that the memsize field of the filenode behind f
// equals the summed buffer lengths of the *memSegment entries in its
// segment list. f must be a *filehandle whose inode is a *filenode;
// otherwise the type assertions panic.
//
// NOTE(review): the declaration of the local memsize accumulator is
// not visible in this view.
1626 func (s *CollectionFSSuite) checkMemSize(c *check.C, f File) {
1627 fn := f.(*filehandle).inode.(*filenode)
// Segments of any other type contribute nothing to the sum.
1629 for _, seg := range fn.segments {
1630 if e, ok := seg.(*memSegment); ok {
1631 memsize += int64(len(e.buf))
1634 c.Check(fn.memsize, check.Equals, memsize)
// CollectionFSUnitSuite is a gocheck suite type with no fields; it is
// the receiver of TestLargeManifest.
1637 type CollectionFSUnitSuite struct{}
// Register the suite with gocheck.
1639 var _ = check.Suite(&CollectionFSUnitSuite{})
1641 // expect ~2 seconds to load a manifest with 256K files
//
// TestLargeManifest generates a manifest with dirCount streams of
// fileCount files each, loads it into a filesystem, checks the total
// size, calls Stat on every file, and logs memory statistics and
// timestamps along the way.
//
// NOTE(review): dirCount and fileCount are declared on lines that are
// not visible in this view, as is the body of the testing.Short()
// branch (presumably it skips the test -- TODO confirm).
1642 func (s *CollectionFSUnitSuite) TestLargeManifest(c *check.C) {
1643 if testing.Short() {
// Pre-size the manifest buffer (40000000 bytes of capacity).
1652 mb := bytes.NewBuffer(make([]byte, 0, 40000000))
1653 for i := 0; i < dirCount; i++ {
1654 fmt.Fprintf(mb, "./dir%d", i)
// fileCount+1 synthetic locators of 42 bytes each (note j <=
// fileCount). Each file token below starts at offset j*42+21 with
// length 42, so the last file ends halfway into block fileCount.
1655 for j := 0; j <= fileCount; j++ {
1656 fmt.Fprintf(mb, " %032x+42+A%040x@%08x", j, j, j)
1658 for j := 0; j < fileCount; j++ {
1659 fmt.Fprintf(mb, " %d:%d:dir%d/file%d", j*42+21, 42, j, j)
1661 mb.Write([]byte{'\n'})
1663 coll := Collection{ManifestText: mb.String()}
1664 c.Logf("%s built", time.Now())
1666 var memstats runtime.MemStats
1667 runtime.ReadMemStats(&memstats)
1668 c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
// Load the manifest using a client built by NewClientFromEnv and a
// zero-value keep stub.
1670 f, err := coll.FileSystem(NewClientFromEnv(), &keepClientStub{})
1671 c.Check(err, check.IsNil)
1672 c.Logf("%s loaded", time.Now())
// Every file is 42 bytes, so the expected total size is
// 42*dirCount*fileCount.
1673 c.Check(f.Size(), check.Equals, int64(42*dirCount*fileCount))
// Stat every file; the results and errors are ignored.
1675 for i := 0; i < dirCount; i++ {
1676 for j := 0; j < fileCount; j++ {
1677 f.Stat(fmt.Sprintf("./dir%d/dir%d/file%d", i, j, j))
1680 c.Logf("%s Stat() x %d", time.Now(), dirCount*fileCount)
1682 runtime.ReadMemStats(&memstats)
1683 c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
1686 // Gocheck boilerplate
1687 func Test(t *testing.T) {