20318: Merge branch 'main' into 20318-disk-cache
[arvados.git] / sdk / go / arvados / keep_cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "io/fs"
15         "os"
16         "path/filepath"
17         "sort"
18         "strconv"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "syscall"
23         "time"
24
25         "github.com/sirupsen/logrus"
26         "golang.org/x/sys/unix"
27 )
28
29 type KeepGateway interface {
30         ReadAt(locator string, dst []byte, offset int) (int, error)
31         BlockRead(ctx context.Context, opts BlockReadOptions) (int, error)
32         BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error)
33         LocalLocator(locator string) (string, error)
34 }
35
36 // DiskCache wraps KeepGateway, adding a disk-based cache layer.
37 type DiskCache struct {
38         KeepGateway
39         Dir     string
40         MaxSize int64
41         Logger  logrus.FieldLogger
42
43         tidying        int32 // see tidy()
44         tidyHoldUntil  time.Time
45         defaultMaxSize int64
46
47         // The "heldopen" fields are used to open cache files for
48         // reading, and leave them open for future/concurrent ReadAt
49         // operations. See quickReadAt.
50         heldopen     map[string]*openFileEnt
51         heldopenMax  int
52         heldopenLock sync.Mutex
53
54         // The "writing" fields allow multiple concurrent/sequential
55         // ReadAt calls to be notified as a single
56         // read-block-from-backend-into-cache goroutine fills the
57         // cache file.
58         writing     map[string]*writeprogress
59         writingCond *sync.Cond
60         writingLock sync.Mutex
61
62         sizeMeasured  int64 // actual size on disk after last tidy(); zero if not measured yet
63         sizeEstimated int64 // last measured size, plus files we have written since
64 }
65
66 type writeprogress struct {
67         cond *sync.Cond // broadcast whenever size or done changes
68         done bool       // size and err have their final values
69         size int        // bytes copied into cache file so far
70         err  error      // error encountered while copying from backend to cache
71 }
72
73 type openFileEnt struct {
74         sync.RWMutex
75         f   *os.File
76         err error // if err is non-nil, f should not be used.
77 }
78
79 const (
80         cacheFileSuffix  = ".keepcacheblock"
81         tmpFileSuffix    = ".tmp"
82         tidyHoldDuration = 5 * time.Minute // time to re-check cache size even if estimated size is below max
83 )
84
85 func (cache *DiskCache) cacheFile(locator string) string {
86         hash := locator
87         if i := strings.Index(hash, "+"); i > 0 {
88                 hash = hash[:i]
89         }
90         return filepath.Join(cache.Dir, hash[:3], hash+cacheFileSuffix)
91 }
92
93 // Open a cache file, creating the parent dir if necessary.
94 func (cache *DiskCache) openFile(name string, flags int) (*os.File, error) {
95         f, err := os.OpenFile(name, flags, 0600)
96         if os.IsNotExist(err) {
97                 // Create the parent dir and try again. (We could have
98                 // checked/created the parent dir before, but that
99                 // would be less efficient in the much more common
100                 // situation where it already exists.)
101                 parent, _ := filepath.Split(name)
102                 os.Mkdir(parent, 0700)
103                 f, err = os.OpenFile(name, flags, 0600)
104         }
105         return f, err
106 }
107
108 // Rename a file, creating the new path's parent dir if necessary.
109 func (cache *DiskCache) rename(old, new string) error {
110         if nil == os.Rename(old, new) {
111                 return nil
112         }
113         parent, _ := filepath.Split(new)
114         os.Mkdir(parent, 0700)
115         return os.Rename(old, new)
116 }
117
118 func (cache *DiskCache) debugf(format string, args ...interface{}) {
119         logger := cache.Logger
120         if logger == nil {
121                 return
122         }
123         logger.Debugf(format, args...)
124 }
125
126 // BlockWrite writes through to the wrapped KeepGateway, and (if
127 // possible) retains a copy of the written block in the cache.
128 func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
129         unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix)
130         tmpfilename := filepath.Join(cache.Dir, "tmp", unique)
131         tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR)
132         if err != nil {
133                 cache.debugf("BlockWrite: open(%s) failed: %s", tmpfilename, err)
134                 return cache.KeepGateway.BlockWrite(ctx, opts)
135         }
136
137         ctx, cancel := context.WithCancel(ctx)
138         defer cancel()
139         copyerr := make(chan error, 1)
140
141         // Start a goroutine to copy the caller's source data to
142         // tmpfile, a hash checker, and (via pipe) the wrapped
143         // KeepGateway.
144         pipereader, pipewriter := io.Pipe()
145         defer pipereader.Close()
146         go func() {
147                 defer tmpfile.Close()
148                 defer os.Remove(tmpfilename)
149                 defer pipewriter.Close()
150
151                 // Copy from opts.Data or opts.Reader, depending on
152                 // which was provided.
153                 var src io.Reader
154                 if opts.Data != nil {
155                         src = bytes.NewReader(opts.Data)
156                 } else {
157                         src = opts.Reader
158                 }
159
160                 hashcheck := md5.New()
161                 n, err := io.Copy(io.MultiWriter(tmpfile, pipewriter, hashcheck), src)
162                 if err != nil {
163                         copyerr <- err
164                         cancel()
165                         return
166                 } else if opts.DataSize > 0 && opts.DataSize != int(n) {
167                         copyerr <- fmt.Errorf("block size %d did not match provided size %d", n, opts.DataSize)
168                         cancel()
169                         return
170                 }
171                 err = tmpfile.Close()
172                 if err != nil {
173                         // Don't rename tmpfile into place, but allow
174                         // the BlockWrite call to succeed if nothing
175                         // else goes wrong.
176                         return
177                 }
178                 hash := fmt.Sprintf("%x", hashcheck.Sum(nil))
179                 if opts.Hash != "" && opts.Hash != hash {
180                         // Even if the wrapped KeepGateway doesn't
181                         // notice a problem, this should count as an
182                         // error.
183                         copyerr <- fmt.Errorf("block hash %s did not match provided hash %s", hash, opts.Hash)
184                         cancel()
185                         return
186                 }
187                 cachefilename := cache.cacheFile(hash)
188                 err = cache.rename(tmpfilename, cachefilename)
189                 if err != nil {
190                         cache.debugf("BlockWrite: rename(%s, %s) failed: %s", tmpfilename, cachefilename, err)
191                 }
192                 atomic.AddInt64(&cache.sizeEstimated, int64(n))
193                 cache.gotidy()
194         }()
195
196         // Write through to the wrapped KeepGateway from the pipe,
197         // instead of the original reader.
198         newopts := opts
199         if newopts.DataSize == 0 {
200                 newopts.DataSize = len(newopts.Data)
201         }
202         newopts.Reader = pipereader
203         newopts.Data = nil
204
205         resp, err := cache.KeepGateway.BlockWrite(ctx, newopts)
206         if len(copyerr) > 0 {
207                 // If the copy-to-pipe goroutine failed, that error
208                 // will be more helpful than the resulting "context
209                 // canceled" or "read [from pipereader] failed" error
210                 // seen by the wrapped KeepGateway.
211                 //
212                 // If the wrapped KeepGateway encounters an error
213                 // before all the data is copied into the pipe, it
214                 // stops reading from the pipe, which causes the
215                 // io.Copy() in the goroutine to block until our
216                 // deferred pipereader.Close() call runs. In that case
217                 // len(copyerr)==0 here, so the wrapped KeepGateway
218                 // error is the one we return to our caller.
219                 err = <-copyerr
220         }
221         return resp, err
222 }
223
224 type funcwriter func([]byte) (int, error)
225
226 func (fw funcwriter) Write(p []byte) (int, error) {
227         return fw(p)
228 }
229
230 // ReadAt reads the entire block from the wrapped KeepGateway into the
231 // cache if needed, and copies the requested portion into the provided
232 // slice.
233 //
234 // ReadAt returns as soon as the requested portion is available in the
235 // cache. The remainder of the block may continue to be copied into
236 // the cache in the background.
237 func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
238         cachefilename := cache.cacheFile(locator)
239         if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
240                 return n, err
241         }
242         readf, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDONLY)
243         if err != nil {
244                 return 0, fmt.Errorf("ReadAt: %w", err)
245         }
246         defer readf.Close()
247
248         err = syscall.Flock(int(readf.Fd()), syscall.LOCK_SH)
249         if err != nil {
250                 return 0, fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
251         }
252
253         cache.writingLock.Lock()
254         progress := cache.writing[cachefilename]
255         if progress != nil {
256                 cache.writingLock.Unlock()
257         } else {
258                 progress = &writeprogress{}
259                 progress.cond = sync.NewCond(&sync.Mutex{})
260                 if cache.writing == nil {
261                         cache.writing = map[string]*writeprogress{}
262                 }
263                 cache.writing[cachefilename] = progress
264                 cache.writingLock.Unlock()
265
266                 // Start a goroutine to copy from backend to f. As
267                 // data arrives, wake up any waiting loops (see below)
268                 // so ReadAt() requests for partial data can return as
269                 // soon as the relevant bytes have been copied.
270                 go func() {
271                         var size int
272                         var writef *os.File
273                         var err error
274                         defer func() {
275                                 closeErr := writef.Close()
276                                 if err == nil {
277                                         err = closeErr
278                                 }
279                                 progress.cond.L.Lock()
280                                 progress.err = err
281                                 progress.done = true
282                                 progress.size = size
283                                 progress.cond.L.Unlock()
284                                 progress.cond.Broadcast()
285                                 cache.writingLock.Lock()
286                                 delete(cache.writing, cachefilename)
287                                 cache.writingLock.Unlock()
288                         }()
289                         writef, err = cache.openFile(cachefilename, os.O_WRONLY)
290                         if err != nil {
291                                 err = fmt.Errorf("ReadAt: %w", err)
292                                 return
293                         }
294                         err = syscall.Flock(int(writef.Fd()), syscall.LOCK_SH)
295                         if err != nil {
296                                 err = fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
297                                 return
298                         }
299                         size, err = cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{
300                                 Locator: locator,
301                                 WriteTo: funcwriter(func(p []byte) (int, error) {
302                                         n, err := writef.Write(p)
303                                         if n > 0 {
304                                                 progress.cond.L.Lock()
305                                                 progress.size += n
306                                                 progress.cond.L.Unlock()
307                                                 progress.cond.Broadcast()
308                                         }
309                                         return n, err
310                                 })})
311                         atomic.AddInt64(&cache.sizeEstimated, int64(size))
312                         cache.gotidy()
313                 }()
314         }
315         progress.cond.L.Lock()
316         for !progress.done && progress.size < len(dst)+offset {
317                 progress.cond.Wait()
318         }
319         ok := progress.size >= len(dst)+offset
320         err = progress.err
321         progress.cond.L.Unlock()
322
323         if !ok && err != nil {
324                 // If the copy-from-backend goroutine encountered an
325                 // error before copying enough bytes to satisfy our
326                 // request, we return that error.
327                 return 0, err
328         } else {
329                 // Regardless of whether the copy-from-backend
330                 // goroutine succeeded, or failed after copying the
331                 // bytes we need, the only errors we need to report
332                 // are errors reading from the cache file.
333                 return readf.ReadAt(dst, int64(offset))
334         }
335 }
336
337 var quickReadAtLostRace = errors.New("quickReadAt: lost race")
338
339 // Remove the cache entry for the indicated cachefilename if it
340 // matches expect (quickReadAt() usage), or if expect is nil (tidy()
341 // usage).
342 //
343 // If expect is non-nil, close expect's filehandle.
344 //
345 // If expect is nil and a different cache entry is deleted, close its
346 // filehandle.
347 func (cache *DiskCache) deleteHeldopen(cachefilename string, expect *openFileEnt) {
348         needclose := expect
349
350         cache.heldopenLock.Lock()
351         found := cache.heldopen[cachefilename]
352         if found != nil && (expect == nil || expect == found) {
353                 delete(cache.heldopen, cachefilename)
354                 needclose = found
355         }
356         cache.heldopenLock.Unlock()
357
358         if needclose != nil {
359                 needclose.Lock()
360                 defer needclose.Unlock()
361                 if needclose.f != nil {
362                         needclose.f.Close()
363                         needclose.f = nil
364                 }
365         }
366 }
367
368 // quickReadAt attempts to use a cached-filehandle approach to read
369 // from the indicated file. The expectation is that the caller
370 // (ReadAt) will try a more robust approach when this fails, so
371 // quickReadAt doesn't try especially hard to ensure success in
372 // races. In particular, when there are concurrent calls, and one
373 // fails, that can cause others to fail too.
374 func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int) (int, error) {
375         isnew := false
376         cache.heldopenLock.Lock()
377         if cache.heldopenMax == 0 {
378                 // Choose a reasonable limit on open cache files based
379                 // on RLIMIT_NOFILE. Note Go automatically raises
380                 // softlimit to hardlimit, so it's typically 1048576,
381                 // not 1024.
382                 lim := syscall.Rlimit{}
383                 err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim)
384                 if err != nil {
385                         cache.heldopenMax = 256
386                 } else if lim.Cur > 40000 {
387                         cache.heldopenMax = 10000
388                 } else {
389                         cache.heldopenMax = int(lim.Cur / 4)
390                 }
391         }
392         heldopen := cache.heldopen[cachefilename]
393         if heldopen == nil {
394                 isnew = true
395                 heldopen = &openFileEnt{}
396                 if cache.heldopen == nil {
397                         cache.heldopen = make(map[string]*openFileEnt, cache.heldopenMax)
398                 } else if len(cache.heldopen) > cache.heldopenMax {
399                         // Rather than go to the trouble of tracking
400                         // last access time, just close all files, and
401                         // open again as needed. Even in the worst
402                         // pathological case, this causes one extra
403                         // open+close per read, which is not
404                         // especially bad (see benchmarks).
405                         go func(m map[string]*openFileEnt) {
406                                 for _, heldopen := range m {
407                                         heldopen.Lock()
408                                         defer heldopen.Unlock()
409                                         if heldopen.f != nil {
410                                                 heldopen.f.Close()
411                                                 heldopen.f = nil
412                                         }
413                                 }
414                         }(cache.heldopen)
415                         cache.heldopen = nil
416                 }
417                 cache.heldopen[cachefilename] = heldopen
418                 heldopen.Lock()
419         }
420         cache.heldopenLock.Unlock()
421
422         if isnew {
423                 // Open and flock the file, then call wg.Done() to
424                 // unblock any other goroutines that are waiting in
425                 // the !isnew case above.
426                 f, err := os.Open(cachefilename)
427                 if err == nil {
428                         err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
429                         if err == nil {
430                                 heldopen.f = f
431                         } else {
432                                 f.Close()
433                         }
434                 }
435                 if err != nil {
436                         heldopen.err = err
437                         go cache.deleteHeldopen(cachefilename, heldopen)
438                 }
439                 heldopen.Unlock()
440         }
441         // Acquire read lock to ensure (1) initialization is complete,
442         // if it's done by a different goroutine, and (2) any "delete
443         // old/unused entries" waits for our read to finish before
444         // closing the file.
445         heldopen.RLock()
446         defer heldopen.RUnlock()
447         if heldopen.err != nil {
448                 // Other goroutine encountered an error during setup
449                 return 0, heldopen.err
450         } else if heldopen.f == nil {
451                 // Other goroutine closed the file before we got RLock
452                 return 0, quickReadAtLostRace
453         }
454
455         // If another goroutine is currently writing the file, wait
456         // for it to catch up to the end of the range we need.
457         cache.writingLock.Lock()
458         progress := cache.writing[cachefilename]
459         cache.writingLock.Unlock()
460         if progress != nil {
461                 progress.cond.L.Lock()
462                 for !progress.done && progress.size < len(dst)+offset {
463                         progress.cond.Wait()
464                 }
465                 progress.cond.L.Unlock()
466                 // If size<needed && progress.err!=nil here, we'll end
467                 // up reporting a less helpful "EOF reading from cache
468                 // file" below, instead of the actual error fetching
469                 // from upstream to cache file.  This is OK though,
470                 // because our caller (ReadAt) doesn't even report our
471                 // error, it just retries.
472         }
473
474         n, err := heldopen.f.ReadAt(dst, int64(offset))
475         if err != nil {
476                 // wait for any concurrent users to finish, then
477                 // delete this cache entry in case reopening the
478                 // backing file helps.
479                 go cache.deleteHeldopen(cachefilename, heldopen)
480         }
481         return n, err
482 }
483
484 // BlockRead reads an entire block using a 128 KiB buffer.
485 func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
486         i := strings.Index(opts.Locator, "+")
487         if i < 0 || i >= len(opts.Locator) {
488                 return 0, errors.New("invalid block locator: no size hint")
489         }
490         sizestr := opts.Locator[i+1:]
491         i = strings.Index(sizestr, "+")
492         if i > 0 {
493                 sizestr = sizestr[:i]
494         }
495         blocksize, err := strconv.ParseInt(sizestr, 10, 32)
496         if err != nil || blocksize < 0 {
497                 return 0, errors.New("invalid block locator: invalid size hint")
498         }
499
500         offset := 0
501         buf := make([]byte, 131072)
502         for offset < int(blocksize) {
503                 if ctx.Err() != nil {
504                         return offset, ctx.Err()
505                 }
506                 if int(blocksize)-offset > len(buf) {
507                         buf = buf[:int(blocksize)-offset]
508                 }
509                 nr, err := cache.ReadAt(opts.Locator, buf, offset)
510                 if nr > 0 {
511                         nw, err := opts.WriteTo.Write(buf)
512                         if err != nil {
513                                 return offset + nw, err
514                         }
515                 }
516                 offset += nr
517                 if err != nil {
518                         return offset, err
519                 }
520         }
521         return offset, nil
522 }
523
524 // Start a tidy() goroutine, unless one is already running / recently
525 // finished.
526 func (cache *DiskCache) gotidy() {
527         // Skip if another tidy goroutine is running in this process.
528         n := atomic.AddInt32(&cache.tidying, 1)
529         if n != 1 {
530                 atomic.AddInt32(&cache.tidying, -1)
531                 return
532         }
533         // Skip if sizeEstimated is based on an actual measurement and
534         // is below MaxSize, and we haven't reached the "recheck
535         // anyway" time threshold.
536         if cache.sizeMeasured > 0 &&
537                 atomic.LoadInt64(&cache.sizeEstimated) < cache.MaxSize &&
538                 time.Now().Before(cache.tidyHoldUntil) {
539                 atomic.AddInt32(&cache.tidying, -1)
540                 return
541         }
542         go func() {
543                 cache.tidy()
544                 cache.tidyHoldUntil = time.Now().Add(tidyHoldDuration)
545                 atomic.AddInt32(&cache.tidying, -1)
546         }()
547 }
548
549 // Delete cache files as needed to control disk usage.
550 func (cache *DiskCache) tidy() {
551         maxsize := cache.MaxSize
552         if maxsize < 1 {
553                 if maxsize = atomic.LoadInt64(&cache.defaultMaxSize); maxsize == 0 {
554                         var stat unix.Statfs_t
555                         if nil == unix.Statfs(cache.Dir, &stat) {
556                                 maxsize = int64(stat.Bavail) * stat.Bsize / 10
557                         }
558                         atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
559                 }
560         }
561
562         // Bail if a tidy goroutine is running in a different process.
563         lockfile, err := cache.openFile(filepath.Join(cache.Dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
564         if err != nil {
565                 return
566         }
567         defer lockfile.Close()
568         err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
569         if err != nil {
570                 return
571         }
572
573         type entT struct {
574                 path  string
575                 atime time.Time
576                 size  int64
577         }
578         var ents []entT
579         var totalsize int64
580         filepath.Walk(cache.Dir, func(path string, info fs.FileInfo, err error) error {
581                 if err != nil {
582                         cache.debugf("tidy: skipping dir %s: %s", path, err)
583                         return nil
584                 }
585                 if info.IsDir() {
586                         return nil
587                 }
588                 if !strings.HasSuffix(path, cacheFileSuffix) && !strings.HasSuffix(path, tmpFileSuffix) {
589                         return nil
590                 }
591                 var atime time.Time
592                 if stat, ok := info.Sys().(*syscall.Stat_t); ok {
593                         // Access time is available (hopefully the
594                         // filesystem is not mounted with noatime)
595                         atime = time.Unix(stat.Atim.Sec, stat.Atim.Nsec)
596                 } else {
597                         // If access time isn't available we fall back
598                         // to sorting by modification time.
599                         atime = info.ModTime()
600                 }
601                 ents = append(ents, entT{path, atime, info.Size()})
602                 totalsize += info.Size()
603                 return nil
604         })
605         if cache.Logger != nil {
606                 cache.Logger.WithFields(logrus.Fields{
607                         "totalsize": totalsize,
608                         "maxsize":   maxsize,
609                 }).Debugf("DiskCache: checked current cache usage")
610         }
611
612         // If MaxSize wasn't specified and we failed to come up with a
613         // defaultSize above, use the larger of {current cache size, 1
614         // GiB} as the defaultSize for subsequent tidy() operations.
615         if maxsize == 0 {
616                 if totalsize < 1<<30 {
617                         atomic.StoreInt64(&cache.defaultMaxSize, 1<<30)
618                 } else {
619                         atomic.StoreInt64(&cache.defaultMaxSize, totalsize)
620                 }
621                 cache.debugf("found initial size %d, setting defaultMaxSize %d", totalsize, cache.defaultMaxSize)
622                 return
623         }
624
625         // If we're below MaxSize or there's only one block in the
626         // cache, just update the usage estimate and return.
627         //
628         // (We never delete the last block because that would merely
629         // cause the same block to get re-fetched repeatedly from the
630         // backend.)
631         if totalsize <= maxsize || len(ents) == 1 {
632                 atomic.StoreInt64(&cache.sizeMeasured, totalsize)
633                 atomic.StoreInt64(&cache.sizeEstimated, totalsize)
634                 return
635         }
636
637         // Set a new size target of maxsize minus 5%.  This makes some
638         // room for sizeEstimate to grow before it triggers another
639         // tidy. We don't want to walk/sort an entire large cache
640         // directory each time we write a block.
641         target := maxsize - (maxsize / 20)
642
643         // Delete oldest entries until totalsize < target or we're
644         // down to a single cached block.
645         sort.Slice(ents, func(i, j int) bool {
646                 return ents[i].atime.Before(ents[j].atime)
647         })
648         deleted := 0
649         for _, ent := range ents {
650                 os.Remove(ent.path)
651                 go cache.deleteHeldopen(ent.path, nil)
652                 deleted++
653                 totalsize -= ent.size
654                 if totalsize <= target || deleted == len(ents)-1 {
655                         break
656                 }
657         }
658
659         if cache.Logger != nil {
660                 cache.Logger.WithFields(logrus.Fields{
661                         "deleted":   deleted,
662                         "totalsize": totalsize,
663                 }).Debugf("DiskCache: remaining cache usage after deleting")
664         }
665         atomic.StoreInt64(&cache.sizeMeasured, totalsize)
666         atomic.StoreInt64(&cache.sizeEstimated, totalsize)
667 }