1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
30 check "gopkg.in/check.v1"
// Register CollectionFSSuite with gocheck so its Test* methods run under "go test".
var _ = check.Suite(&CollectionFSSuite{})
// keepClientStub is an in-memory stand-in for a Keep client: blocks live
// in a map keyed by md5 hash, and the hook/record fields let tests observe
// reads and intercept writes without any network I/O.
type keepClientStub struct {
	blocks      map[string][]byte // stored block data, keyed by 32-hex-digit md5 hash
	refreshable map[string]bool   // hashes LocalLocator is allowed to "refresh" even if absent from blocks
	reads       []string          // locators from ReadAt() calls
	onWrite     func(bufcopy []byte) error // called from WriteBlock, before acquiring lock
	authToken   string                     // client's auth token (used for signing locators)
	sigkey      string                     // blob signing key
	sigttl      time.Duration              // blob signing ttl
// errStub404 simulates Keep's "block not found" response.
var errStub404 = errors.New("404 block not found")
// ReadAt records the requested locator, verifies its signature against the
// stub's token/key, and copies the stored block (from offset off) into p.
func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error) {
	kcs.reads = append(kcs.reads, locator)
	// Reject locators that were not signed with this stub's key and token.
	if err := VerifySignature(locator, kcs.authToken, kcs.sigttl, []byte(kcs.sigkey)); err != nil {
	// The first 32 hex digits of a locator are the block's md5 hash.
	buf := kcs.blocks[locator[:32]]
	return copy(p, buf[off:]), nil
// BlockWrite stores the written data in the stub's block map and returns a
// signed locator. The onWrite hook (if set) runs first, letting tests
// inject errors or throttle/observe flushes.
func (kcs *keepClientStub) BlockWrite(_ context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
	// Data may arrive either via opts.Reader (+DataSize) or opts.Data.
	buf = make([]byte, opts.DataSize)
	_, err := io.ReadFull(opts.Reader, buf)
	return BlockWriteResponse{}, err
	// Copy opts.Data so later mutations by the caller can't change the stored block.
	buf = append([]byte(nil), opts.Data...)
	// Locator is "md5+size", signed with the stub's key/token and TTL.
	locator := SignLocator(fmt.Sprintf("%x+%d", md5.Sum(buf), len(buf)), kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
	if kcs.onWrite != nil {
		err := kcs.onWrite(buf)
		return BlockWriteResponse{}, err
	// The stub only supports the default storage class; anything else errors out.
	for _, sc := range opts.StorageClasses {
		return BlockWriteResponse{}, fmt.Errorf("stub does not write storage class %q", sc)
	kcs.blocks[locator[:32]] = buf
	return BlockWriteResponse{Locator: locator, Replicas: 1}, nil
// reRemoteSignature matches a +A (local) or +R (remote) signature hint in a locator.
var reRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
// LocalLocator converts a remote (+R) locator to a locally signed (+A) one,
// succeeding only if the block is already stored or marked refreshable.
// Locators that already carry a +A signature pass through.
func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
	if strings.Contains(locator, "+A") {
	if strings.Contains(locator, "+R") {
		if len(locator) < 32 {
			return "", fmt.Errorf("bad locator: %q", locator)
		// Refuse unless the block exists locally or is flagged refreshable.
		if _, ok := kcs.blocks[locator[:32]]; !ok && !kcs.refreshable[locator[:32]] {
			return "", fmt.Errorf("kcs.refreshable[%q]==false", locator)
	// Strip the old +A/+R hint and re-sign with the stub's local key.
	locator = reRemoteSignature.ReplaceAllLiteralString(locator, "")
	locator = SignLocator(locator, kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
// CollectionFSSuite exercises CollectionFileSystem against a stub Keep client.
type CollectionFSSuite struct {
	fs CollectionFileSystem // filesystem under test, rebuilt in SetUpTest
// SetUpTest fetches the foo-and-bar fixture collection from the API server
// and builds a fresh filesystem backed by a keepClientStub preloaded with
// the "foobar" block.
func (s *CollectionFSSuite) SetUpTest(c *check.C) {
	s.client = NewClientFromEnv()
	s.client.AuthToken = fixtureActiveToken
	err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil)
	c.Assert(err, check.IsNil)
	s.kc = &keepClientStub{
		blocks: map[string][]byte{
			// md5("foobar") -- the single data block referenced by the fixture.
			"3858f62230ac3c915f300c664312c63f": []byte("foobar"),
		sigkey:    fixtureBlobSigningKey,
		sigttl:    fixtureBlobSigningTTL,
		authToken: fixtureActiveToken,
	s.fs, err = s.coll.FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
// TestSyncNonCanonicalManifest verifies that syncing an unmodified
// filesystem does not rewrite (re-canonicalize) a manifest that the API
// server stored in non-canonical file order.
func (s *CollectionFSSuite) TestSyncNonCanonicalManifest(c *check.C) {
	err := s.client.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil)
	c.Assert(err, check.IsNil)
	// Swap the two file tokens so the manifest is no longer in canonical order.
	mtxt := strings.Replace(coll.ManifestText, "3:3:bar 0:3:foo", "0:3:foo 3:3:bar", -1)
	c.Assert(mtxt, check.Not(check.Equals), coll.ManifestText)
	err = s.client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
		"collection": map[string]interface{}{
			"manifest_text": mtxt}})
	c.Assert(err, check.IsNil)
	// In order for the rest of the test to work as intended, the API server
	// needs to retain the file ordering we set manually. We check that here.
	// We can't check `mtxt == coll.ManifestText` because the API server
	// might've returned new block signatures if the GET and POST happened in
	// different seconds.
	expectPattern := `\./dir1 \S+ 0:3:foo 3:3:bar\n`
	c.Assert(coll.ManifestText, check.Matches, expectPattern)
	fs, err := coll.FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	c.Check(err, check.IsNil)
	// fs had no local changes, so Sync should not have saved
	// anything back to the API/database. (If it did, we would see
	// the manifest rewritten in canonical order.)
	err = s.client.RequestAndDecode(&saved, "GET", "arvados/v1/collections/"+coll.UUID, nil, nil)
	c.Assert(err, check.IsNil)
	c.Check(saved.ManifestText, check.Matches, expectPattern)
// TestHttpFileSystemInterface asserts that CollectionFileSystem satisfies
// http.FileSystem, so it can be served directly by net/http handlers.
func (s *CollectionFSSuite) TestHttpFileSystemInterface(c *check.C) {
	_, ok := s.fs.(http.FileSystem)
	c.Check(ok, check.Equals, true)
// TestUnattainableStorageClasses verifies that flushing data fails (at
// MarshalManifest time) when the collection requests a storage class the
// Keep client cannot provide -- the stub rejects anything but "default".
func (s *CollectionFSSuite) TestUnattainableStorageClasses(c *check.C) {
	fs, err := (&Collection{
		StorageClassesDesired: []string{"unobtainium"},
	}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	f, err := fs.OpenFile("/foo", os.O_CREATE|os.O_WRONLY, 0777)
	c.Assert(err, check.IsNil)
	_, err = f.Write([]byte("food"))
	c.Assert(err, check.IsNil)
	c.Assert(err, check.IsNil)
	// The write error surfaces when the data is flushed, not at Write time.
	_, err = fs.MarshalManifest(".")
	c.Assert(err, check.ErrorMatches, `.*stub does not write storage class \"unobtainium\"`)
// TestColonInFilename checks that stream and file names containing ":"
// survive the manifest round trip and appear correctly in directory listings.
func (s *CollectionFSSuite) TestColonInFilename(c *check.C) {
	fs, err := (&Collection{
		ManifestText: "./foo:foo 3858f62230ac3c915f300c664312c63f+3 0:3:bar:bar\n",
	}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	f, err := fs.Open("/foo:foo")
	c.Assert(err, check.IsNil)
	fis, err := f.Readdir(0)
	c.Check(err, check.IsNil)
	c.Check(len(fis), check.Equals, 1)
	c.Check(fis[0].Name(), check.Equals, "bar:bar")
// TestReaddirFull reads an entire directory in one Readdir(0) call and
// checks the dir's Stat info plus the returned entry count/sizes.
func (s *CollectionFSSuite) TestReaddirFull(c *check.C) {
	f, err := s.fs.Open("/dir1")
	c.Assert(err, check.IsNil)
	c.Assert(err, check.IsNil)
	// Directory "size" is its entry count (2: foo and bar).
	c.Check(st.Size(), check.Equals, int64(2))
	c.Check(st.IsDir(), check.Equals, true)
	fis, err := f.Readdir(0)
	c.Check(err, check.IsNil)
	c.Check(len(fis), check.Equals, 2)
	c.Check(fis[0].Size(), check.Equals, int64(3))
// TestReaddirLimited reads a directory one entry at a time with Readdir(n>0),
// verifying pagination state and the io.EOF returned once entries run out.
func (s *CollectionFSSuite) TestReaddirLimited(c *check.C) {
	f, err := s.fs.Open("./dir1")
	c.Assert(err, check.IsNil)
	fis, err := f.Readdir(1)
	c.Check(err, check.IsNil)
	c.Check(len(fis), check.Equals, 1)
	c.Check(fis[0].Size(), check.Equals, int64(3))
	fis, err = f.Readdir(1)
	c.Check(err, check.IsNil)
	c.Check(len(fis), check.Equals, 1)
	c.Check(fis[0].Size(), check.Equals, int64(3))
	// Third call: no entries left, so Readdir must return io.EOF.
	fis, err = f.Readdir(1)
	c.Check(len(fis), check.Equals, 0)
	c.Check(err, check.NotNil)
	c.Check(err, check.Equals, io.EOF)
	// Re-opening resets the read position; mixed n values still paginate.
	f, err = s.fs.Open("dir1")
	c.Assert(err, check.IsNil)
	fis, err = f.Readdir(1)
	c.Check(len(fis), check.Equals, 1)
	c.Assert(err, check.IsNil)
	fis, err = f.Readdir(2)
	c.Check(len(fis), check.Equals, 1)
	c.Assert(err, check.IsNil)
	fis, err = f.Readdir(2)
	c.Check(len(fis), check.Equals, 0)
	c.Assert(err, check.Equals, io.EOF)
// TestPathMunge verifies that messy path spellings (extra slashes, ".",
// "..", trailing separators) all resolve to the intended directory.
func (s *CollectionFSSuite) TestPathMunge(c *check.C) {
	// All of these spellings should resolve to the collection root.
	for _, path := range []string{".", "/", "./", "///", "/../", "/./.."} {
		f, err := s.fs.Open(path)
		c.Assert(err, check.IsNil)
		c.Assert(err, check.IsNil)
		c.Check(st.Size(), check.Equals, int64(1))
		c.Check(st.IsDir(), check.Equals, true)
	// All of these spellings should resolve to /dir1.
	for _, path := range []string{"/dir1", "dir1", "./dir1", "///dir1//.//", "../dir1/../dir1/"} {
		f, err := s.fs.Open(path)
		c.Assert(err, check.IsNil)
		c.Assert(err, check.IsNil)
		c.Check(st.Size(), check.Equals, int64(2))
		c.Check(st.IsDir(), check.Equals, true)
// TestNotExist verifies that opening any nonexistent path yields a nil file
// and an error recognized by os.IsNotExist.
func (s *CollectionFSSuite) TestNotExist(c *check.C) {
	for _, path := range []string{"/no", "no", "./no", "n/o", "/n/o"} {
		f, err := s.fs.Open(path)
		c.Assert(f, check.IsNil)
		c.Assert(err, check.NotNil)
		c.Assert(os.IsNotExist(err), check.Equals, true)
// TestReadOnlyFile checks that a file opened O_RDONLY reports its size but
// rejects writes with ErrReadOnlyFile.
func (s *CollectionFSSuite) TestReadOnlyFile(c *check.C) {
	f, err := s.fs.OpenFile("/dir1/foo", os.O_RDONLY, 0)
	c.Assert(err, check.IsNil)
	c.Assert(err, check.IsNil)
	c.Check(st.Size(), check.Equals, int64(3))
	n, err := f.Write([]byte("bar"))
	c.Check(n, check.Equals, 0)
	c.Check(err, check.Equals, ErrReadOnlyFile)
// TestCreateFile exercises O_CREATE semantics: creating a new file, O_EXCL
// failing on an existing file, reopening, and the resulting manifest (note
// the space in the filename is escaped as \040 in manifest text).
func (s *CollectionFSSuite) TestCreateFile(c *check.C) {
	f, err := s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE, 0)
	c.Assert(err, check.IsNil)
	c.Assert(err, check.IsNil)
	c.Check(st.Size(), check.Equals, int64(0))
	n, err := f.Write([]byte("bar"))
	c.Check(n, check.Equals, 3)
	c.Check(err, check.IsNil)
	c.Check(f.Close(), check.IsNil)
	// O_EXCL must fail now that the file exists.
	f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
	c.Check(f, check.IsNil)
	c.Assert(err, check.NotNil)
	f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR, 0)
	c.Assert(err, check.IsNil)
	c.Assert(err, check.IsNil)
	c.Check(st.Size(), check.Equals, int64(3))
	c.Check(f.Close(), check.IsNil)
	m, err := s.fs.MarshalManifest(".")
	c.Assert(err, check.IsNil)
	c.Check(m, check.Matches, `. 37b51d194a7513e45b56f6524f2d51f2\+3\+\S+ 0:3:new-file\\0401\n./dir1 .* 3:3:bar 0:3:foo\n`)
// TestReadWriteFile is the core read/write/seek/truncate test: it edits
// /dir1/foo through two independent handles (f, f2), splitting and merging
// the file's underlying segments, and checks the internal segment count and
// the final marshaled manifest.
func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
	defer func() { maxBlockSize = 2 << 26 }()
	f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
	c.Assert(err, check.IsNil)
	c.Assert(err, check.IsNil)
	c.Check(st.Size(), check.Equals, int64(3))
	// Second handle on the same file -- used to observe f's writes.
	f2, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
	c.Assert(err, check.IsNil)
	buf := make([]byte, 64)
	n, err := f.Read(buf)
	c.Check(n, check.Equals, 3)
	c.Check(err, check.Equals, io.EOF)
	c.Check(string(buf[:3]), check.DeepEquals, "foo")
	pos, err := f.Seek(-2, io.SeekCurrent)
	c.Check(pos, check.Equals, int64(1))
	c.Check(err, check.IsNil)
	// Split a storedExtent in two, and insert a memExtent
	n, err = f.Write([]byte("*"))
	c.Check(n, check.Equals, 1)
	c.Check(err, check.IsNil)
	pos, err = f.Seek(0, io.SeekCurrent)
	c.Check(pos, check.Equals, int64(2))
	c.Check(err, check.IsNil)
	pos, err = f.Seek(0, io.SeekStart)
	c.Check(pos, check.Equals, int64(0))
	c.Check(err, check.IsNil)
	rbuf, err := ioutil.ReadAll(f)
	c.Check(len(rbuf), check.Equals, 3)
	c.Check(err, check.IsNil)
	c.Check(string(rbuf), check.Equals, "f*o")
	// Write multiple blocks in one call
	f.Seek(1, io.SeekStart)
	n, err = f.Write([]byte("0123456789abcdefg"))
	c.Check(n, check.Equals, 17)
	c.Check(err, check.IsNil)
	pos, err = f.Seek(0, io.SeekCurrent)
	c.Check(pos, check.Equals, int64(18))
	c.Check(err, check.IsNil)
	pos, err = f.Seek(-18, io.SeekCurrent)
	c.Check(pos, check.Equals, int64(0))
	c.Check(err, check.IsNil)
	n, err = io.ReadFull(f, buf)
	c.Check(n, check.Equals, 18)
	c.Check(err, check.Equals, io.ErrUnexpectedEOF)
	c.Check(string(buf[:n]), check.Equals, "f0123456789abcdefg")
	// f2 sees f's writes immediately (same inode).
	buf2, err := ioutil.ReadAll(f2)
	c.Check(err, check.IsNil)
	c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
	// truncate to current size
	c.Check(err, check.IsNil)
	f2.Seek(0, io.SeekStart)
	buf2, err = ioutil.ReadAll(f2)
	c.Check(err, check.IsNil)
	c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
	// shrink to zero some data
	f2.Seek(0, io.SeekStart)
	buf2, err = ioutil.ReadAll(f2)
	c.Check(err, check.IsNil)
	c.Check(string(buf2), check.Equals, "f0123456789abcd")
	// grow to partial block/extent -- extension reads back as NUL bytes.
	f2.Seek(0, io.SeekStart)
	buf2, err = ioutil.ReadAll(f2)
	c.Check(err, check.IsNil)
	c.Check(string(buf2), check.Equals, "f0123456789abcd\x00\x00\x00\x00\x00")
	f2.Seek(0, io.SeekStart)
	f2.Write([]byte("12345678abcdefghijkl"))
	// grow to block/extent boundary
	f2.Seek(0, io.SeekStart)
	buf2, err = ioutil.ReadAll(f2)
	c.Check(err, check.IsNil)
	c.Check(len(buf2), check.Equals, 64)
	c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 8)
	// shrink to block/extent boundary
	c.Check(err, check.IsNil)
	f2.Seek(0, io.SeekStart)
	buf2, err = ioutil.ReadAll(f2)
	c.Check(err, check.IsNil)
	c.Check(len(buf2), check.Equals, 32)
	c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 4)
	// shrink to partial block/extent
	c.Check(err, check.IsNil)
	f2.Seek(0, io.SeekStart)
	buf2, err = ioutil.ReadAll(f2)
	c.Check(err, check.IsNil)
	c.Check(string(buf2), check.Equals, "12345678abcdefg")
	c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 2)
	// Force flush to ensure the block "12345678" gets stored, so
	// we know what to expect in the final manifest below.
	_, err = s.fs.MarshalManifest(".")
	c.Check(err, check.IsNil)
	// Truncate to size=3 while f2's ptr is at 15
	c.Check(err, check.IsNil)
	buf2, err = ioutil.ReadAll(f2)
	c.Check(err, check.IsNil)
	c.Check(string(buf2), check.Equals, "")
	f2.Seek(0, io.SeekStart)
	buf2, err = ioutil.ReadAll(f2)
	c.Check(err, check.IsNil)
	c.Check(string(buf2), check.Equals, "123")
	c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 1)
	m, err := s.fs.MarshalManifest(".")
	c.Check(err, check.IsNil)
	// Strip +A signature hints so the manifest can be compared literally.
	m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
	c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 25d55ad283aa400af464c76d713c07ad+8 3:3:bar 6:3:foo\n")
	c.Check(s.fs.Size(), check.Equals, int64(6))
// TestSeekSparse verifies that seeking past EOF does not by itself grow the
// file: size only changes when data is actually written at the new offset.
func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
	c.Assert(err, check.IsNil)
	// checkSize asserts the file's size via Stat, a fresh handle, and SeekEnd.
	checkSize := func(size int64) {
		c.Assert(err, check.IsNil)
		c.Check(fi.Size(), check.Equals, size)
		f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
		c.Assert(err, check.IsNil)
		c.Check(err, check.IsNil)
		c.Check(fi.Size(), check.Equals, size)
		pos, err := f.Seek(0, io.SeekEnd)
		c.Check(err, check.IsNil)
		c.Check(pos, check.Equals, size)
	// Seeks beyond EOF must not change the size until a write happens.
	f.Seek(2, io.SeekEnd)
	f.Seek(2, io.SeekCurrent)
	f.Seek(8, io.SeekStart)
	// Reading at the sparse position yields EOF, not zero-fill.
	n, err := f.Read(make([]byte, 1))
	c.Check(n, check.Equals, 0)
	c.Check(err, check.Equals, io.EOF)
	f.Write([]byte{1, 2, 3})
// TestMarshalCopiesRemoteBlocks checks that MarshalManifest converts remote
// (+R) block signatures to local (+A) ones via LocalLocator, failing when a
// remote block cannot be refreshed and succeeding once it is refreshable.
func (s *CollectionFSSuite) TestMarshalCopiesRemoteBlocks(c *check.C) {
	hash := map[string]string{
		foo: fmt.Sprintf("%x", md5.Sum([]byte(foo))),
		bar: fmt.Sprintf("%x", md5.Sum([]byte(bar))),
	fs, err := (&Collection{
		ManifestText: ". " + hash[foo] + "+3+Rzaaaa-foo@bab " + hash[bar] + "+3+A12345@ffffff 0:2:fo.txt 2:4:obar.txt\n",
	}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	// Neither block is stored or refreshable yet, so marshaling must fail.
	manifest, err := fs.MarshalManifest(".")
	c.Check(manifest, check.Equals, "")
	c.Check(err, check.NotNil)
	// Marking bar refreshable lets LocalLocator succeed for it.
	s.kc.refreshable = map[string]bool{hash[bar]: true}
	for _, sigIn := range []string{"Rzaaaa-foo@bab", "A12345@abcde"} {
		fs, err = (&Collection{
			ManifestText: ". " + hash[foo] + "+3+A12345@fffff " + hash[bar] + "+3+" + sigIn + " 0:2:fo.txt 2:4:obar.txt\n",
		}).FileSystem(s.client, s.kc)
		c.Assert(err, check.IsNil)
		manifest, err := fs.MarshalManifest(".")
		c.Check(err, check.IsNil)
		// Both blocks should now have +A signatures.
		c.Check(manifest, check.Matches, `.*\+A.* .*\+A.*\n`)
		c.Check(manifest, check.Not(check.Matches), `.*\+R.*\n`)
// TestMarshalSmallBlocks verifies that small files get packed together into
// shared blocks when the manifest is marshaled (maxBlockSize is lowered for
// the duration of the test).
func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
	defer func() { maxBlockSize = 2 << 26 }()
	s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	for _, name := range []string{"foo", "bar", "baz"} {
		f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
		c.Assert(err, check.IsNil)
		f.Write([]byte(name))
	m, err := s.fs.MarshalManifest(".")
	c.Check(err, check.IsNil)
	// Strip +A signature hints so the manifest can be compared literally.
	m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
	c.Check(m, check.Equals, ". c3c23db5285662ef7172373df0003206+6 acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:bar 3:3:baz 6:3:foo\n")
// TestMkdir exercises directory creation: missing parents, file/dir name
// collisions, recreation after Remove, and the resulting manifest.
func (s *CollectionFSSuite) TestMkdir(c *check.C) {
	// Parent "foo" doesn't exist yet, so both calls must fail.
	err := s.fs.Mkdir("foo/bar", 0755)
	c.Check(err, check.Equals, os.ErrNotExist)
	f, err := s.fs.OpenFile("foo/bar", os.O_CREATE, 0)
	c.Check(err, check.Equals, os.ErrNotExist)
	err = s.fs.Mkdir("foo", 0755)
	c.Check(err, check.IsNil)
	f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_WRONLY, 0)
	c.Check(err, check.IsNil)
	f.Write([]byte("foo"))
	// mkdir fails if a file already exists with that name
	err = s.fs.Mkdir("foo/bar", 0755)
	c.Check(err, check.NotNil)
	err = s.fs.Remove("foo/bar")
	c.Check(err, check.IsNil)
	// mkdir succeeds after the file is deleted
	err = s.fs.Mkdir("foo/bar", 0755)
	c.Check(err, check.IsNil)
	// creating a file in a nonexistent subdir should still fail
	f, err = s.fs.OpenFile("foo/bar/baz/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
	c.Check(err, check.Equals, os.ErrNotExist)
	f, err = s.fs.OpenFile("foo/bar/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
	c.Check(err, check.IsNil)
	f.Write([]byte("foo"))
	// creating foo/bar as a regular file should fail
	f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, 0)
	c.Check(err, check.NotNil)
	// creating foo/bar as a directory should fail
	f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, os.ModeDir)
	c.Check(err, check.NotNil)
	err = s.fs.Mkdir("foo/bar", 0755)
	c.Check(err, check.NotNil)
	m, err := s.fs.MarshalManifest(".")
	c.Check(err, check.IsNil)
	// Strip +A signature hints so the manifest can be compared literally.
	m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
	c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 3:3:bar 0:3:foo\n./foo/bar acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n")
// TestConcurrentWriters hammers a single file from many goroutines with
// random writes, truncates, seeks, reads, and marshals -- a race/crash
// smoke test rather than a content check.
func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
	defer func() { maxBlockSize = 1 << 26 }()
	var wg sync.WaitGroup
	for n := 0; n < 128; n++ {
		f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
		c.Assert(err, check.IsNil)
		for i := 0; i < 1024; i++ {
			// Each iteration performs one randomly chosen operation.
			_, err := s.fs.MarshalManifest(".")
			c.Check(err, check.IsNil)
			f.Truncate(int64(rand.Intn(64)))
			f.Seek(int64(rand.Intn(64)), io.SeekStart)
			_, err := f.Write([]byte("beep boop"))
			c.Check(err, check.IsNil)
			_, err := ioutil.ReadAll(f)
			c.Check(err, check.IsNil)
	f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
	c.Assert(err, check.IsNil)
	buf, err := ioutil.ReadAll(f)
	c.Check(err, check.IsNil)
	c.Logf("after lots of random r/w/seek/trunc, buf is %q", buf)
// TestRandomWrites runs many goroutines, each repeatedly truncating and
// writing its own file while maintaining an in-memory `expect` model, then
// asserts each read-back matches the model and the fs marshals cleanly.
func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
	defer func() { maxBlockSize = 2 << 26 }()
	s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	const ngoroutines = 256
	var wg sync.WaitGroup
	for n := 0; n < ngoroutines; n++ {
		// expect mirrors what the file's content should be after each op.
		expect := make([]byte, 0, 64)
		wbytes := []byte("there's no simple explanation for anything important that any of us do")
		f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
		c.Assert(err, check.IsNil)
		for i := 0; i < nfiles; i++ {
			// Pick a random truncate size, write offset, and write length.
			trunc := rand.Intn(65)
			woff := rand.Intn(trunc + 1)
			wbytes = wbytes[:rand.Intn(64-woff+1)]
			for buf, i := expect[:cap(expect)], len(expect); i < trunc; i++ {
			expect = expect[:trunc]
			if trunc < woff+len(wbytes) {
				expect = expect[:woff+len(wbytes)]
			copy(expect[woff:], wbytes)
			f.Truncate(int64(trunc))
			pos, err := f.Seek(int64(woff), io.SeekStart)
			c.Check(pos, check.Equals, int64(woff))
			c.Check(err, check.IsNil)
			n, err := f.Write(wbytes)
			c.Check(n, check.Equals, len(wbytes))
			c.Check(err, check.IsNil)
			pos, err = f.Seek(0, io.SeekStart)
			c.Check(pos, check.Equals, int64(0))
			c.Check(err, check.IsNil)
			buf, err := ioutil.ReadAll(f)
			c.Check(string(buf), check.Equals, string(expect))
			c.Check(err, check.IsNil)
	// Wait for background flushes to settle before inspecting the tree.
	for n := 0; n < ngoroutines; n++ {
		f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDONLY, 0)
		c.Assert(err, check.IsNil)
		f.(*filehandle).inode.(*filenode).waitPrune()
	root, err := s.fs.Open("/")
	c.Assert(err, check.IsNil)
	fi, err := root.Readdir(-1)
	c.Check(err, check.IsNil)
	c.Check(len(fi), check.Equals, nfiles)
	_, err = s.fs.MarshalManifest(".")
	c.Check(err, check.IsNil)
	// TODO: check manifest content
// TestRemove covers Remove/RemoveAll semantics: missing targets, "." and
// ".." rejection, non-empty directories, and idempotent RemoveAll.
func (s *CollectionFSSuite) TestRemove(c *check.C) {
	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	err = fs.Mkdir("dir0", 0755)
	c.Assert(err, check.IsNil)
	err = fs.Mkdir("dir1", 0755)
	c.Assert(err, check.IsNil)
	err = fs.Mkdir("dir1/dir2", 0755)
	c.Assert(err, check.IsNil)
	err = fs.Mkdir("dir1/dir3", 0755)
	c.Assert(err, check.IsNil)
	err = fs.Remove("dir0")
	c.Check(err, check.IsNil)
	// Second removal of the same dir fails: it's gone.
	err = fs.Remove("dir0")
	c.Check(err, check.Equals, os.ErrNotExist)
	// "." and ".." are not removable path components.
	err = fs.Remove("dir1/dir2/.")
	c.Check(err, check.Equals, ErrInvalidArgument)
	err = fs.Remove("dir1/dir2/..")
	c.Check(err, check.Equals, ErrInvalidArgument)
	// Remove refuses to delete a non-empty directory.
	err = fs.Remove("dir1")
	c.Check(err, check.Equals, ErrDirectoryNotEmpty)
	err = fs.Remove("dir1/dir2/../../../dir1")
	c.Check(err, check.Equals, ErrDirectoryNotEmpty)
	err = fs.Remove("dir1/dir3/")
	c.Check(err, check.IsNil)
	// RemoveAll deletes recursively and is a no-op (nil) when already gone.
	err = fs.RemoveAll("dir1")
	c.Check(err, check.IsNil)
	err = fs.RemoveAll("dir1")
	c.Check(err, check.IsNil)
// TestRenameError checks that renaming a directory into its own subtree is
// rejected, while a legal rename preserves file contents.
func (s *CollectionFSSuite) TestRenameError(c *check.C) {
	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	err = fs.Mkdir("first", 0755)
	c.Assert(err, check.IsNil)
	err = fs.Mkdir("first/second", 0755)
	c.Assert(err, check.IsNil)
	f, err := fs.OpenFile("first/second/file", os.O_CREATE|os.O_WRONLY, 0755)
	c.Assert(err, check.IsNil)
	f.Write([]byte{1, 2, 3, 4, 5})
	// Moving a dir into its own descendant must be rejected.
	err = fs.Rename("first", "first/second/third")
	c.Check(err, check.Equals, ErrInvalidArgument)
	err = fs.Rename("first", "first/third")
	c.Check(err, check.Equals, ErrInvalidArgument)
	err = fs.Rename("first/second", "second")
	c.Check(err, check.IsNil)
	// Content must survive the rename.
	f, err = fs.OpenFile("second/file", 0, 0)
	c.Assert(err, check.IsNil)
	data, err := ioutil.ReadAll(f)
	c.Check(err, check.IsNil)
	c.Check(data, check.DeepEquals, []byte{1, 2, 3, 4, 5})
// TestRenameDirectory covers directory-to-directory renames, including the
// trailing-slash form (move into target dir) and renames targeting ".".
func (s *CollectionFSSuite) TestRenameDirectory(c *check.C) {
	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	err = fs.Mkdir("foo", 0755)
	c.Assert(err, check.IsNil)
	err = fs.Mkdir("bar", 0755)
	c.Assert(err, check.IsNil)
	err = fs.Rename("bar", "baz")
	c.Check(err, check.IsNil)
	// Renaming onto an existing directory fails...
	err = fs.Rename("foo", "baz")
	c.Check(err, check.NotNil)
	// ...but the trailing-slash form moves foo *into* baz.
	err = fs.Rename("foo", "baz/")
	c.Check(err, check.IsNil)
	// "." is never a valid rename target.
	err = fs.Rename("baz/foo", ".")
	c.Check(err, check.Equals, ErrInvalidArgument)
	err = fs.Rename("baz/foo/", ".")
	c.Check(err, check.Equals, ErrInvalidArgument)
// TestRename builds an outer*inner grid of directories and files, then
// renames files concurrently while also exercising the error paths
// (missing source, missing target parent, file used as a directory).
func (s *CollectionFSSuite) TestRename(c *check.C) {
	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	for i := 0; i < outer; i++ {
		err = fs.Mkdir(fmt.Sprintf("dir%d", i), 0755)
		c.Assert(err, check.IsNil)
		for j := 0; j < inner; j++ {
			err = fs.Mkdir(fmt.Sprintf("dir%d/dir%d", i, j), 0755)
			c.Assert(err, check.IsNil)
			for _, fnm := range []string{
				fmt.Sprintf("dir%d/file%d", i, j),
				fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
				f, err := fs.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0755)
				c.Assert(err, check.IsNil)
				_, err = f.Write([]byte("beep"))
				c.Assert(err, check.IsNil)
	var wg sync.WaitGroup
	for i := 0; i < outer; i++ {
		for j := 0; j < inner; j++ {
				// Move each nested file up a level under a new name.
				oldname := fmt.Sprintf("dir%d/dir%d/file%d", i, j, j)
				newname := fmt.Sprintf("dir%d/newfile%d", i, inner-j-1)
				_, err := fs.Open(newname)
				c.Check(err, check.Equals, os.ErrNotExist)
				err = fs.Rename(oldname, newname)
				c.Check(err, check.IsNil)
				f, err := fs.Open(newname)
				c.Check(err, check.IsNil)
				// oldname does not exist
					fmt.Sprintf("dir%d/dir%d/missing", i, j),
					fmt.Sprintf("dir%d/dir%d/file%d", outer-i-1, j, j))
				c.Check(err, check.ErrorMatches, `.*does not exist`)
				// newname parent dir does not exist
					fmt.Sprintf("dir%d/dir%d", i, j),
					fmt.Sprintf("dir%d/missing/irrelevant", outer-i-1))
				c.Check(err, check.ErrorMatches, `.*does not exist`)
				// oldname parent dir is a file
					fmt.Sprintf("dir%d/file%d/patherror", i, j),
					fmt.Sprintf("dir%d/irrelevant", i))
				c.Check(err, check.ErrorMatches, `.*not a directory`)
				// newname parent dir is a file
					fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
					fmt.Sprintf("dir%d/file%d/patherror", i, inner-j-1))
				c.Check(err, check.ErrorMatches, `.*not a directory`)
	// Spot-check one renamed file's size/content and one vacated source path.
	f, err := fs.OpenFile("dir1/newfile3", 0, 0)
	c.Assert(err, check.IsNil)
	c.Check(f.Size(), check.Equals, int64(4))
	buf, err := ioutil.ReadAll(f)
	c.Check(buf, check.DeepEquals, []byte("beep"))
	c.Check(err, check.IsNil)
	_, err = fs.Open("dir1/dir1/file1")
	c.Check(err, check.Equals, os.ErrNotExist)
// TestPersist writes several files (with awkward names) concurrently,
// marshals the manifest, rebuilds a filesystem from it, and verifies the
// content round-trips -- including writes to files that were never closed.
func (s *CollectionFSSuite) TestPersist(c *check.C) {
	defer func() { maxBlockSize = 2 << 26 }()
	s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	err = s.fs.Mkdir("d:r", 0755)
	c.Assert(err, check.IsNil)
	// expect maps filename -> the exact bytes written to it.
	expect := map[string][]byte{}
	var wg sync.WaitGroup
	for _, name := range []string{"random 1", "random:2", "random\\3", "d:r/random4"} {
		buf := make([]byte, 500)
		f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
		c.Assert(err, check.IsNil)
		// Note: we don't close the file until after the test
		// is done. Writes to unclosed files should persist.
		for i := 0; i < len(buf); i += 5 {
			_, err := f.Write(buf[i : i+5])
			c.Assert(err, check.IsNil)
	m, err := s.fs.MarshalManifest(".")
	c.Check(err, check.IsNil)
	root, err := s.fs.Open("/")
	c.Assert(err, check.IsNil)
	fi, err := root.Readdir(-1)
	c.Check(err, check.IsNil)
	c.Check(len(fi), check.Equals, 4)
	// Rebuild a filesystem from the marshaled manifest and re-check everything.
	persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	root, err = persisted.Open("/")
	c.Assert(err, check.IsNil)
	fi, err = root.Readdir(-1)
	c.Check(err, check.IsNil)
	c.Check(len(fi), check.Equals, 4)
	for name, content := range expect {
		c.Logf("read %q", name)
		f, err := persisted.Open(name)
		c.Assert(err, check.IsNil)
		buf, err := ioutil.ReadAll(f)
		c.Check(err, check.IsNil)
		c.Check(buf, check.DeepEquals, content)
// TestPersistEmptyFilesAndDirs verifies that zero-length files and empty
// directories survive the manifest round trip, and that directory entry
// counts match expectations in the rebuilt filesystem.
func (s *CollectionFSSuite) TestPersistEmptyFilesAndDirs(c *check.C) {
	s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	for _, name := range []string{"dir", "dir/zerodir", "empty", "not empty", "not empty/empty", "zero", "zero/zero"} {
		err = s.fs.Mkdir(name, 0755)
		c.Assert(err, check.IsNil)
	// expect maps filename -> content; nil means a zero-length file.
	expect := map[string][]byte{
		"dir/zerodir/zero": nil,
		"zero/zero/zero":   nil,
	for name, data := range expect {
		f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
		c.Assert(err, check.IsNil)
		_, err := f.Write(data)
		c.Assert(err, check.IsNil)
	m, err := s.fs.MarshalManifest(".")
	c.Check(err, check.IsNil)
	// Rebuild from the manifest and verify files and empty dirs survived.
	persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	for name, data := range expect {
		_, err = persisted.Open("bogus-" + name)
		c.Check(err, check.NotNil)
		f, err := persisted.Open(name)
		c.Assert(err, check.IsNil)
		buf, err := ioutil.ReadAll(f)
		c.Check(err, check.IsNil)
		c.Check(buf, check.DeepEquals, data)
	// expectDir maps directory name -> expected number of entries.
	expectDir := map[string]int{
		"not empty/empty": 0,
	for name, expectLen := range expectDir {
		_, err := persisted.Open(name + "/bogus")
		c.Check(err, check.NotNil)
		d, err := persisted.Open(name)
		c.Check(err, check.IsNil)
		fi, err := d.Readdir(-1)
		c.Check(err, check.IsNil)
		c.Check(fi, check.HasLen, expectLen)
// TestOpenFileFlags walks through OpenFile flag combinations: O_WRONLY on a
// missing file, O_CREATE|O_RDONLY, O_TRUNC, O_APPEND positioning, O_WRONLY
// read rejection, unsupported O_SYNC, and the invalid O_RDWR|O_WRONLY combo.
func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	// O_WRONLY without O_CREATE on a missing file fails.
	f, err := fs.OpenFile("missing", os.O_WRONLY, 0)
	c.Check(f, check.IsNil)
	c.Check(err, check.ErrorMatches, `file does not exist`)
	// O_CREATE|O_RDONLY creates the file but rejects writes on this handle.
	f, err = fs.OpenFile("new", os.O_CREATE|os.O_RDONLY, 0)
	c.Assert(err, check.IsNil)
	n, err := f.Write([]byte{1, 2, 3})
	c.Check(n, check.Equals, 0)
	c.Check(err, check.ErrorMatches, `read-only file`)
	n, err = f.Read(make([]byte, 1))
	c.Check(n, check.Equals, 0)
	c.Check(err, check.Equals, io.EOF)
	f, err = fs.OpenFile("new", os.O_RDWR, 0)
	c.Assert(err, check.IsNil)
	_, err = f.Write([]byte{4, 5, 6})
	c.Check(err, check.IsNil)
	c.Assert(err, check.IsNil)
	c.Check(fi.Size(), check.Equals, int64(3))
	// O_TRUNC empties the existing file on open.
	f, err = fs.OpenFile("new", os.O_TRUNC|os.O_RDWR, 0)
	c.Assert(err, check.IsNil)
	pos, err := f.Seek(0, io.SeekEnd)
	c.Check(pos, check.Equals, int64(0))
	c.Check(err, check.IsNil)
	c.Assert(err, check.IsNil)
	c.Check(fi.Size(), check.Equals, int64(0))
	// O_APPEND: every write lands at EOF regardless of the current offset.
	buf := make([]byte, 64)
	f, err = fs.OpenFile("append", os.O_EXCL|os.O_CREATE|os.O_RDWR|os.O_APPEND, 0)
	c.Assert(err, check.IsNil)
	f.Write([]byte{1, 2, 3})
	f.Seek(0, io.SeekStart)
	n, _ = f.Read(buf[:1])
	c.Check(n, check.Equals, 1)
	c.Check(buf[:1], check.DeepEquals, []byte{1})
	pos, err = f.Seek(0, io.SeekCurrent)
	c.Assert(err, check.IsNil)
	c.Check(pos, check.Equals, int64(1))
	f.Write([]byte{4, 5, 6})
	pos, err = f.Seek(0, io.SeekCurrent)
	c.Assert(err, check.IsNil)
	c.Check(pos, check.Equals, int64(6))
	f.Seek(0, io.SeekStart)
	n, err = f.Read(buf)
	c.Check(buf[:n], check.DeepEquals, []byte{1, 2, 3, 4, 5, 6})
	c.Check(err, check.Equals, io.EOF)
	f, err = fs.OpenFile("append", os.O_RDWR|os.O_APPEND, 0)
	c.Assert(err, check.IsNil)
	pos, err = f.Seek(0, io.SeekCurrent)
	c.Check(pos, check.Equals, int64(0))
	c.Check(err, check.IsNil)
	pos, _ = f.Seek(0, io.SeekCurrent)
	c.Check(pos, check.Equals, int64(3))
	f.Write([]byte{7, 8, 9})
	pos, err = f.Seek(0, io.SeekCurrent)
	c.Check(err, check.IsNil)
	c.Check(pos, check.Equals, int64(9))
	// O_WRONLY: writes and seeks work, reads are rejected.
	f, err = fs.OpenFile("wronly", os.O_CREATE|os.O_WRONLY, 0)
	c.Assert(err, check.IsNil)
	n, err = f.Write([]byte{3, 2, 1})
	c.Check(n, check.Equals, 3)
	c.Check(err, check.IsNil)
	pos, _ = f.Seek(0, io.SeekCurrent)
	c.Check(pos, check.Equals, int64(3))
	pos, _ = f.Seek(0, io.SeekStart)
	c.Check(pos, check.Equals, int64(0))
	n, err = f.Read(buf)
	c.Check(n, check.Equals, 0)
	c.Check(err, check.ErrorMatches, `.*O_WRONLY.*`)
	f, err = fs.OpenFile("wronly", os.O_RDONLY, 0)
	c.Assert(err, check.IsNil)
	c.Check(buf[:n], check.DeepEquals, []byte{3, 2, 1})
	// O_SYNC is not supported by this filesystem.
	f, err = fs.OpenFile("unsupported", os.O_CREATE|os.O_SYNC, 0)
	c.Check(f, check.IsNil)
	c.Check(err, check.NotNil)
	// O_RDWR combined with O_WRONLY is an invalid flag combination.
	f, err = fs.OpenFile("append", os.O_RDWR|os.O_WRONLY, 0)
	c.Check(f, check.IsNil)
	c.Check(err, check.ErrorMatches, `invalid flag.*`)
// TestFlushFullBlocksWritingLongFile writes a 50K file in small chunks and
// uses the onWrite hook to check that full blocks are flushed in the
// background, that concurrency never exceeds concurrentWriters, and that
// only the final short segment remains in memory until MarshalManifest.
func (s *CollectionFSSuite) TestFlushFullBlocksWritingLongFile(c *check.C) {
	defer func(cw, mbs int) {
		concurrentWriters = cw
	}(concurrentWriters, maxBlockSize)
	concurrentWriters = 2
	proceed := make(chan struct{})
	var started, concurrent int32
	s.kc.onWrite = func([]byte) error {
		// Track in-flight writes to verify the concurrency limit below.
		atomic.AddInt32(&concurrent, 1)
		switch atomic.AddInt32(&started, 1) {
			// Wait until block 2 starts and finishes, and block 3 starts
			c.Check(blk2done, check.Equals, true)
			case <-time.After(time.Second):
				c.Error("timed out")
			time.Sleep(time.Millisecond)
			time.Sleep(time.Millisecond)
		// At no point may concurrency reach the configured writer limit's cap.
		c.Check(atomic.AddInt32(&concurrent, -1) < int32(concurrentWriters), check.Equals, true)
	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	f, err := fs.OpenFile("50K", os.O_WRONLY|os.O_CREATE, 0)
	c.Assert(err, check.IsNil)
	data := make([]byte, 500)
	for i := 0; i < 100; i++ {
		n, err := f.Write(data)
		c.Assert(n, check.Equals, len(data))
		c.Assert(err, check.IsNil)
	// currentMemExtents lists indexes of segments still held in memory.
	currentMemExtents := func() (memExtents []int) {
		for idx, e := range f.(*filehandle).inode.(*filenode).segments {
			memExtents = append(memExtents, idx)
	f.(*filehandle).inode.(*filenode).waitPrune()
	// Only the final (short) segment should still be unflushed.
	c.Check(currentMemExtents(), check.HasLen, 1)
	m, err := fs.MarshalManifest(".")
	c.Check(m, check.Matches, `[^:]* 0:50000:50K\n`)
	c.Check(err, check.IsNil)
	c.Check(currentMemExtents(), check.HasLen, 0)
1196 // Ensure blocks get flushed to disk if a lot of data is written to
1197 // small files/directories without calling sync().
1199 // Write four 512KiB files into each of 256 top-level dirs (total
1200 // 512MiB), calling Flush() every 8 dirs. Ensure memory usage never
1201 // exceeds 24MiB (4 concurrentWriters * 2MiB + 8 unflushed dirs *
// TestFlushAll writes files into 256 top-level dirs and checks that
// fs.MemorySize() never exceeds 16MiB (1<<24) -- i.e., written data is
// flushed to the (stub) keep service rather than accumulating in memory.
// NOTE(review): listing is elided (non-contiguous original line numbers);
// comments cover only the visible code.
1203 func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
1204 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1205 c.Assert(err, check.IsNil)
1207 s.kc.onWrite = func([]byte) error {
1208 // discard flushed data -- otherwise the stub will use
1210 time.Sleep(time.Millisecond)
// Reset the stub's block store so discarded data doesn't count as memory.
1213 s.kc.blocks = map[string][]byte{}
1216 for i := 0; i < 256; i++ {
1217 buf := bytes.NewBuffer(make([]byte, 524288))
1218 fmt.Fprintf(buf, "test file in dir%d", i)
1220 dir := fmt.Sprintf("dir%d", i)
1222 for j := 0; j < 2; j++ {
1223 f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1224 c.Assert(err, check.IsNil)
1226 _, err = io.Copy(f, buf)
1227 c.Assert(err, check.IsNil)
// Memory usage must stay bounded as we keep writing.
1234 size := fs.MemorySize()
1235 if !c.Check(size <= 1<<24, check.Equals, true) {
1236 c.Logf("at dir%d fs.MemorySize()=%d", i, size)
1242 // Ensure short blocks at the end of a stream don't get flushed by
1245 // Write 67x 1MiB files to each of 8 dirs, and check that 8 full 64MiB
1246 // blocks have been flushed while 8x 3MiB is still buffered in memory.
// TestFlushFullBlocksOnly writes 67 x 1MiB files into each of nDirs
// dirs, then checks that Flush(dir, false) flushes only full 64MiB
// blocks, leaving the short tail (3MiB per dir) buffered until
// Flush(dir, true) (shortBlocks=true) is called.
// NOTE(review): listing is elided (non-contiguous original line
// numbers); comments describe only the visible code.
1247 func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
1248 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1249 c.Assert(err, check.IsNil)
// Count total bytes flushed to the stub keep service.
1252 s.kc.onWrite = func(p []byte) error {
1253 atomic.AddInt64(&flushed, int64(len(p)))
1259 megabyte := make([]byte, 1<<20)
1260 for i := int64(0); i < nDirs; i++ {
1261 dir := fmt.Sprintf("dir%d", i)
1263 for j := int64(0); j < nFiles; j++ {
1264 f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1265 c.Assert(err, check.IsNil)
1267 _, err = f.Write(megabyte)
1268 c.Assert(err, check.IsNil)
// Expected memory: file data plus 64 bytes of inode overhead per node.
1271 inodebytes := int64((nDirs*(nFiles+1) + 1) * 64)
1272 c.Check(fs.MemorySize(), check.Equals, nDirs*nFiles*(1<<20+64)+inodebytes)
1273 c.Check(flushed, check.Equals, int64(0))
// Poll until MemorySize drops to the expected value (or 5s deadline),
// then verify both memory usage and total flushed bytes.
1275 waitForFlush := func(expectUnflushed, expectFlushed int64) {
1276 for deadline := time.Now().Add(5 * time.Second); fs.MemorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
1278 c.Check(fs.MemorySize(), check.Equals, expectUnflushed)
1279 c.Check(flushed, check.Equals, expectFlushed)
1282 // Nothing flushed yet
1283 waitForFlush(nDirs*nFiles*(1<<20+64)+inodebytes, 0)
1285 // Flushing a non-empty dir "/" is non-recursive and there are
1286 // no top-level files, so this has no effect
1287 fs.Flush("/", false)
1288 waitForFlush(nDirs*nFiles*(1<<20+64)+inodebytes, 0)
1290 // Flush the full block in dir0
1291 fs.Flush("dir0", false)
1292 bigloclen := int64(32 + 9 + 51 + 64) // md5 + "+" + "67xxxxxx" + "+Axxxxxx..." + 64 (see (storedSegment)memorySize)
1293 waitForFlush((nDirs*nFiles-64)*(1<<20+64)+inodebytes+bigloclen*64, 64<<20)
// Flushing a nonexistent dir is an error.
1295 err = fs.Flush("dir-does-not-exist", false)
1296 c.Check(err, check.NotNil)
1298 // Flush full blocks in all dirs
1300 waitForFlush(nDirs*3*(1<<20+64)+inodebytes+bigloclen*64*nDirs, nDirs*64<<20)
1302 // Flush non-full blocks, too
1304 smallloclen := int64(32 + 8 + 51 + 64) // md5 + "+" + "3xxxxxx" + "+Axxxxxx..." + 64 (see (storedSegment)memorySize)
1305 waitForFlush(inodebytes+bigloclen*64*nDirs+smallloclen*3*nDirs, nDirs*67<<20)
1308 // Even when writing lots of files/dirs from different goroutines, as
1309 // long as Flush(dir,false) is called after writing each file,
1310 // unflushed data should be limited to one full block per
1311 // concurrentWriter, plus one nearly-full block at the end of each
// TestMaxUnflushed writes 67MiB per dir from concurrent goroutines,
// calling Flush(dir, false) after each file write, and asserts that
// unflushed data never exceeds one full block per concurrent writer
// plus one nearly-full block per dir.
// NOTE(review): listing is elided (non-contiguous original line
// numbers); comments describe only the visible code.
1313 func (s *CollectionFSSuite) TestMaxUnflushed(c *check.C) {
1315 maxUnflushed := (int64(concurrentWriters) + nDirs) << 26
1317 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1318 c.Assert(err, check.IsNil)
1320 release := make(chan struct{})
1321 timeout := make(chan struct{})
1322 time.AfterFunc(10*time.Second, func() { close(timeout) })
1323 var putCount, concurrency int64
// Stub write hook: verify concurrency limit and unflushed-bytes bound
// on every block write.
1325 s.kc.onWrite = func(p []byte) error {
1326 defer atomic.AddInt64(&unflushed, -int64(len(p)))
1327 cur := atomic.AddInt64(&concurrency, 1)
1328 defer atomic.AddInt64(&concurrency, -1)
1329 pc := atomic.AddInt64(&putCount, 1)
1330 if pc < int64(concurrentWriters) {
1331 // Block until we reach concurrentWriters, to
1332 // make sure we're really accepting concurrent
1339 } else if pc == int64(concurrentWriters) {
1340 // Unblock the first N-1 PUT reqs.
1343 c.Assert(cur <= int64(concurrentWriters), check.Equals, true)
1344 c.Assert(atomic.LoadInt64(&unflushed) <= maxUnflushed, check.Equals, true)
1348 var owg sync.WaitGroup
1349 megabyte := make([]byte, 1<<20)
// One goroutine per dir; each writes 67 x 1MiB files and flushes full
// blocks after each write.
1350 for i := int64(0); i < nDirs; i++ {
1351 dir := fmt.Sprintf("dir%d", i)
1356 defer fs.Flush(dir, true)
1357 var iwg sync.WaitGroup
1359 for j := 0; j < 67; j++ {
1363 f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1364 c.Assert(err, check.IsNil)
1366 n, err := f.Write(megabyte)
1367 c.Assert(err, check.IsNil)
1368 atomic.AddInt64(&unflushed, int64(n))
1369 fs.Flush(dir, false)
// TestFlushStress hammers Flush() while writing many 1MiB files; a
// 10-second watchdog dumps goroutine stacks to stderr if the test
// wedges. NOTE(review): listing is elided (non-contiguous original
// line numbers); comments describe only the visible code.
1378 func (s *CollectionFSSuite) TestFlushStress(c *check.C) {
1380 defer func() { done = true }()
1381 time.AfterFunc(10*time.Second, func() {
1383 pprof.Lookup("goroutine").WriteTo(os.Stderr, 1)
// Stub write hook: drop stored blocks (keep memory bounded), log each
// write, and add latency to encourage overlap between flushes.
1389 s.kc.onWrite = func(p []byte) error {
1391 s.kc.blocks = map[string][]byte{}
1393 defer c.Logf("wrote block %d, %d bytes", wrote, len(p))
1395 time.Sleep(20 * time.Millisecond)
1399 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1400 c.Assert(err, check.IsNil)
1402 data := make([]byte, 1<<20)
1403 for i := 0; i < 3; i++ {
1404 dir := fmt.Sprintf("dir%d", i)
1406 for j := 0; j < 200; j++ {
1408 f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1409 c.Assert(err, check.IsNil)
1410 _, err = f.Write(data)
1411 c.Assert(err, check.IsNil)
1413 fs.Flush(dir, false)
// Marshalling at the end must still succeed after all the churn.
1415 _, err := fs.MarshalManifest(".")
1416 c.Check(err, check.IsNil)
// TestFlushShort writes many small (8-byte) and medium (1MB) files,
// flushing after each, and checks the manifest can still be
// marshalled. NOTE(review): listing is elided (non-contiguous
// original line numbers); comments describe only the visible code.
1420 func (s *CollectionFSSuite) TestFlushShort(c *check.C) {
1421 s.kc.onWrite = func([]byte) error {
// Drop stored blocks so the stub doesn't accumulate memory.
1423 s.kc.blocks = map[string][]byte{}
1427 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1428 c.Assert(err, check.IsNil)
1429 for _, blocksize := range []int{8, 1000000} {
1430 dir := fmt.Sprintf("dir%d", blocksize)
1431 err = fs.Mkdir(dir, 0755)
1432 c.Assert(err, check.IsNil)
1433 data := make([]byte, blocksize)
1434 for i := 0; i < 100; i++ {
1435 f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, i), os.O_WRONLY|os.O_CREATE, 0)
1436 c.Assert(err, check.IsNil)
1437 _, err = f.Write(data)
1438 c.Assert(err, check.IsNil)
1440 fs.Flush(dir, false)
1443 _, err := fs.MarshalManifest(".")
1444 c.Check(err, check.IsNil)
// TestBrokenManifests checks that FileSystem() rejects each of these
// malformed manifests (bad locators, malformed file tokens, "." / ".."
// path components, duplicate/conflicting stream entries) by returning
// a nil fs and a non-nil error.
1448 func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
1449 for _, txt := range []string{
1453 ". d41d8cd98f00b204e9800998ecf8427e+0\n",
1454 ". d41d8cd98f00b204e9800998ecf8427e+0 \n",
1457 ". 0:0:foo 0:0:bar\n",
1458 ". d41d8cd98f00b204e9800998ecf8427e 0:0:foo\n",
1459 ". d41d8cd98f00b204e9800998ecf8427e+0 :0:0:foo\n",
1460 ". d41d8cd98f00b204e9800998ecf8427e+0 foo:0:foo\n",
1461 ". d41d8cd98f00b204e9800998ecf8427e+0 0:foo:foo\n",
1462 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:foo 1:1:bar\n",
1463 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\n",
1464 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\\057\\056\n",
1465 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:.\n",
1466 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:..\n",
1467 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:..\n",
1468 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/..\n",
1469 ". d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n",
1470 "./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n. d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n",
1473 fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
1474 c.Check(fs, check.IsNil)
1475 c.Logf("-> %s", err)
1476 c.Check(err, check.NotNil)
// TestEdgeCaseManifests checks that FileSystem() accepts these
// unusual-but-valid manifests (escaped names, repeated file tokens,
// a file and a dir entry that overlap) without error.
1480 func (s *CollectionFSSuite) TestEdgeCaseManifests(c *check.C) {
1481 for _, txt := range []string{
1483 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo\n",
1484 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:...\n",
1485 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:. 0:0:. 0:0:\\056 0:0:\\056\n",
1486 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/. 0:0:. 0:0:foo\\057bar\\057\\056\n",
1487 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
1488 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/bar\n./foo d41d8cd98f00b204e9800998ecf8427e+0 0:0:bar\n",
1491 fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
1492 c.Check(err, check.IsNil)
1493 c.Check(fs, check.NotNil)
// fakeLocator is a package-level table of 10 fake block locators:
// fakeLocator[i] is "md5-of-i-zero-bytes+i". The hints suffix is
// appended at line 1502; note an elided line (1501) sits between the
// two assignments, so the hint-append may be conditional -- confirm
// against the full source.
1497 var fakeLocator = func() []string {
1498 locs := make([]string, 10)
1499 for i := range locs {
1500 locs[i] = fmt.Sprintf("%x+%d", md5.Sum(make([]byte, i)), i)
1502 locs[i] += "+Awhatever+Zotherhints"
// TestReplaceSegments_HappyPath replaces two single-block segments
// with two segments of one merged block [3], and checks the resulting
// manifest references only the new block.
1508 func (s *CollectionFSSuite) TestReplaceSegments_HappyPath(c *check.C) {
1509 fs, err := (&Collection{
1510 ManifestText: ". " + fakeLocator[1] + " " + fakeLocator[2] + " 0:3:file3\n",
1511 }).FileSystem(nil, &keepClientStub{})
1512 c.Assert(err, check.IsNil)
1513 changed, err := fs.ReplaceSegments(map[BlockSegment]BlockSegment{
1514 BlockSegment{fakeLocator[1], 0, 1}: BlockSegment{fakeLocator[3], 0, 1},
1515 BlockSegment{fakeLocator[2], 0, 2}: BlockSegment{fakeLocator[3], 1, 2},
1517 c.Check(changed, check.Equals, true)
1518 c.Check(err, check.IsNil)
1519 mtxt, err := fs.MarshalManifest(".")
1520 c.Check(err, check.IsNil)
1521 c.Check(mtxt, check.Equals, ". "+fakeLocator[3]+" 0:3:file3\n")
// TestReplaceSegments_InvalidOffset checks that a replacement whose
// offset+length exceeds the target block size is rejected with an
// error and leaves the manifest unchanged.
1524 func (s *CollectionFSSuite) TestReplaceSegments_InvalidOffset(c *check.C) {
1525 origtxt := ". " + fakeLocator[1] + " " + fakeLocator[2] + " 0:3:file3\n"
1526 fs, err := (&Collection{
1527 ManifestText: origtxt,
1528 }).FileSystem(nil, &keepClientStub{})
1529 c.Assert(err, check.IsNil)
1530 changed, err := fs.ReplaceSegments(map[BlockSegment]BlockSegment{
1531 BlockSegment{fakeLocator[1], 0, 1}: BlockSegment{fakeLocator[3], 0, 1},
1532 BlockSegment{fakeLocator[2], 0, 2}: BlockSegment{fakeLocator[3], 2, 2},
1534 c.Check(changed, check.Equals, false)
1535 c.Check(err, check.ErrorMatches, `invalid replacement: offset 2 \+ length 2 > block size 3`)
1536 mtxt, err := fs.MarshalManifest(".")
1537 c.Check(err, check.IsNil)
1538 c.Check(mtxt, check.Equals, origtxt)
// TestReplaceSegments_LengthMismatch checks that replacing a segment
// with one of a different length is rejected with an error and leaves
// the manifest unchanged.
1541 func (s *CollectionFSSuite) TestReplaceSegments_LengthMismatch(c *check.C) {
1542 origtxt := ". " + fakeLocator[1] + " " + fakeLocator[2] + " 0:3:file3\n"
1543 fs, err := (&Collection{
1544 ManifestText: origtxt,
1545 }).FileSystem(nil, &keepClientStub{})
1546 c.Assert(err, check.IsNil)
1547 changed, err := fs.ReplaceSegments(map[BlockSegment]BlockSegment{
1548 BlockSegment{fakeLocator[1], 0, 1}: BlockSegment{fakeLocator[3], 0, 1},
1549 BlockSegment{fakeLocator[2], 0, 2}: BlockSegment{fakeLocator[3], 0, 3},
1551 c.Check(changed, check.Equals, false)
1552 c.Check(err, check.ErrorMatches, `mismatched length: replacing segment length 2 with segment length 3`)
1553 mtxt, err := fs.MarshalManifest(".")
1554 c.Check(err, check.IsNil)
1555 c.Check(mtxt, check.Equals, origtxt)
// TestReplaceSegments_SkipUnreferenced checks that replacements
// grouped with a segment that isn't referenced by the manifest ([5])
// are skipped, while an independent valid replacement ([3]->[6]) is
// still applied.
1558 func (s *CollectionFSSuite) TestReplaceSegments_SkipUnreferenced(c *check.C) {
1559 fs, err := (&Collection{
1560 ManifestText: ". " + fakeLocator[1] + " " + fakeLocator[2] + " " + fakeLocator[3] + " 0:6:file6\n",
1561 }).FileSystem(nil, &keepClientStub{})
1562 c.Assert(err, check.IsNil)
1563 changed, err := fs.ReplaceSegments(map[BlockSegment]BlockSegment{
1564 BlockSegment{fakeLocator[1], 0, 1}: BlockSegment{fakeLocator[4], 0, 1}, // skipped because [5] unref
1565 BlockSegment{fakeLocator[2], 0, 2}: BlockSegment{fakeLocator[4], 1, 2}, // skipped because [5] unref
1566 BlockSegment{fakeLocator[5], 0, 2}: BlockSegment{fakeLocator[4], 1, 2}, // [5] unreferenced in orig manifest
1567 BlockSegment{fakeLocator[3], 0, 3}: BlockSegment{fakeLocator[6], 3, 3}, // applied
1569 c.Check(changed, check.Equals, true)
1570 c.Check(err, check.IsNil)
1571 mtxt, err := fs.MarshalManifest(".")
1572 c.Check(err, check.IsNil)
1573 c.Check(mtxt, check.Equals, ". "+fakeLocator[1]+" "+fakeLocator[2]+" "+fakeLocator[6]+" 0:3:file6 6:3:file6\n")
// TestReplaceSegments_SkipIncompleteSegment checks that a replacement
// whose length doesn't match the manifest's actual segment is skipped
// without error, leaving the manifest unchanged.
1576 func (s *CollectionFSSuite) TestReplaceSegments_SkipIncompleteSegment(c *check.C) {
1577 origtxt := ". " + fakeLocator[2] + " " + fakeLocator[3] + " 0:5:file5\n"
1578 fs, err := (&Collection{
1579 ManifestText: origtxt,
1580 }).FileSystem(nil, &keepClientStub{})
1581 c.Assert(err, check.IsNil)
1582 changed, err := fs.ReplaceSegments(map[BlockSegment]BlockSegment{
1583 BlockSegment{fakeLocator[2], 0, 1}: BlockSegment{fakeLocator[4], 0, 1}, // length=1 does not match the length=2 segment
1585 c.Check(changed, check.Equals, false)
1586 c.Check(err, check.IsNil)
1587 mtxt, err := fs.MarshalManifest(".")
1588 c.Check(err, check.IsNil)
1589 c.Check(mtxt, check.Equals, origtxt)
// testPlanRepack is a shared helper: it loads the given manifest,
// calls planRepack on the root dir, and compares the resulting plan
// with expectPlan via DeepEquals.
1592 func (s *CollectionFSSuite) testPlanRepack(c *check.C, manifest string, expectPlan [][]storedSegment) {
1593 fs, err := (&Collection{ManifestText: manifest}).FileSystem(nil, &keepClientStub{})
1594 c.Assert(err, check.IsNil)
1595 cfs := fs.(*collectionFileSystem)
1596 repl, err := cfs.planRepack(context.Background(), RepackOptions{}, cfs.root.(*dirnode))
1597 c.Assert(err, check.IsNil)
1599 // we always expect kc==cfs, so we fill this in instead of
1600 // requiring each test case to repeat it
1601 for _, pp := range expectPlan {
1606 c.Check(repl, check.DeepEquals, expectPlan)
// TestPlanRepack_2x32M: two half-full 32M blocks should be planned
// into a single repack group.
1609 func (s *CollectionFSSuite) TestPlanRepack_2x32M(c *check.C) {
1611 ". aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+32000000 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+32000000 0:64000000:file\n",
1614 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+32000000", size: 32000000, length: 32000000, offset: 0},
1615 {locator: "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+32000000", size: 32000000, length: 32000000, offset: 0},
// TestPlanRepack_3x32M: with three 32M blocks, only the first two are
// grouped (visible plan lists a+b; the third block's handling is in
// elided lines -- see full source).
1620 func (s *CollectionFSSuite) TestPlanRepack_3x32M(c *check.C) {
1622 ". aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+32000000 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+32000000 cccccccccccccccccccccccccccccccc+32000000 0:96000000:file\n",
1625 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+32000000", size: 32000000, length: 32000000, offset: 0},
1626 {locator: "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+32000000", size: 32000000, length: 32000000, offset: 0},
// TestPlanRepack_3x42M: blocks more than half full are left alone --
// expects an empty plan.
1631 func (s *CollectionFSSuite) TestPlanRepack_3x42M(c *check.C) {
1632 // Each block is more than half full, so do nothing.
1634 ". aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+42000000 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+42000000 cccccccccccccccccccccccccccccccc+42000000 0:126000000:file\n",
// TestPlanRepack_Premature: tiny blocks (123 bytes each) whose merge
// would still be too small to be worthwhile -- expects an empty plan.
1638 func (s *CollectionFSSuite) TestPlanRepack_Premature(c *check.C) {
1639 // Repacking would reduce to one block, but it would still be
1640 // too short to be worthwhile, so do nothing.
1642 ". aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+123 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+123 cccccccccccccccccccccccccccccccc+123 0:369:file\n",
// TestPlanRepack_4x22M_NonAdjacent: only the underfull 22M blocks
// (a, c, e) get grouped for repacking; the 44M blocks and the final
// 22M block are untouched.
1646 func (s *CollectionFSSuite) TestPlanRepack_4x22M_NonAdjacent(c *check.C) {
1647 // Repack the first three 22M blocks into one 66M block.
1648 // Don't touch the 44M blocks or the final 22M block.
1650 ". aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+22000000 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+44000000 cccccccccccccccccccccccccccccccc+22000000 dddddddddddddddddddddddddddddddd+44000000 eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee+22000000 ffffffffffffffffffffffffffffffff+44000000 00000000000000000000000000000000+22000000 0:220000000:file\n",
1653 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+22000000", size: 22000000, length: 22000000, offset: 0},
1654 {locator: "cccccccccccccccccccccccccccccccc+22000000", size: 22000000, length: 22000000, offset: 0},
1655 {locator: "eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee+22000000", size: 22000000, length: 22000000, offset: 0},
// TestPlanRepack_2x22M_DuplicateBlock: a block referenced twice (a)
// appears only once in the plan -- expect a+b+c, not a+b+a.
1660 func (s *CollectionFSSuite) TestPlanRepack_2x22M_DuplicateBlock(c *check.C) {
1661 // Repack a+b+c, not a+b+a.
1663 ". aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+22000000 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+22000000 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+22000000 0:66000000:file\n"+
1664 "./dir cccccccccccccccccccccccccccccccc+22000000 0:22000000:file\n",
1667 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+22000000", size: 22000000, length: 22000000, offset: 0},
1668 {locator: "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+22000000", size: 22000000, length: 22000000, offset: 0},
1669 {locator: "cccccccccccccccccccccccccccccccc+22000000", size: 22000000, length: 22000000, offset: 0},
// TestPlanRepack_2x22M_DuplicateBlock_TooShort: after deduplication
// the merge (a+b) falls under the 32MiB threshold -- expects an empty
// plan.
1674 func (s *CollectionFSSuite) TestPlanRepack_2x22M_DuplicateBlock_TooShort(c *check.C) {
1675 // Repacking a+b would not meet the 32MiB threshold.
1677 ". aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+22000000 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+1 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+22000000 0:44000001:file\n",
// TestPlanRepack_SiblingsTogether: segments of sibling files in the
// same dir ("a" then "c") are packed before subdirectory content
// ("b/b"), even though "b" sorts between them.
1681 func (s *CollectionFSSuite) TestPlanRepack_SiblingsTogether(c *check.C) {
1682 // Pack sibling files' ("a" and "b") segments together before
1683 // other subdirs ("b/b"), even though subdir "b" sorts between
1686 ". aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+15000000 cccccccccccccccccccccccccccccccc+15000000 0:15000000:a 15000000:15000000:c\n"+
1687 "./b bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15000000 0:15000000:b\n",
1690 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+15000000", size: 15000000, length: 15000000, offset: 0},
1691 {locator: "cccccccccccccccccccccccccccccccc+15000000", size: 15000000, length: 15000000, offset: 0},
1692 {locator: "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15000000", size: 15000000, length: 15000000, offset: 0},
// TestRepackData drives repackData through a table of merge scenarios
// (happy paths, degenerate inputs, read/write errors, oversize merge)
// and then verifies each original segment's data can be read back from
// its replacement block.
// NOTE(review): this listing is elided (original line numbers are
// non-contiguous); comments describe only the visible code.
1697 func (s *CollectionFSSuite) TestRepackData(c *check.C) {
1698 fs, err := (&Collection{}).FileSystem(nil, s.kc)
1699 c.Assert(err, check.IsNil)
1700 cfs := fs.(*collectionFileSystem)
// Cache of locators for blocks already written by testSegment, keyed
// by segment number.
1702 testBlockWritten := make(map[int]string)
1703 // testSegment(N) returns an N-byte segment of a block
1704 // containing repeated byte N%256. The segment's offset
1705 // within the block is N/1000000 (*). The block also has
1706 // N/1000000 null bytes following the segment(*).
1708 // If N=404, the block is not readable.
1710 // (*) ...unless that would result in an oversize block.
1711 testSegment := func(testSegmentNum int) storedSegment {
1712 length := testSegmentNum
1713 offset := testSegmentNum / 1000000
1714 if offset+length > maxBlockSize {
1717 size := testSegmentNum + offset
1718 if size+offset <= maxBlockSize {
// Lazily write each test block to the stub on first use.
1721 if _, stored := testBlockWritten[testSegmentNum]; !stored {
1722 data := make([]byte, size)
1723 for b := range data[offset : offset+length] {
1724 data[b] = byte(testSegmentNum & 0xff)
1726 resp, err := s.kc.BlockWrite(context.Background(), BlockWriteOptions{Data: data})
1727 c.Assert(err, check.IsNil)
1728 testBlockWritten[testSegmentNum] = resp.Locator
// Segment 404 simulates an unreadable block: write it, then delete it
// from the stub's store.
1729 if testSegmentNum == 404 {
1730 delete(s.kc.blocks, resp.Locator[:32])
1733 return storedSegment{
1735 locator: testBlockWritten[testSegmentNum],
1741 for trialIndex, trial := range []struct {
1743 // "input" here has the same shape as repackData's
1744 // [][]storedSegment argument, but uses int N has
1745 // shorthand for testSegment(N).
1747 onWrite func([]byte) error
1748 expectRepackedLen int
1749 expectErrorMatches string
1752 label: "one {3 blocks to 1} merge",
1753 input: [][]int{{1, 2, 3}},
1754 expectRepackedLen: 3,
1757 label: "two {3 blocks to 1} merges",
1758 input: [][]int{{1, 2, 3}, {4, 5, 6}},
1759 expectRepackedLen: 6,
1762 label: "merge two {3 blocks to 1} merges",
1763 input: [][]int{{1, 2, 3}, {4, 5, 6}},
1764 expectRepackedLen: 6,
1769 expectRepackedLen: 0,
1772 label: "merge 3 blocks plus a zero-length segment -- not expected to be used, but should work",
1773 input: [][]int{{1, 2, 0, 3}},
1774 expectRepackedLen: 4,
1777 label: "merge a single segment -- not expected to be used, but should work",
1778 input: [][]int{{12345}},
1779 expectRepackedLen: 1,
1782 label: "merge a single empty segment -- not expected to be used, but should work",
1783 input: [][]int{{0}},
1784 expectRepackedLen: 1,
1787 label: "merge zero segments -- not expected to be used, but should work",
1789 expectRepackedLen: 0,
1792 label: "merge same orig segment into two different replacements -- not expected to be used, but should work",
1793 input: [][]int{{1, 22, 3}, {4, 22, 6}},
1794 expectRepackedLen: 5,
1797 label: "identical merges -- not expected to be used, but should work",
1798 input: [][]int{{11, 22, 33}, {11, 22, 33}},
1799 expectRepackedLen: 3,
1802 label: "read error on first segment",
1803 input: [][]int{{404, 2, 3}},
1804 expectRepackedLen: 0,
1805 expectErrorMatches: "404 block not found",
1808 label: "read error on second segment",
1809 input: [][]int{{1, 404, 3}},
1810 expectErrorMatches: "404 block not found",
1813 label: "read error on last segment",
1814 input: [][]int{{1, 2, 404}},
1815 expectErrorMatches: "404 block not found",
1818 label: "merge does not fit in one block",
1819 input: [][]int{{50000000, 20000000}},
1820 expectErrorMatches: "combined length 70000000 would exceed maximum block size 67108864",
1823 label: "write error",
1824 input: [][]int{{1, 2, 3}},
1825 onWrite: func(p []byte) error { return errors.New("stub write error") },
1826 expectErrorMatches: "stub write error",
1829 c.Logf("trial %d: %s", trialIndex, trial.label)
// Expand the int shorthand into real storedSegments.
1830 var input [][]storedSegment
1831 for _, seglist := range trial.input {
1832 var segments []storedSegment
1833 for _, segnum := range seglist {
1834 segments = append(segments, testSegment(segnum))
1836 input = append(input, segments)
1838 s.kc.onWrite = trial.onWrite
1839 repacked, err := cfs.repackData(context.Background(), input)
1840 if trial.expectErrorMatches != "" {
1841 c.Check(err, check.ErrorMatches, trial.expectErrorMatches)
1844 c.Assert(err, check.IsNil)
1845 c.Check(repacked, check.HasLen, trial.expectRepackedLen)
// Every original segment must be readable, with its expected fill
// byte, from the replacement block it was mapped to.
1846 for _, origSegments := range input {
1848 for _, origSegment := range origSegments {
1849 origBlock := BlockSegment{
1850 Locator: stripAllHints(origSegment.locator),
1851 Length: origSegment.length,
1852 Offset: origSegment.offset,
1854 buf := make([]byte, origSegment.size)
1855 n, err := cfs.ReadAt(repacked[origBlock].Locator, buf, repacked[origBlock].Offset)
1856 c.Assert(err, check.IsNil)
1857 c.Check(n, check.Equals, len(buf))
1858 expectContent := byte(origSegment.length & 0xff)
1859 for segoffset, b := range buf {
1860 if b != expectContent {
1861 c.Errorf("content mismatch: origSegment.locator %s -> replLocator %s offset %d: byte %d is %d, expected %d", origSegment.locator, replLocator, repacked[origBlock].Offset, segoffset, b, expectContent)
// TestRepackCost_SourceTree: entry point whose body is elided here;
// presumably delegates to testRepackCost -- confirm in the full source.
1870 func (s *CollectionFSSuite) TestRepackCost_SourceTree(c *check.C) {
// testRepackCost loads this git working tree's files into a
// collection fs, calling Repack after each flush, and measures how
// much data gets rewritten and how long Repack calls take. It fails
// fast if data is rewritten more than 16x on average.
// NOTE(review): listing is elided (non-contiguous original line
// numbers); comments describe only the visible code.
1874 func (s *CollectionFSSuite) testRepackCost(c *check.C) {
1875 s.kc.blocks = make(map[string][]byte)
1876 testfs, err := (&Collection{}).FileSystem(nil, s.kc)
1877 c.Assert(err, check.IsNil)
1878 cfs := testfs.(*collectionFileSystem)
// Use the repo root (three levels up) as the source tree.
1879 gitdir, err := filepath.Abs("../../..")
1880 c.Assert(err, check.IsNil)
1881 infs := os.DirFS(gitdir)
1882 buf, err := exec.Command("git", "-C", gitdir, "ls-files").CombinedOutput()
1883 c.Assert(err, check.IsNil, check.Commentf("%s", buf))
1884 dirsCreated := make(map[string]bool)
// Total bytes currently stored in the stub keep service.
1887 bytesWritten := func() (n int) {
1890 for _, data := range s.kc.blocks {
// Number of distinct blocks referenced by the manifest.
1895 blocksInManifest := func() int {
1896 blocks := make(map[string]bool)
1897 cfs.fileSystem.root.(*dirnode).walkSegments(func(s segment) segment {
1898 blocks[s.(storedSegment).blockSegment().StripAllHints().Locator] = true
1903 tRepackNoop := time.Duration(0)
1905 tRepackTotal := time.Duration(0)
1907 for _, path := range bytes.Split(buf, []byte("\n")) {
1908 path := string(path)
// Skip arvbox service symlink-style entries.
1910 strings.HasPrefix(path, "tools/arvbox/lib/arvbox/docker/service") &&
1911 strings.HasSuffix(path, "/run") {
1915 fi, err := fs.Stat(infs, path)
1916 c.Assert(err, check.IsNil)
1917 if fi.IsDir() || fi.Mode()&os.ModeSymlink != 0 {
// Create parent dirs on first use.
1920 for i, c := range path {
1921 if c == '/' && !dirsCreated[path[:i]] {
1922 testfs.Mkdir(path[:i], 0700)
1923 dirsCreated[path[:i]] = true
1926 data, err := fs.ReadFile(infs, path)
1927 c.Assert(err, check.IsNil)
1928 f, err := testfs.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0700)
1929 c.Assert(err, check.IsNil)
1930 _, err = f.Write(data)
1931 c.Assert(err, check.IsNil)
1933 c.Assert(err, check.IsNil)
1934 bytesContent += len(data)
1936 err = cfs.Flush("", true)
1937 c.Assert(err, check.IsNil)
1939 _, err = cfs.Repack(context.Background(), RepackOptions{})
1940 tRepack := time.Since(t0)
1941 tRepackTotal += tRepack
1943 c.Assert(err, check.IsNil)
// If the stub's stored bytes grew beyond the content written, the
// difference is data Repack rewrote.
1945 if bw := bytesWritten(); bytesRewritten < bw-bytesContent {
1946 c.Logf("bytesContent %d bytesWritten %d bytesRewritten %d blocksInManifest %d -- just rewrote %d in %v", bytesContent, bw, bytesRewritten, blocksInManifest(), bw-bytesContent-bytesRewritten, tRepack)
1947 bytesRewritten = bw - bytesContent
1948 if bytesRewritten/16 > bytesContent {
1949 // Rewriting data >16x on average
1950 // means something is terribly wrong
1951 // -- give up now instead of going
1956 tRepackNoop += tRepack
1960 c.Assert(err, check.IsNil)
1961 c.Logf("bytesContent %d bytesWritten %d bytesRewritten %d blocksInManifest %d", bytesContent, bytesWritten(), bytesRewritten, blocksInManifest())
1962 c.Logf("spent %v on %d Repack calls, average %v per call", tRepackTotal, nRepackTotal, tRepackTotal/time.Duration(nRepackTotal))
1963 c.Logf("spent %v on %d Repack calls that had no effect, average %v per call", tRepackNoop, nRepackNoop, tRepackNoop/time.Duration(nRepackNoop))
// TestSnapshotSplice takes a Snapshot of the fs root and Splices it
// into "dir1", then checks the original file content is readable at
// the spliced path.
1966 func (s *CollectionFSSuite) TestSnapshotSplice(c *check.C) {
1967 filedata1 := "hello snapshot+splice world\n"
1968 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1969 c.Assert(err, check.IsNil)
1971 f, err := fs.OpenFile("file1", os.O_CREATE|os.O_RDWR, 0700)
1972 c.Assert(err, check.IsNil)
1973 _, err = f.Write([]byte(filedata1))
1974 c.Assert(err, check.IsNil)
1976 c.Assert(err, check.IsNil)
1979 snap, err := Snapshot(fs, "/")
1980 c.Assert(err, check.IsNil)
1981 err = Splice(fs, "dir1", snap)
1982 c.Assert(err, check.IsNil)
// The snapshot of "/" (containing file1) should now appear under dir1.
1983 f, err := fs.Open("dir1/file1")
1984 c.Assert(err, check.IsNil)
1985 buf, err := io.ReadAll(f)
1986 c.Assert(err, check.IsNil)
1987 c.Check(string(buf), check.Equals, filedata1)
// TestRefreshSignatures checks that block signatures in a loaded
// manifest are refreshed: synchronously when already expired at
// Read() time, and asynchronously (for all blocks at once) when more
// than halfway to the TTL.
// NOTE(review): listing is elided (non-contiguous original line
// numbers); comments describe only the visible code.
1990 func (s *CollectionFSSuite) TestRefreshSignatures(c *check.C) {
1991 filedata1 := "hello refresh signatures world\n"
1992 fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1993 c.Assert(err, check.IsNil)
1994 fs.Mkdir("d1", 0700)
1995 f, err := fs.OpenFile("d1/file1", os.O_CREATE|os.O_RDWR, 0700)
1996 c.Assert(err, check.IsNil)
1997 _, err = f.Write([]byte(filedata1))
1998 c.Assert(err, check.IsNil)
2000 c.Assert(err, check.IsNil)
2002 filedata2 := "hello refresh signatures universe\n"
2003 fs.Mkdir("d2", 0700)
2004 f, err = fs.OpenFile("d2/file2", os.O_CREATE|os.O_RDWR, 0700)
2005 c.Assert(err, check.IsNil)
2006 _, err = f.Write([]byte(filedata2))
2007 c.Assert(err, check.IsNil)
2009 c.Assert(err, check.IsNil)
2010 txt, err := fs.MarshalManifest(".")
2011 c.Assert(err, check.IsNil)
// Save the collection server-side so it can be re-loaded with stale
// signatures below.
2012 var saved Collection
2013 err = s.client.RequestAndDecode(&saved, "POST", "arvados/v1/collections", nil, map[string]interface{}{
2014 "select": []string{"manifest_text", "uuid", "portable_data_hash"},
2015 "collection": map[string]interface{}{
2016 "manifest_text": txt,
2019 c.Assert(err, check.IsNil)
2021 // Update signatures synchronously if they are already expired
2022 // when Read() is called.
// Sign with an expiry 2s in the past to force synchronous refresh.
2024 saved.ManifestText = SignManifest(saved.ManifestText, s.kc.authToken, time.Now().Add(-2*time.Second), s.kc.sigttl, []byte(s.kc.sigkey))
2025 fs, err := saved.FileSystem(s.client, s.kc)
2026 c.Assert(err, check.IsNil)
2027 f, err := fs.OpenFile("d1/file1", os.O_RDONLY, 0)
2028 c.Assert(err, check.IsNil)
2029 buf, err := ioutil.ReadAll(f)
2030 c.Check(err, check.IsNil)
2031 c.Check(string(buf), check.Equals, filedata1)
2034 // Update signatures asynchronously if we're more than half
2035 // way to TTL when Read() is called.
2037 exp := time.Now().Add(2 * time.Minute)
2038 saved.ManifestText = SignManifest(saved.ManifestText, s.kc.authToken, exp, s.kc.sigttl, []byte(s.kc.sigkey))
2039 fs, err := saved.FileSystem(s.client, s.kc)
2040 c.Assert(err, check.IsNil)
2041 f1, err := fs.OpenFile("d1/file1", os.O_RDONLY, 0)
2042 c.Assert(err, check.IsNil)
2043 f2, err := fs.OpenFile("d2/file2", os.O_RDONLY, 0)
2044 c.Assert(err, check.IsNil)
2045 buf, err := ioutil.ReadAll(f1)
2046 c.Check(err, check.IsNil)
2047 c.Check(string(buf), check.Equals, filedata1)
2049 // Ensure fs treats the 2-minute TTL as less than half
2050 // the server's signing TTL. If we don't do this,
2051 // collectionfs will guess the signature is fresh,
2052 // i.e., signing TTL is 2 minutes, and won't do an
2054 fs.(*collectionFileSystem).guessSignatureTTL = time.Hour
// Re-read f1 until the async refresh lands (expiry > 1h away), or
// give up after 10s.
2057 for deadline := time.Now().Add(time.Second * 10); time.Now().Before(deadline) && !refreshed; time.Sleep(time.Second / 10) {
2058 _, err = f1.Seek(0, io.SeekStart)
2059 c.Assert(err, check.IsNil)
2060 buf, err = ioutil.ReadAll(f1)
2061 c.Assert(err, check.IsNil)
2062 c.Assert(string(buf), check.Equals, filedata1)
2063 loc := s.kc.reads[len(s.kc.reads)-1]
2064 t, err := signatureExpiryTime(loc)
2065 c.Assert(err, check.IsNil)
2066 c.Logf("last read block %s had signature expiry time %v", loc, t)
2067 if t.Sub(time.Now()) > time.Hour {
2071 c.Check(refreshed, check.Equals, true)
2073 // Second locator should have been updated at the same
2075 buf, err = ioutil.ReadAll(f2)
2076 c.Assert(err, check.IsNil)
2077 c.Assert(string(buf), check.Equals, filedata2)
2078 loc := s.kc.reads[len(s.kc.reads)-1]
2079 c.Check(loc, check.Not(check.Equals), s.kc.reads[len(s.kc.reads)-2])
2080 t, err := signatureExpiryTime(s.kc.reads[len(s.kc.reads)-1])
2081 c.Assert(err, check.IsNil)
2082 c.Logf("last read block %s had signature expiry time %v", loc, t)
2083 c.Check(t.Sub(time.Now()) > time.Hour, check.Equals, true)
// bigmanifest is a package-level test fixture: a large manifest with
// 2000 streams, each listing 100 block locators and 2000 file tokens,
// built once at init time for the parse benchmark.
2087 var bigmanifest = func() string {
2088 var buf bytes.Buffer
2089 for i := 0; i < 2000; i++ {
2090 fmt.Fprintf(&buf, "./dir%d", i)
2091 for i := 0; i < 100; i++ {
2092 fmt.Fprintf(&buf, " d41d8cd98f00b204e9800998ecf8427e+99999")
2094 for i := 0; i < 2000; i++ {
2095 fmt.Fprintf(&buf, " 1200000:300000:file%d", i)
2097 fmt.Fprintf(&buf, "\n")
// BenchmarkParseManifest measures manifest-parsing speed by building
// a FileSystem from bigmanifest c.N times (lock debugging disabled to
// avoid skewing results).
2102 func (s *CollectionFSSuite) BenchmarkParseManifest(c *check.C) {
2103 DebugLocksPanicMode = false
2104 c.Logf("test manifest is %d bytes", len(bigmanifest))
2105 for i := 0; i < c.N; i++ {
2106 fs, err := (&Collection{ManifestText: bigmanifest}).FileSystem(s.client, s.kc)
2107 c.Check(err, check.IsNil)
2108 c.Check(fs, check.NotNil)
// checkMemSize asserts that the filenode's memsize accounting matches
// the sum of its in-memory segment buffer lengths.
2112 func (s *CollectionFSSuite) checkMemSize(c *check.C, f File) {
2113 fn := f.(*filehandle).inode.(*filenode)
2115 for _, seg := range fn.segments {
2116 if e, ok := seg.(*memSegment); ok {
2117 memsize += int64(len(e.buf))
2120 c.Check(fn.memsize, check.Equals, memsize)
// CollectionFSUnitSuite holds unit tests that don't need the
// integration fixtures of CollectionFSSuite.
2123 type CollectionFSUnitSuite struct{}
// Register the suite with gocheck.
2125 var _ = check.Suite(&CollectionFSUnitSuite{})
2127 // expect ~2 seconds to load a manifest with 256K files
2128 func (s *CollectionFSUnitSuite) TestLargeManifest_ManyFiles(c *check.C) {
2129 if testing.Short() {
2132 s.testLargeManifest(c, 512, 512, 1, 0)
// TestLargeManifest_LargeFiles loads a manifest with 800 files of
// 1000 blocks each; skipped in -short mode.
2135 func (s *CollectionFSUnitSuite) TestLargeManifest_LargeFiles(c *check.C) {
2136 if testing.Short() {
2139 s.testLargeManifest(c, 1, 800, 1000, 0)
// TestLargeManifest_InterleavedFiles loads manifests whose file tokens
// reference random 4 MiB chunks scattered across the stream's blocks,
// at several sizes, as a performance regression check. Skipped in
// -short mode (skip body elided from this excerpt).
2142 func (s *CollectionFSUnitSuite) TestLargeManifest_InterleavedFiles(c *check.C) {
2143 if testing.Short() {
2146 // Timing figures here are from a dev host, (0)->(1)->(2)->(3)
2147 // (0) no optimizations (main branch commit ea697fb1e8)
2148 // (1) resolve streampos->blkidx with binary search
2149 // (2) ...and rewrite PortableDataHash() without regexp
2150 // (3) ...and use fnodeCache in loadManifest
2151 s.testLargeManifest(c, 1, 800, 100, 4<<20) // 127s -> 12s -> 2.5s -> 1.5s
2152 s.testLargeManifest(c, 1, 50, 1000, 4<<20) // 44s -> 10s -> 1.5s -> 0.8s
2153 s.testLargeManifest(c, 1, 200, 100, 4<<20) // 13s -> 4s -> 0.6s -> 0.3s
2154 s.testLargeManifest(c, 1, 200, 150, 4<<20) // 26s -> 4s -> 1s -> 0.5s
2155 s.testLargeManifest(c, 1, 200, 200, 4<<20) // 38s -> 6s -> 1.3s -> 0.7s
2156 s.testLargeManifest(c, 1, 200, 225, 4<<20) // 46s -> 7s -> 1.5s -> 1s
2157 s.testLargeManifest(c, 1, 400, 400, 4<<20) // 477s -> 24s -> 5s -> 3s
2158 // s.testLargeManifest(c, 1, 800, 1000, 4<<20) // timeout -> 186s -> 28s -> 17s
// testLargeManifest builds a synthetic manifest with the requested
// shape — dirCount top-level streams, filesPerDir files each,
// blocksPerFile 64 MiB blocks per file; when interleaveChunk > 0 the
// file tokens instead reference randomly placed interleaveChunk-sized
// spans of the stream — then loads it into a collection FileSystem,
// verifies the total size, Stat()s and OpenFile()s every file (to
// mimic webdav propfind), and logs timing and memory statistics.
// NOTE(review): several interior lines (e.g. declarations of t0 and
// blkid, loop-closing braces, Skip bodies) are elided from this
// excerpt — confirm against the full file.
2161 func (s *CollectionFSUnitSuite) testLargeManifest(c *check.C, dirCount, filesPerDir, blocksPerFile, interleaveChunk int) {
// Fixed 64 MiB block size for every generated locator.
2163 const blksize = 1 << 26
2164 c.Logf("%s building manifest with dirCount=%d filesPerDir=%d blocksPerFile=%d", time.Now(), dirCount, filesPerDir, blocksPerFile)
// Pre-size the buffer (~40 MB) to avoid repeated growth while building.
2165 mb := bytes.NewBuffer(make([]byte, 0, 40000000))
2167 for i := 0; i < dirCount; i++ {
2168 fmt.Fprintf(mb, "./dir%d", i)
// First pass: emit one locator per block of every file in this stream.
2169 for j := 0; j < filesPerDir; j++ {
2170 for k := 0; k < blocksPerFile; k++ {
// Synthetic locator: hash, size, and +A signature hint all derived
// from blkid (declared on an elided line, presumably a counter).
2172 fmt.Fprintf(mb, " %032x+%d+A%040x@%08x", blkid, blksize, blkid, blkid)
// Second pass: emit the file tokens for this stream.
2175 for j := 0; j < filesPerDir; j++ {
2176 if interleaveChunk == 0 {
// Contiguous layout: one token per file, files laid out in reverse order.
2177 fmt.Fprintf(mb, " %d:%d:dir%d/file%d", (filesPerDir-j-1)*blocksPerFile*blocksPerFile, blocksPerFile*blksize, j, j)
// NOTE(review): the line above is transcribed from an elided-context
// excerpt; the original reads (filesPerDir-j-1)*blocksPerFile*blksize
// for the offset — confirm against the full file.
2180 for todo := int64(blocksPerFile) * int64(blksize); todo > 0; todo -= int64(interleaveChunk) {
2181 size := int64(interleaveChunk)
// Pick a random in-range offset for this chunk of the file.
2185 offset := rand.Int63n(int64(blocksPerFile)*int64(blksize)*int64(filesPerDir) - size)
2186 fmt.Fprintf(mb, " %d:%d:dir%d/file%d", offset, size, j, j)
// End of this stream's manifest line.
2189 mb.Write([]byte{'\n'})
2191 coll := Collection{ManifestText: mb.String()}
2192 c.Logf("%s built manifest size=%d", time.Now(), mb.Len())
2194 var memstats runtime.MemStats
2195 runtime.ReadMemStats(&memstats)
2196 c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
// Load the manifest; keepClientStub never serves reads, but none are
// needed for metadata-only operations below.
2198 f, err := coll.FileSystem(NewClientFromEnv(), &keepClientStub{})
2199 c.Check(err, check.IsNil)
2200 c.Logf("%s loaded", time.Now())
2201 c.Check(f.Size(), check.Equals, int64(dirCount*filesPerDir*blocksPerFile*blksize))
2203 // Stat() and OpenFile() each file. This mimics the behavior
2204 // of webdav propfind, which opens each file even when just
2205 // listing directory entries.
2206 for i := 0; i < dirCount; i++ {
2207 for j := 0; j < filesPerDir; j++ {
2208 fnm := fmt.Sprintf("./dir%d/dir%d/file%d", i, j, j)
2209 fi, err := f.Stat(fnm)
2210 c.Assert(err, check.IsNil)
2211 c.Check(fi.IsDir(), check.Equals, false)
// Shadowed f here is the opened file handle, not the filesystem.
2212 f, err := f.OpenFile(fnm, os.O_RDONLY, 0)
2213 c.Assert(err, check.IsNil)
2217 c.Logf("%s OpenFile() x %d", time.Now(), dirCount*filesPerDir)
2219 runtime.ReadMemStats(&memstats)
2220 c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
2221 c.Logf("%s MemorySize=%d", time.Now(), f.MemorySize())
// NOTE(review): time.Now().Sub(t0) would idiomatically be
// time.Since(t0); left unchanged here.
2222 c.Logf("%s ... test duration %s", time.Now(), time.Now().Sub(t0))
2225 // Gocheck boilerplate
2226 func Test(t *testing.T) {