Merge branch '21535-multi-wf-delete'
[arvados.git] / sdk / go / arvados / keep_cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "io/fs"
15         "os"
16         "path/filepath"
17         "sort"
18         "strconv"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "syscall"
23         "time"
24
25         "github.com/sirupsen/logrus"
26         "golang.org/x/sys/unix"
27 )
28
29 type KeepGateway interface {
30         ReadAt(locator string, dst []byte, offset int) (int, error)
31         BlockRead(ctx context.Context, opts BlockReadOptions) (int, error)
32         BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error)
33         LocalLocator(locator string) (string, error)
34 }
35
36 // DiskCache wraps KeepGateway, adding a disk-based cache layer.
37 //
38 // A DiskCache is automatically incorporated into the backend stack of
39 // each keepclient.KeepClient. Most programs do not need to use
40 // DiskCache directly.
41 type DiskCache struct {
42         KeepGateway
43         Dir     string
44         MaxSize ByteSizeOrPercent
45         Logger  logrus.FieldLogger
46
47         *sharedCache
48         setupOnce sync.Once
49 }
50
51 var (
52         sharedCachesLock sync.Mutex
53         sharedCaches     = map[string]*sharedCache{}
54 )
55
56 // sharedCache has fields that coordinate the cache usage in a single
57 // cache directory; it can be shared by multiple DiskCaches.
58 //
59 // This serves to share a single pool of held-open filehandles, a
60 // single tidying goroutine, etc., even when the program (like
61 // keep-web) uses multiple KeepGateway stacks that use different auth
62 // tokens, etc.
63 type sharedCache struct {
64         dir     string
65         maxSize ByteSizeOrPercent
66
67         tidying        int32 // see tidy()
68         defaultMaxSize int64
69
70         // The "heldopen" fields are used to open cache files for
71         // reading, and leave them open for future/concurrent ReadAt
72         // operations. See quickReadAt.
73         heldopen     map[string]*openFileEnt
74         heldopenMax  int
75         heldopenLock sync.Mutex
76
77         // The "writing" fields allow multiple concurrent/sequential
78         // ReadAt calls to be notified as a single
79         // read-block-from-backend-into-cache goroutine fills the
80         // cache file.
81         writing     map[string]*writeprogress
82         writingCond *sync.Cond
83         writingLock sync.Mutex
84
85         sizeMeasured    int64 // actual size on disk after last tidy(); zero if not measured yet
86         sizeEstimated   int64 // last measured size, plus files we have written since
87         lastFileCount   int64 // number of files on disk at last count
88         writesSinceTidy int64 // number of files written since last tidy()
89 }
90
91 type writeprogress struct {
92         cond    *sync.Cond     // broadcast whenever size or done changes
93         done    bool           // size and err have their final values
94         size    int            // bytes copied into cache file so far
95         err     error          // error encountered while copying from backend to cache
96         sharedf *os.File       // readable filehandle, usable if done && err==nil
97         readers sync.WaitGroup // goroutines that haven't finished reading from f yet
98 }
99
100 type openFileEnt struct {
101         sync.RWMutex
102         f   *os.File
103         err error // if err is non-nil, f should not be used.
104 }
105
106 const (
107         cacheFileSuffix = ".keepcacheblock"
108         tmpFileSuffix   = ".tmp"
109 )
110
111 func (cache *DiskCache) setup() {
112         sharedCachesLock.Lock()
113         defer sharedCachesLock.Unlock()
114         dir := cache.Dir
115         if sharedCaches[dir] == nil {
116                 cache.debugf("initializing sharedCache using %s with max size %d", dir, cache.MaxSize)
117                 sharedCaches[dir] = &sharedCache{dir: dir, maxSize: cache.MaxSize}
118         } else {
119                 cache.debugf("using existing sharedCache using %s with max size %d (would have initialized with %d)", dir, sharedCaches[dir].maxSize, cache.MaxSize)
120         }
121         cache.sharedCache = sharedCaches[dir]
122 }
123
124 func (cache *DiskCache) cacheFile(locator string) string {
125         hash := locator
126         if i := strings.Index(hash, "+"); i > 0 {
127                 hash = hash[:i]
128         }
129         return filepath.Join(cache.dir, hash[:3], hash+cacheFileSuffix)
130 }
131
132 // Open a cache file, creating the parent dir if necessary.
133 func (cache *DiskCache) openFile(name string, flags int) (*os.File, error) {
134         f, err := os.OpenFile(name, flags, 0600)
135         if os.IsNotExist(err) {
136                 // Create the parent dir and try again. (We could have
137                 // checked/created the parent dir before, but that
138                 // would be less efficient in the much more common
139                 // situation where it already exists.)
140                 parent, _ := filepath.Split(name)
141                 os.Mkdir(parent, 0700)
142                 f, err = os.OpenFile(name, flags, 0600)
143         }
144         return f, err
145 }
146
147 // Rename a file, creating the new path's parent dir if necessary.
148 func (cache *DiskCache) rename(old, new string) error {
149         if nil == os.Rename(old, new) {
150                 return nil
151         }
152         parent, _ := filepath.Split(new)
153         os.Mkdir(parent, 0700)
154         return os.Rename(old, new)
155 }
156
157 func (cache *DiskCache) debugf(format string, args ...interface{}) {
158         logger := cache.Logger
159         if logger == nil {
160                 return
161         }
162         logger.Debugf(format, args...)
163 }
164
165 // BlockWrite writes through to the wrapped KeepGateway, and (if
166 // possible) retains a copy of the written block in the cache.
167 func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
168         cache.setupOnce.Do(cache.setup)
169         unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix)
170         tmpfilename := filepath.Join(cache.dir, "tmp", unique)
171         tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR)
172         if err != nil {
173                 cache.debugf("BlockWrite: open(%s) failed: %s", tmpfilename, err)
174                 return cache.KeepGateway.BlockWrite(ctx, opts)
175         }
176
177         ctx, cancel := context.WithCancel(ctx)
178         defer cancel()
179         copyerr := make(chan error, 1)
180
181         // Start a goroutine to copy the caller's source data to
182         // tmpfile, a hash checker, and (via pipe) the wrapped
183         // KeepGateway.
184         pipereader, pipewriter := io.Pipe()
185         defer pipereader.Close()
186         go func() {
187                 // Note this is a double-close (which is a no-op) in
188                 // the happy path.
189                 defer tmpfile.Close()
190                 // Note this is a no-op in the happy path (the
191                 // uniquely named tmpfilename will have been renamed).
192                 defer os.Remove(tmpfilename)
193                 defer pipewriter.Close()
194
195                 // Copy from opts.Data or opts.Reader, depending on
196                 // which was provided.
197                 var src io.Reader
198                 if opts.Data != nil {
199                         src = bytes.NewReader(opts.Data)
200                 } else {
201                         src = opts.Reader
202                 }
203
204                 hashcheck := md5.New()
205                 n, err := io.Copy(io.MultiWriter(tmpfile, pipewriter, hashcheck), src)
206                 if err != nil {
207                         copyerr <- err
208                         cancel()
209                         return
210                 } else if opts.DataSize > 0 && opts.DataSize != int(n) {
211                         copyerr <- fmt.Errorf("block size %d did not match provided size %d", n, opts.DataSize)
212                         cancel()
213                         return
214                 }
215                 err = tmpfile.Close()
216                 if err != nil {
217                         // Don't rename tmpfile into place, but allow
218                         // the BlockWrite call to succeed if nothing
219                         // else goes wrong.
220                         return
221                 }
222                 hash := fmt.Sprintf("%x", hashcheck.Sum(nil))
223                 if opts.Hash != "" && opts.Hash != hash {
224                         // Even if the wrapped KeepGateway doesn't
225                         // notice a problem, this should count as an
226                         // error.
227                         copyerr <- fmt.Errorf("block hash %s did not match provided hash %s", hash, opts.Hash)
228                         cancel()
229                         return
230                 }
231                 cachefilename := cache.cacheFile(hash)
232                 err = cache.rename(tmpfilename, cachefilename)
233                 if err != nil {
234                         cache.debugf("BlockWrite: rename(%s, %s) failed: %s", tmpfilename, cachefilename, err)
235                 }
236                 atomic.AddInt64(&cache.sizeEstimated, int64(n))
237                 cache.gotidy()
238         }()
239
240         // Write through to the wrapped KeepGateway from the pipe,
241         // instead of the original reader.
242         newopts := opts
243         if newopts.DataSize == 0 {
244                 newopts.DataSize = len(newopts.Data)
245         }
246         newopts.Reader = pipereader
247         newopts.Data = nil
248
249         resp, err := cache.KeepGateway.BlockWrite(ctx, newopts)
250         if len(copyerr) > 0 {
251                 // If the copy-to-pipe goroutine failed, that error
252                 // will be more helpful than the resulting "context
253                 // canceled" or "read [from pipereader] failed" error
254                 // seen by the wrapped KeepGateway.
255                 //
256                 // If the wrapped KeepGateway encounters an error
257                 // before all the data is copied into the pipe, it
258                 // stops reading from the pipe, which causes the
259                 // io.Copy() in the goroutine to block until our
260                 // deferred pipereader.Close() call runs. In that case
261                 // len(copyerr)==0 here, so the wrapped KeepGateway
262                 // error is the one we return to our caller.
263                 err = <-copyerr
264         }
265         return resp, err
266 }
267
268 type funcwriter func([]byte) (int, error)
269
270 func (fw funcwriter) Write(p []byte) (int, error) {
271         return fw(p)
272 }
273
274 // ReadAt reads the entire block from the wrapped KeepGateway into the
275 // cache if needed, and copies the requested portion into the provided
276 // slice.
277 //
278 // ReadAt returns as soon as the requested portion is available in the
279 // cache. The remainder of the block may continue to be copied into
280 // the cache in the background.
281 func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
282         cache.setupOnce.Do(cache.setup)
283         cachefilename := cache.cacheFile(locator)
284         if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
285                 return n, nil
286         }
287
288         cache.writingLock.Lock()
289         progress := cache.writing[cachefilename]
290         if progress == nil {
291                 // Nobody else is fetching from backend, so we'll add
292                 // a new entry to cache.writing, fetch in a separate
293                 // goroutine.
294                 progress = &writeprogress{}
295                 progress.cond = sync.NewCond(&sync.Mutex{})
296                 if cache.writing == nil {
297                         cache.writing = map[string]*writeprogress{}
298                 }
299                 cache.writing[cachefilename] = progress
300
301                 // Start a goroutine to copy from backend to f. As
302                 // data arrives, wake up any waiting loops (see below)
303                 // so ReadAt() requests for partial data can return as
304                 // soon as the relevant bytes have been copied.
305                 go func() {
306                         var size int
307                         var err error
308                         defer func() {
309                                 if err == nil && progress.sharedf != nil {
310                                         err = progress.sharedf.Sync()
311                                 }
312                                 progress.cond.L.Lock()
313                                 progress.err = err
314                                 progress.done = true
315                                 progress.size = size
316                                 progress.cond.L.Unlock()
317                                 progress.cond.Broadcast()
318                                 cache.writingLock.Lock()
319                                 delete(cache.writing, cachefilename)
320                                 cache.writingLock.Unlock()
321
322                                 // Wait for other goroutines to wake
323                                 // up, notice we're done, and use our
324                                 // sharedf to read their data, before
325                                 // we close sharedf.
326                                 //
327                                 // Nobody can join the WaitGroup after
328                                 // the progress entry is deleted from
329                                 // cache.writing above. Therefore,
330                                 // this Wait ensures nobody else is
331                                 // accessing progress, and we don't
332                                 // need to lock anything.
333                                 progress.readers.Wait()
334                                 progress.sharedf.Close()
335                         }()
336                         progress.sharedf, err = cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
337                         if err != nil {
338                                 err = fmt.Errorf("ReadAt: %w", err)
339                                 return
340                         }
341                         err = syscall.Flock(int(progress.sharedf.Fd()), syscall.LOCK_SH)
342                         if err != nil {
343                                 err = fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
344                                 return
345                         }
346                         size, err = cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{
347                                 Locator: locator,
348                                 WriteTo: funcwriter(func(p []byte) (int, error) {
349                                         n, err := progress.sharedf.Write(p)
350                                         if n > 0 {
351                                                 progress.cond.L.Lock()
352                                                 progress.size += n
353                                                 progress.cond.L.Unlock()
354                                                 progress.cond.Broadcast()
355                                         }
356                                         return n, err
357                                 })})
358                         atomic.AddInt64(&cache.sizeEstimated, int64(size))
359                         cache.gotidy()
360                 }()
361         }
362         // We add ourselves to the readers WaitGroup so the
363         // fetch-from-backend goroutine doesn't close the shared
364         // filehandle before we read the data we need from it.
365         progress.readers.Add(1)
366         defer progress.readers.Done()
367         cache.writingLock.Unlock()
368
369         progress.cond.L.Lock()
370         for !progress.done && progress.size < len(dst)+offset {
371                 progress.cond.Wait()
372         }
373         sharedf := progress.sharedf
374         err := progress.err
375         progress.cond.L.Unlock()
376
377         if err != nil {
378                 // If the copy-from-backend goroutine encountered an
379                 // error, we return that error. (Even if we read the
380                 // desired number of bytes, the error might be
381                 // something like BadChecksum so we should not ignore
382                 // it.)
383                 return 0, err
384         }
385         if len(dst) == 0 {
386                 // It's possible that sharedf==nil here (the writer
387                 // goroutine might not have done anything at all yet)
388                 // and we don't need it anyway because no bytes are
389                 // being read. Reading zero bytes seems pointless, but
390                 // if someone does it, we might as well return
391                 // suitable values, rather than risk a crash by
392                 // calling sharedf.ReadAt() when sharedf is nil.
393                 return 0, nil
394         }
395         return sharedf.ReadAt(dst, int64(offset))
396 }
397
398 var quickReadAtLostRace = errors.New("quickReadAt: lost race")
399
400 // Remove the cache entry for the indicated cachefilename if it
401 // matches expect (quickReadAt() usage), or if expect is nil (tidy()
402 // usage).
403 //
404 // If expect is non-nil, close expect's filehandle.
405 //
406 // If expect is nil and a different cache entry is deleted, close its
407 // filehandle.
408 func (cache *DiskCache) deleteHeldopen(cachefilename string, expect *openFileEnt) {
409         needclose := expect
410
411         cache.heldopenLock.Lock()
412         found := cache.heldopen[cachefilename]
413         if found != nil && (expect == nil || expect == found) {
414                 delete(cache.heldopen, cachefilename)
415                 needclose = found
416         }
417         cache.heldopenLock.Unlock()
418
419         if needclose != nil {
420                 needclose.Lock()
421                 defer needclose.Unlock()
422                 if needclose.f != nil {
423                         needclose.f.Close()
424                         needclose.f = nil
425                 }
426         }
427 }
428
429 // quickReadAt attempts to use a cached-filehandle approach to read
430 // from the indicated file. The expectation is that the caller
431 // (ReadAt) will try a more robust approach when this fails, so
432 // quickReadAt doesn't try especially hard to ensure success in
433 // races. In particular, when there are concurrent calls, and one
434 // fails, that can cause others to fail too.
435 func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int) (int, error) {
436         isnew := false
437         cache.heldopenLock.Lock()
438         if cache.heldopenMax == 0 {
439                 // Choose a reasonable limit on open cache files based
440                 // on RLIMIT_NOFILE. Note Go automatically raises
441                 // softlimit to hardlimit, so it's typically 1048576,
442                 // not 1024.
443                 lim := syscall.Rlimit{}
444                 err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim)
445                 if err != nil {
446                         cache.heldopenMax = 100
447                 } else if lim.Cur > 400000 {
448                         cache.heldopenMax = 10000
449                 } else {
450                         cache.heldopenMax = int(lim.Cur / 40)
451                 }
452         }
453         heldopen := cache.heldopen[cachefilename]
454         if heldopen == nil {
455                 isnew = true
456                 heldopen = &openFileEnt{}
457                 if cache.heldopen == nil {
458                         cache.heldopen = make(map[string]*openFileEnt, cache.heldopenMax)
459                 } else if len(cache.heldopen) > cache.heldopenMax {
460                         // Rather than go to the trouble of tracking
461                         // last access time, just close all files, and
462                         // open again as needed. Even in the worst
463                         // pathological case, this causes one extra
464                         // open+close per read, which is not
465                         // especially bad (see benchmarks).
466                         go func(m map[string]*openFileEnt) {
467                                 for _, heldopen := range m {
468                                         heldopen.Lock()
469                                         defer heldopen.Unlock()
470                                         if heldopen.f != nil {
471                                                 heldopen.f.Close()
472                                                 heldopen.f = nil
473                                         }
474                                 }
475                         }(cache.heldopen)
476                         cache.heldopen = nil
477                 }
478                 cache.heldopen[cachefilename] = heldopen
479                 heldopen.Lock()
480         }
481         cache.heldopenLock.Unlock()
482
483         if isnew {
484                 // Open and flock the file, save the filehandle (or
485                 // error) in heldopen.f, and release the write lock so
486                 // other goroutines waiting at heldopen.RLock() below
487                 // can use the shared filehandle (or shared error).
488                 f, err := os.Open(cachefilename)
489                 if err == nil {
490                         err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
491                         if err == nil {
492                                 heldopen.f = f
493                         } else {
494                                 f.Close()
495                         }
496                 }
497                 if err != nil {
498                         heldopen.err = err
499                         go cache.deleteHeldopen(cachefilename, heldopen)
500                 }
501                 heldopen.Unlock()
502         }
503         // Acquire read lock to ensure (1) initialization is complete,
504         // if it's done by a different goroutine, and (2) any "delete
505         // old/unused entries" waits for our read to finish before
506         // closing the file.
507         heldopen.RLock()
508         defer heldopen.RUnlock()
509         if heldopen.err != nil {
510                 // Other goroutine encountered an error during setup
511                 return 0, heldopen.err
512         } else if heldopen.f == nil {
513                 // Other goroutine closed the file before we got RLock
514                 return 0, quickReadAtLostRace
515         }
516
517         // If another goroutine is currently writing the file, wait
518         // for it to catch up to the end of the range we need.
519         cache.writingLock.Lock()
520         progress := cache.writing[cachefilename]
521         cache.writingLock.Unlock()
522         if progress != nil {
523                 progress.cond.L.Lock()
524                 for !progress.done && progress.size < len(dst)+offset {
525                         progress.cond.Wait()
526                 }
527                 progress.cond.L.Unlock()
528                 // If size<needed && progress.err!=nil here, we'll end
529                 // up reporting a less helpful "EOF reading from cache
530                 // file" below, instead of the actual error fetching
531                 // from upstream to cache file.  This is OK though,
532                 // because our caller (ReadAt) doesn't even report our
533                 // error, it just retries.
534         }
535
536         n, err := heldopen.f.ReadAt(dst, int64(offset))
537         if err != nil {
538                 // wait for any concurrent users to finish, then
539                 // delete this cache entry in case reopening the
540                 // backing file helps.
541                 go cache.deleteHeldopen(cachefilename, heldopen)
542         }
543         return n, err
544 }
545
546 // BlockRead reads an entire block using a 128 KiB buffer.
547 func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
548         cache.setupOnce.Do(cache.setup)
549         i := strings.Index(opts.Locator, "+")
550         if i < 0 || i >= len(opts.Locator) {
551                 return 0, errors.New("invalid block locator: no size hint")
552         }
553         sizestr := opts.Locator[i+1:]
554         i = strings.Index(sizestr, "+")
555         if i > 0 {
556                 sizestr = sizestr[:i]
557         }
558         blocksize, err := strconv.ParseInt(sizestr, 10, 32)
559         if err != nil || blocksize < 0 {
560                 return 0, errors.New("invalid block locator: invalid size hint")
561         }
562
563         offset := 0
564         buf := make([]byte, 131072)
565         for offset < int(blocksize) {
566                 if ctx.Err() != nil {
567                         return offset, ctx.Err()
568                 }
569                 if int(blocksize)-offset < len(buf) {
570                         buf = buf[:int(blocksize)-offset]
571                 }
572                 nr, err := cache.ReadAt(opts.Locator, buf, offset)
573                 if nr > 0 {
574                         nw, err := opts.WriteTo.Write(buf[:nr])
575                         if err != nil {
576                                 return offset + nw, err
577                         }
578                 }
579                 offset += nr
580                 if err != nil {
581                         return offset, err
582                 }
583         }
584         return offset, nil
585 }
586
587 // Start a tidy() goroutine, unless one is already running / recently
588 // finished.
589 func (cache *DiskCache) gotidy() {
590         writes := atomic.AddInt64(&cache.writesSinceTidy, 1)
591         // Skip if another tidy goroutine is running in this process.
592         n := atomic.AddInt32(&cache.tidying, 1)
593         if n != 1 {
594                 atomic.AddInt32(&cache.tidying, -1)
595                 return
596         }
597         // Skip if sizeEstimated is based on an actual measurement and
598         // is below maxSize, and we haven't done very many writes
599         // since last tidy (defined as 1% of number of cache files at
600         // last count).
601         if cache.sizeMeasured > 0 &&
602                 atomic.LoadInt64(&cache.sizeEstimated) < atomic.LoadInt64(&cache.defaultMaxSize) &&
603                 writes < cache.lastFileCount/100 {
604                 atomic.AddInt32(&cache.tidying, -1)
605                 return
606         }
607         go func() {
608                 cache.tidy()
609                 atomic.StoreInt64(&cache.writesSinceTidy, 0)
610                 atomic.AddInt32(&cache.tidying, -1)
611         }()
612 }
613
614 // Delete cache files as needed to control disk usage.
615 func (cache *DiskCache) tidy() {
616         maxsize := int64(cache.maxSize.ByteSize())
617         if maxsize < 1 {
618                 maxsize = atomic.LoadInt64(&cache.defaultMaxSize)
619                 if maxsize == 0 {
620                         // defaultMaxSize not yet computed. Use 10% of
621                         // filesystem capacity (or different
622                         // percentage if indicated by cache.maxSize)
623                         pct := cache.maxSize.Percent()
624                         if pct == 0 {
625                                 pct = 10
626                         }
627                         var stat unix.Statfs_t
628                         if nil == unix.Statfs(cache.dir, &stat) {
629                                 maxsize = int64(stat.Blocks) * stat.Bsize * pct / 100
630                                 atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
631                                 cache.debugf("setting cache size %d = blocks %d * bsize %d * pct %d / 100", maxsize, stat.Blocks, stat.Bsize, pct)
632                         } else {
633                                 // In this case we will set
634                                 // defaultMaxSize below after
635                                 // measuring current usage.
636                         }
637                 }
638         }
639
640         // Bail if a tidy goroutine is running in a different process.
641         lockfile, err := cache.openFile(filepath.Join(cache.dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
642         if err != nil {
643                 return
644         }
645         defer lockfile.Close()
646         err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
647         if err != nil {
648                 return
649         }
650
651         type entT struct {
652                 path  string
653                 atime time.Time
654                 size  int64
655         }
656         var ents []entT
657         var totalsize int64
658         filepath.Walk(cache.dir, func(path string, info fs.FileInfo, err error) error {
659                 if err != nil {
660                         cache.debugf("tidy: skipping dir %s: %s", path, err)
661                         return nil
662                 }
663                 if info.IsDir() {
664                         return nil
665                 }
666                 if !strings.HasSuffix(path, cacheFileSuffix) && !strings.HasSuffix(path, tmpFileSuffix) {
667                         return nil
668                 }
669                 var atime time.Time
670                 if stat, ok := info.Sys().(*syscall.Stat_t); ok {
671                         // Access time is available (hopefully the
672                         // filesystem is not mounted with noatime)
673                         atime = time.Unix(stat.Atim.Sec, stat.Atim.Nsec)
674                 } else {
675                         // If access time isn't available we fall back
676                         // to sorting by modification time.
677                         atime = info.ModTime()
678                 }
679                 ents = append(ents, entT{path, atime, info.Size()})
680                 totalsize += info.Size()
681                 return nil
682         })
683         if cache.Logger != nil {
684                 cache.Logger.WithFields(logrus.Fields{
685                         "totalsize": totalsize,
686                         "maxsize":   maxsize,
687                 }).Debugf("DiskCache: checked current cache usage")
688         }
689
690         // If MaxSize wasn't specified and we failed to come up with a
691         // defaultSize above, use the larger of {current cache size, 1
692         // GiB} as the defaultMaxSize for subsequent tidy()
693         // operations.
694         if maxsize == 0 {
695                 if totalsize < 1<<30 {
696                         atomic.StoreInt64(&cache.defaultMaxSize, 1<<30)
697                 } else {
698                         atomic.StoreInt64(&cache.defaultMaxSize, totalsize)
699                 }
700                 cache.debugf("found initial size %d, setting defaultMaxSize %d", totalsize, cache.defaultMaxSize)
701                 return
702         }
703
704         // If we're below MaxSize or there's only one block in the
705         // cache, just update the usage estimate and return.
706         //
707         // (We never delete the last block because that would merely
708         // cause the same block to get re-fetched repeatedly from the
709         // backend.)
710         if totalsize <= maxsize || len(ents) == 1 {
711                 atomic.StoreInt64(&cache.sizeMeasured, totalsize)
712                 atomic.StoreInt64(&cache.sizeEstimated, totalsize)
713                 cache.lastFileCount = int64(len(ents))
714                 return
715         }
716
717         // Set a new size target of maxsize minus 5%.  This makes some
718         // room for sizeEstimate to grow before it triggers another
719         // tidy. We don't want to walk/sort an entire large cache
720         // directory each time we write a block.
721         target := maxsize - (maxsize / 20)
722
723         // Delete oldest entries until totalsize < target or we're
724         // down to a single cached block.
725         sort.Slice(ents, func(i, j int) bool {
726                 return ents[i].atime.Before(ents[j].atime)
727         })
728         deleted := 0
729         for _, ent := range ents {
730                 os.Remove(ent.path)
731                 go cache.deleteHeldopen(ent.path, nil)
732                 deleted++
733                 totalsize -= ent.size
734                 if totalsize <= target || deleted == len(ents)-1 {
735                         break
736                 }
737         }
738
739         if cache.Logger != nil {
740                 cache.Logger.WithFields(logrus.Fields{
741                         "deleted":   deleted,
742                         "totalsize": totalsize,
743                 }).Debugf("DiskCache: remaining cache usage after deleting")
744         }
745         atomic.StoreInt64(&cache.sizeMeasured, totalsize)
746         atomic.StoreInt64(&cache.sizeEstimated, totalsize)
747         cache.lastFileCount = int64(len(ents) - deleted)
748 }