20318: Sync cache state after 1% churn instead of 5 minute timer.
[arvados.git] / sdk / go / arvados / keep_cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "io/fs"
15         "os"
16         "path/filepath"
17         "sort"
18         "strconv"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "syscall"
23         "time"
24
25         "github.com/sirupsen/logrus"
26         "golang.org/x/sys/unix"
27 )
28
29 type KeepGateway interface {
30         ReadAt(locator string, dst []byte, offset int) (int, error)
31         BlockRead(ctx context.Context, opts BlockReadOptions) (int, error)
32         BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error)
33         LocalLocator(locator string) (string, error)
34 }
35
36 // DiskCache wraps KeepGateway, adding a disk-based cache layer.
37 type DiskCache struct {
38         KeepGateway
39         Dir     string
40         MaxSize ByteSizeOrPercent
41         Logger  logrus.FieldLogger
42
43         *sharedCache
44         setupOnce sync.Once
45 }
46
47 var (
48         sharedCachesLock sync.Mutex
49         sharedCaches     = map[string]*sharedCache{}
50 )
51
52 // sharedCache has fields that coordinate the cache usage in a single
53 // cache directory; it can be shared by multiple DiskCaches.
54 //
55 // This serves to share a single pool of held-open filehandles, a
56 // single tidying goroutine, etc., even when the program (like
57 // keep-web) uses multiple KeepGateway stacks that use different auth
58 // tokens, etc.
59 type sharedCache struct {
60         dir     string
61         maxSize ByteSizeOrPercent
62
63         tidying        int32 // see tidy()
64         defaultMaxSize int64
65
66         // The "heldopen" fields are used to open cache files for
67         // reading, and leave them open for future/concurrent ReadAt
68         // operations. See quickReadAt.
69         heldopen     map[string]*openFileEnt
70         heldopenMax  int
71         heldopenLock sync.Mutex
72
73         // The "writing" fields allow multiple concurrent/sequential
74         // ReadAt calls to be notified as a single
75         // read-block-from-backend-into-cache goroutine fills the
76         // cache file.
77         writing     map[string]*writeprogress
78         writingCond *sync.Cond
79         writingLock sync.Mutex
80
81         sizeMeasured    int64 // actual size on disk after last tidy(); zero if not measured yet
82         sizeEstimated   int64 // last measured size, plus files we have written since
83         lastFileCount   int64 // number of files on disk at last count
84         writesSinceTidy int64 // number of files written since last tidy()
85 }
86
87 type writeprogress struct {
88         cond *sync.Cond // broadcast whenever size or done changes
89         done bool       // size and err have their final values
90         size int        // bytes copied into cache file so far
91         err  error      // error encountered while copying from backend to cache
92 }
93
94 type openFileEnt struct {
95         sync.RWMutex
96         f   *os.File
97         err error // if err is non-nil, f should not be used.
98 }
99
100 const (
101         cacheFileSuffix = ".keepcacheblock"
102         tmpFileSuffix   = ".tmp"
103 )
104
105 func (cache *DiskCache) setup() {
106         sharedCachesLock.Lock()
107         defer sharedCachesLock.Unlock()
108         dir := cache.Dir
109         if sharedCaches[dir] == nil {
110                 sharedCaches[dir] = &sharedCache{dir: dir, maxSize: cache.MaxSize}
111         }
112         cache.sharedCache = sharedCaches[dir]
113 }
114
115 func (cache *DiskCache) cacheFile(locator string) string {
116         hash := locator
117         if i := strings.Index(hash, "+"); i > 0 {
118                 hash = hash[:i]
119         }
120         return filepath.Join(cache.dir, hash[:3], hash+cacheFileSuffix)
121 }
122
123 // Open a cache file, creating the parent dir if necessary.
124 func (cache *DiskCache) openFile(name string, flags int) (*os.File, error) {
125         f, err := os.OpenFile(name, flags, 0600)
126         if os.IsNotExist(err) {
127                 // Create the parent dir and try again. (We could have
128                 // checked/created the parent dir before, but that
129                 // would be less efficient in the much more common
130                 // situation where it already exists.)
131                 parent, _ := filepath.Split(name)
132                 os.Mkdir(parent, 0700)
133                 f, err = os.OpenFile(name, flags, 0600)
134         }
135         return f, err
136 }
137
138 // Rename a file, creating the new path's parent dir if necessary.
139 func (cache *DiskCache) rename(old, new string) error {
140         if nil == os.Rename(old, new) {
141                 return nil
142         }
143         parent, _ := filepath.Split(new)
144         os.Mkdir(parent, 0700)
145         return os.Rename(old, new)
146 }
147
148 func (cache *DiskCache) debugf(format string, args ...interface{}) {
149         logger := cache.Logger
150         if logger == nil {
151                 return
152         }
153         logger.Debugf(format, args...)
154 }
155
156 // BlockWrite writes through to the wrapped KeepGateway, and (if
157 // possible) retains a copy of the written block in the cache.
158 func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
159         cache.setupOnce.Do(cache.setup)
160         unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix)
161         tmpfilename := filepath.Join(cache.dir, "tmp", unique)
162         tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR)
163         if err != nil {
164                 cache.debugf("BlockWrite: open(%s) failed: %s", tmpfilename, err)
165                 return cache.KeepGateway.BlockWrite(ctx, opts)
166         }
167
168         ctx, cancel := context.WithCancel(ctx)
169         defer cancel()
170         copyerr := make(chan error, 1)
171
172         // Start a goroutine to copy the caller's source data to
173         // tmpfile, a hash checker, and (via pipe) the wrapped
174         // KeepGateway.
175         pipereader, pipewriter := io.Pipe()
176         defer pipereader.Close()
177         go func() {
178                 defer tmpfile.Close()
179                 defer os.Remove(tmpfilename)
180                 defer pipewriter.Close()
181
182                 // Copy from opts.Data or opts.Reader, depending on
183                 // which was provided.
184                 var src io.Reader
185                 if opts.Data != nil {
186                         src = bytes.NewReader(opts.Data)
187                 } else {
188                         src = opts.Reader
189                 }
190
191                 hashcheck := md5.New()
192                 n, err := io.Copy(io.MultiWriter(tmpfile, pipewriter, hashcheck), src)
193                 if err != nil {
194                         copyerr <- err
195                         cancel()
196                         return
197                 } else if opts.DataSize > 0 && opts.DataSize != int(n) {
198                         copyerr <- fmt.Errorf("block size %d did not match provided size %d", n, opts.DataSize)
199                         cancel()
200                         return
201                 }
202                 err = tmpfile.Close()
203                 if err != nil {
204                         // Don't rename tmpfile into place, but allow
205                         // the BlockWrite call to succeed if nothing
206                         // else goes wrong.
207                         return
208                 }
209                 hash := fmt.Sprintf("%x", hashcheck.Sum(nil))
210                 if opts.Hash != "" && opts.Hash != hash {
211                         // Even if the wrapped KeepGateway doesn't
212                         // notice a problem, this should count as an
213                         // error.
214                         copyerr <- fmt.Errorf("block hash %s did not match provided hash %s", hash, opts.Hash)
215                         cancel()
216                         return
217                 }
218                 cachefilename := cache.cacheFile(hash)
219                 err = cache.rename(tmpfilename, cachefilename)
220                 if err != nil {
221                         cache.debugf("BlockWrite: rename(%s, %s) failed: %s", tmpfilename, cachefilename, err)
222                 }
223                 atomic.AddInt64(&cache.sizeEstimated, int64(n))
224                 cache.gotidy()
225         }()
226
227         // Write through to the wrapped KeepGateway from the pipe,
228         // instead of the original reader.
229         newopts := opts
230         if newopts.DataSize == 0 {
231                 newopts.DataSize = len(newopts.Data)
232         }
233         newopts.Reader = pipereader
234         newopts.Data = nil
235
236         resp, err := cache.KeepGateway.BlockWrite(ctx, newopts)
237         if len(copyerr) > 0 {
238                 // If the copy-to-pipe goroutine failed, that error
239                 // will be more helpful than the resulting "context
240                 // canceled" or "read [from pipereader] failed" error
241                 // seen by the wrapped KeepGateway.
242                 //
243                 // If the wrapped KeepGateway encounters an error
244                 // before all the data is copied into the pipe, it
245                 // stops reading from the pipe, which causes the
246                 // io.Copy() in the goroutine to block until our
247                 // deferred pipereader.Close() call runs. In that case
248                 // len(copyerr)==0 here, so the wrapped KeepGateway
249                 // error is the one we return to our caller.
250                 err = <-copyerr
251         }
252         return resp, err
253 }
254
255 type funcwriter func([]byte) (int, error)
256
257 func (fw funcwriter) Write(p []byte) (int, error) {
258         return fw(p)
259 }
260
261 // ReadAt reads the entire block from the wrapped KeepGateway into the
262 // cache if needed, and copies the requested portion into the provided
263 // slice.
264 //
265 // ReadAt returns as soon as the requested portion is available in the
266 // cache. The remainder of the block may continue to be copied into
267 // the cache in the background.
268 func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
269         cache.setupOnce.Do(cache.setup)
270         cachefilename := cache.cacheFile(locator)
271         if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
272                 return n, err
273         }
274         readf, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDONLY)
275         if err != nil {
276                 return 0, fmt.Errorf("ReadAt: %w", err)
277         }
278         defer readf.Close()
279
280         err = syscall.Flock(int(readf.Fd()), syscall.LOCK_SH)
281         if err != nil {
282                 return 0, fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
283         }
284
285         cache.writingLock.Lock()
286         progress := cache.writing[cachefilename]
287         if progress != nil {
288                 cache.writingLock.Unlock()
289         } else {
290                 progress = &writeprogress{}
291                 progress.cond = sync.NewCond(&sync.Mutex{})
292                 if cache.writing == nil {
293                         cache.writing = map[string]*writeprogress{}
294                 }
295                 cache.writing[cachefilename] = progress
296                 cache.writingLock.Unlock()
297
298                 // Start a goroutine to copy from backend to f. As
299                 // data arrives, wake up any waiting loops (see below)
300                 // so ReadAt() requests for partial data can return as
301                 // soon as the relevant bytes have been copied.
302                 go func() {
303                         var size int
304                         var writef *os.File
305                         var err error
306                         defer func() {
307                                 closeErr := writef.Close()
308                                 if err == nil {
309                                         err = closeErr
310                                 }
311                                 progress.cond.L.Lock()
312                                 progress.err = err
313                                 progress.done = true
314                                 progress.size = size
315                                 progress.cond.L.Unlock()
316                                 progress.cond.Broadcast()
317                                 cache.writingLock.Lock()
318                                 delete(cache.writing, cachefilename)
319                                 cache.writingLock.Unlock()
320                         }()
321                         writef, err = cache.openFile(cachefilename, os.O_WRONLY)
322                         if err != nil {
323                                 err = fmt.Errorf("ReadAt: %w", err)
324                                 return
325                         }
326                         err = syscall.Flock(int(writef.Fd()), syscall.LOCK_SH)
327                         if err != nil {
328                                 err = fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
329                                 return
330                         }
331                         size, err = cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{
332                                 Locator: locator,
333                                 WriteTo: funcwriter(func(p []byte) (int, error) {
334                                         n, err := writef.Write(p)
335                                         if n > 0 {
336                                                 progress.cond.L.Lock()
337                                                 progress.size += n
338                                                 progress.cond.L.Unlock()
339                                                 progress.cond.Broadcast()
340                                         }
341                                         return n, err
342                                 })})
343                         atomic.AddInt64(&cache.sizeEstimated, int64(size))
344                         cache.gotidy()
345                 }()
346         }
347         progress.cond.L.Lock()
348         for !progress.done && progress.size < len(dst)+offset {
349                 progress.cond.Wait()
350         }
351         ok := progress.size >= len(dst)+offset
352         err = progress.err
353         progress.cond.L.Unlock()
354
355         if !ok && err != nil {
356                 // If the copy-from-backend goroutine encountered an
357                 // error before copying enough bytes to satisfy our
358                 // request, we return that error.
359                 return 0, err
360         } else {
361                 // Regardless of whether the copy-from-backend
362                 // goroutine succeeded, or failed after copying the
363                 // bytes we need, the only errors we need to report
364                 // are errors reading from the cache file.
365                 return readf.ReadAt(dst, int64(offset))
366         }
367 }
368
369 var quickReadAtLostRace = errors.New("quickReadAt: lost race")
370
371 // Remove the cache entry for the indicated cachefilename if it
372 // matches expect (quickReadAt() usage), or if expect is nil (tidy()
373 // usage).
374 //
375 // If expect is non-nil, close expect's filehandle.
376 //
377 // If expect is nil and a different cache entry is deleted, close its
378 // filehandle.
379 func (cache *DiskCache) deleteHeldopen(cachefilename string, expect *openFileEnt) {
380         needclose := expect
381
382         cache.heldopenLock.Lock()
383         found := cache.heldopen[cachefilename]
384         if found != nil && (expect == nil || expect == found) {
385                 delete(cache.heldopen, cachefilename)
386                 needclose = found
387         }
388         cache.heldopenLock.Unlock()
389
390         if needclose != nil {
391                 needclose.Lock()
392                 defer needclose.Unlock()
393                 if needclose.f != nil {
394                         needclose.f.Close()
395                         needclose.f = nil
396                 }
397         }
398 }
399
400 // quickReadAt attempts to use a cached-filehandle approach to read
401 // from the indicated file. The expectation is that the caller
402 // (ReadAt) will try a more robust approach when this fails, so
403 // quickReadAt doesn't try especially hard to ensure success in
404 // races. In particular, when there are concurrent calls, and one
405 // fails, that can cause others to fail too.
406 func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int) (int, error) {
407         isnew := false
408         cache.heldopenLock.Lock()
409         if cache.heldopenMax == 0 {
410                 // Choose a reasonable limit on open cache files based
411                 // on RLIMIT_NOFILE. Note Go automatically raises
412                 // softlimit to hardlimit, so it's typically 1048576,
413                 // not 1024.
414                 lim := syscall.Rlimit{}
415                 err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim)
416                 if err != nil {
417                         cache.heldopenMax = 100
418                 } else if lim.Cur > 400000 {
419                         cache.heldopenMax = 10000
420                 } else {
421                         cache.heldopenMax = int(lim.Cur / 40)
422                 }
423         }
424         heldopen := cache.heldopen[cachefilename]
425         if heldopen == nil {
426                 isnew = true
427                 heldopen = &openFileEnt{}
428                 if cache.heldopen == nil {
429                         cache.heldopen = make(map[string]*openFileEnt, cache.heldopenMax)
430                 } else if len(cache.heldopen) > cache.heldopenMax {
431                         // Rather than go to the trouble of tracking
432                         // last access time, just close all files, and
433                         // open again as needed. Even in the worst
434                         // pathological case, this causes one extra
435                         // open+close per read, which is not
436                         // especially bad (see benchmarks).
437                         go func(m map[string]*openFileEnt) {
438                                 for _, heldopen := range m {
439                                         heldopen.Lock()
440                                         defer heldopen.Unlock()
441                                         if heldopen.f != nil {
442                                                 heldopen.f.Close()
443                                                 heldopen.f = nil
444                                         }
445                                 }
446                         }(cache.heldopen)
447                         cache.heldopen = nil
448                 }
449                 cache.heldopen[cachefilename] = heldopen
450                 heldopen.Lock()
451         }
452         cache.heldopenLock.Unlock()
453
454         if isnew {
455                 // Open and flock the file, then call wg.Done() to
456                 // unblock any other goroutines that are waiting in
457                 // the !isnew case above.
458                 f, err := os.Open(cachefilename)
459                 if err == nil {
460                         err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
461                         if err == nil {
462                                 heldopen.f = f
463                         } else {
464                                 f.Close()
465                         }
466                 }
467                 if err != nil {
468                         heldopen.err = err
469                         go cache.deleteHeldopen(cachefilename, heldopen)
470                 }
471                 heldopen.Unlock()
472         }
473         // Acquire read lock to ensure (1) initialization is complete,
474         // if it's done by a different goroutine, and (2) any "delete
475         // old/unused entries" waits for our read to finish before
476         // closing the file.
477         heldopen.RLock()
478         defer heldopen.RUnlock()
479         if heldopen.err != nil {
480                 // Other goroutine encountered an error during setup
481                 return 0, heldopen.err
482         } else if heldopen.f == nil {
483                 // Other goroutine closed the file before we got RLock
484                 return 0, quickReadAtLostRace
485         }
486
487         // If another goroutine is currently writing the file, wait
488         // for it to catch up to the end of the range we need.
489         cache.writingLock.Lock()
490         progress := cache.writing[cachefilename]
491         cache.writingLock.Unlock()
492         if progress != nil {
493                 progress.cond.L.Lock()
494                 for !progress.done && progress.size < len(dst)+offset {
495                         progress.cond.Wait()
496                 }
497                 progress.cond.L.Unlock()
498                 // If size<needed && progress.err!=nil here, we'll end
499                 // up reporting a less helpful "EOF reading from cache
500                 // file" below, instead of the actual error fetching
501                 // from upstream to cache file.  This is OK though,
502                 // because our caller (ReadAt) doesn't even report our
503                 // error, it just retries.
504         }
505
506         n, err := heldopen.f.ReadAt(dst, int64(offset))
507         if err != nil {
508                 // wait for any concurrent users to finish, then
509                 // delete this cache entry in case reopening the
510                 // backing file helps.
511                 go cache.deleteHeldopen(cachefilename, heldopen)
512         }
513         return n, err
514 }
515
516 // BlockRead reads an entire block using a 128 KiB buffer.
517 func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
518         cache.setupOnce.Do(cache.setup)
519         i := strings.Index(opts.Locator, "+")
520         if i < 0 || i >= len(opts.Locator) {
521                 return 0, errors.New("invalid block locator: no size hint")
522         }
523         sizestr := opts.Locator[i+1:]
524         i = strings.Index(sizestr, "+")
525         if i > 0 {
526                 sizestr = sizestr[:i]
527         }
528         blocksize, err := strconv.ParseInt(sizestr, 10, 32)
529         if err != nil || blocksize < 0 {
530                 return 0, errors.New("invalid block locator: invalid size hint")
531         }
532
533         offset := 0
534         buf := make([]byte, 131072)
535         for offset < int(blocksize) {
536                 if ctx.Err() != nil {
537                         return offset, ctx.Err()
538                 }
539                 if int(blocksize)-offset > len(buf) {
540                         buf = buf[:int(blocksize)-offset]
541                 }
542                 nr, err := cache.ReadAt(opts.Locator, buf, offset)
543                 if nr > 0 {
544                         nw, err := opts.WriteTo.Write(buf)
545                         if err != nil {
546                                 return offset + nw, err
547                         }
548                 }
549                 offset += nr
550                 if err != nil {
551                         return offset, err
552                 }
553         }
554         return offset, nil
555 }
556
557 // Start a tidy() goroutine, unless one is already running / recently
558 // finished.
559 func (cache *DiskCache) gotidy() {
560         writes := atomic.AddInt64(&cache.writesSinceTidy, 1)
561         // Skip if another tidy goroutine is running in this process.
562         n := atomic.AddInt32(&cache.tidying, 1)
563         if n != 1 {
564                 atomic.AddInt32(&cache.tidying, -1)
565                 return
566         }
567         // Skip if sizeEstimated is based on an actual measurement and
568         // is below maxSize, and we haven't done very many writes
569         // since last tidy (defined as 1% of number of cache files at
570         // last count).
571         if cache.sizeMeasured > 0 &&
572                 atomic.LoadInt64(&cache.sizeEstimated) < atomic.LoadInt64(&cache.defaultMaxSize) &&
573                 writes < cache.lastFileCount/100 {
574                 atomic.AddInt32(&cache.tidying, -1)
575                 return
576         }
577         go func() {
578                 cache.tidy()
579                 atomic.StoreInt64(&cache.writesSinceTidy, 0)
580                 atomic.AddInt32(&cache.tidying, -1)
581         }()
582 }
583
584 // Delete cache files as needed to control disk usage.
585 func (cache *DiskCache) tidy() {
586         maxsize := int64(cache.maxSize.ByteSize())
587         if maxsize < 1 {
588                 maxsize = atomic.LoadInt64(&cache.defaultMaxSize)
589                 if maxsize == 0 {
590                         // defaultMaxSize not yet computed. Use 10% of
591                         // filesystem capacity (or different
592                         // percentage if indicated by cache.maxSize)
593                         pct := cache.maxSize.Percent()
594                         if pct == 0 {
595                                 pct = 10
596                         }
597                         var stat unix.Statfs_t
598                         if nil == unix.Statfs(cache.dir, &stat) {
599                                 maxsize = int64(stat.Bavail) * stat.Bsize * pct / 100
600                                 atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
601                         } else {
602                                 // In this case we will set
603                                 // defaultMaxSize below after
604                                 // measuring current usage.
605                         }
606                 }
607         }
608
609         // Bail if a tidy goroutine is running in a different process.
610         lockfile, err := cache.openFile(filepath.Join(cache.dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
611         if err != nil {
612                 return
613         }
614         defer lockfile.Close()
615         err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
616         if err != nil {
617                 return
618         }
619
620         type entT struct {
621                 path  string
622                 atime time.Time
623                 size  int64
624         }
625         var ents []entT
626         var totalsize int64
627         filepath.Walk(cache.dir, func(path string, info fs.FileInfo, err error) error {
628                 if err != nil {
629                         cache.debugf("tidy: skipping dir %s: %s", path, err)
630                         return nil
631                 }
632                 if info.IsDir() {
633                         return nil
634                 }
635                 if !strings.HasSuffix(path, cacheFileSuffix) && !strings.HasSuffix(path, tmpFileSuffix) {
636                         return nil
637                 }
638                 var atime time.Time
639                 if stat, ok := info.Sys().(*syscall.Stat_t); ok {
640                         // Access time is available (hopefully the
641                         // filesystem is not mounted with noatime)
642                         atime = time.Unix(stat.Atim.Sec, stat.Atim.Nsec)
643                 } else {
644                         // If access time isn't available we fall back
645                         // to sorting by modification time.
646                         atime = info.ModTime()
647                 }
648                 ents = append(ents, entT{path, atime, info.Size()})
649                 totalsize += info.Size()
650                 return nil
651         })
652         if cache.Logger != nil {
653                 cache.Logger.WithFields(logrus.Fields{
654                         "totalsize": totalsize,
655                         "maxsize":   maxsize,
656                 }).Debugf("DiskCache: checked current cache usage")
657         }
658
659         // If MaxSize wasn't specified and we failed to come up with a
660         // defaultSize above, use the larger of {current cache size, 1
661         // GiB} as the defaultMaxSize for subsequent tidy()
662         // operations.
663         if maxsize == 0 {
664                 if totalsize < 1<<30 {
665                         atomic.StoreInt64(&cache.defaultMaxSize, 1<<30)
666                 } else {
667                         atomic.StoreInt64(&cache.defaultMaxSize, totalsize)
668                 }
669                 cache.debugf("found initial size %d, setting defaultMaxSize %d", totalsize, cache.defaultMaxSize)
670                 return
671         }
672
673         // If we're below MaxSize or there's only one block in the
674         // cache, just update the usage estimate and return.
675         //
676         // (We never delete the last block because that would merely
677         // cause the same block to get re-fetched repeatedly from the
678         // backend.)
679         if totalsize <= maxsize || len(ents) == 1 {
680                 atomic.StoreInt64(&cache.sizeMeasured, totalsize)
681                 atomic.StoreInt64(&cache.sizeEstimated, totalsize)
682                 cache.lastFileCount = int64(len(ents))
683                 return
684         }
685
686         // Set a new size target of maxsize minus 5%.  This makes some
687         // room for sizeEstimate to grow before it triggers another
688         // tidy. We don't want to walk/sort an entire large cache
689         // directory each time we write a block.
690         target := maxsize - (maxsize / 20)
691
692         // Delete oldest entries until totalsize < target or we're
693         // down to a single cached block.
694         sort.Slice(ents, func(i, j int) bool {
695                 return ents[i].atime.Before(ents[j].atime)
696         })
697         deleted := 0
698         for _, ent := range ents {
699                 os.Remove(ent.path)
700                 go cache.deleteHeldopen(ent.path, nil)
701                 deleted++
702                 totalsize -= ent.size
703                 if totalsize <= target || deleted == len(ents)-1 {
704                         break
705                 }
706         }
707
708         if cache.Logger != nil {
709                 cache.Logger.WithFields(logrus.Fields{
710                         "deleted":   deleted,
711                         "totalsize": totalsize,
712                 }).Debugf("DiskCache: remaining cache usage after deleting")
713         }
714         atomic.StoreInt64(&cache.sizeMeasured, totalsize)
715         atomic.StoreInt64(&cache.sizeEstimated, totalsize)
716         cache.lastFileCount = int64(len(ents) - deleted)
717 }