20318: Route KeepClient block writes through disk cache.
[arvados.git] / sdk / go / arvados / keep_cache.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "io/fs"
15         "os"
16         "path/filepath"
17         "sort"
18         "strconv"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "syscall"
23         "time"
24
25         "github.com/sirupsen/logrus"
26         "golang.org/x/sys/unix"
27 )
28
29 type KeepGateway interface {
30         ReadAt(locator string, dst []byte, offset int) (int, error)
31         BlockRead(ctx context.Context, opts BlockReadOptions) (int, error)
32         BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error)
33         LocalLocator(locator string) (string, error)
34 }
35
36 // DiskCache wraps KeepGateway, adding a disk-based cache layer.
37 type DiskCache struct {
38         KeepGateway
39         Dir     string
40         MaxSize int64
41         Logger  logrus.FieldLogger
42
43         tidying        int32 // see tidy()
44         tidyHoldUntil  time.Time
45         defaultMaxSize int64
46
47         // The "heldopen" fields are used to open cache files for
48         // reading, and leave them open for future/concurrent ReadAt
49         // operations. See quickReadAt.
50         heldopen     map[string]*openFileEnt
51         heldopenMax  int
52         heldopenLock sync.Mutex
53 }
54
55 type openFileEnt struct {
56         sync.RWMutex
57         f   *os.File
58         err error // if err is non-nil, f should not be used.
59 }
60
61 const (
62         cacheFileSuffix  = ".keepcacheblock"
63         tmpFileSuffix    = ".tmp"
64         tidyHoldDuration = 10 * time.Second
65 )
66
67 func (cache *DiskCache) cacheFile(locator string) string {
68         hash := locator
69         if i := strings.Index(hash, "+"); i > 0 {
70                 hash = hash[:i]
71         }
72         return filepath.Join(cache.Dir, hash[:3], hash+cacheFileSuffix)
73 }
74
75 // Open a cache file, creating the parent dir if necessary.
76 func (cache *DiskCache) openFile(name string, flags int) (*os.File, error) {
77         f, err := os.OpenFile(name, flags, 0600)
78         if os.IsNotExist(err) {
79                 // Create the parent dir and try again. (We could have
80                 // checked/created the parent dir before, but that
81                 // would be less efficient in the much more common
82                 // situation where it already exists.)
83                 parent, _ := filepath.Split(name)
84                 os.Mkdir(parent, 0700)
85                 f, err = os.OpenFile(name, flags, 0600)
86         }
87         return f, err
88 }
89
90 // Rename a file, creating the new path's parent dir if necessary.
91 func (cache *DiskCache) rename(old, new string) error {
92         if nil == os.Rename(old, new) {
93                 return nil
94         }
95         parent, _ := filepath.Split(new)
96         os.Mkdir(parent, 0700)
97         return os.Rename(old, new)
98 }
99
100 func (cache *DiskCache) debugf(format string, args ...interface{}) {
101         logger := cache.Logger
102         if logger == nil {
103                 return
104         }
105         logger.Debugf(format, args...)
106 }
107
108 // BlockWrite writes through to the wrapped KeepGateway, and (if
109 // possible) retains a copy of the written block in the cache.
110 func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
111         cache.gotidy()
112         unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix)
113         tmpfilename := filepath.Join(cache.Dir, "tmp", unique)
114         tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR)
115         if err != nil {
116                 cache.debugf("BlockWrite: open(%s) failed: %s", tmpfilename, err)
117                 return cache.KeepGateway.BlockWrite(ctx, opts)
118         }
119
120         ctx, cancel := context.WithCancel(ctx)
121         defer cancel()
122         copyerr := make(chan error, 1)
123
124         // Start a goroutine to copy the caller's source data to
125         // tmpfile, a hash checker, and (via pipe) the wrapped
126         // KeepGateway.
127         pipereader, pipewriter := io.Pipe()
128         defer pipereader.Close()
129         go func() {
130                 defer tmpfile.Close()
131                 defer os.Remove(tmpfilename)
132                 defer pipewriter.Close()
133
134                 // Copy from opts.Data or opts.Reader, depending on
135                 // which was provided.
136                 var src io.Reader
137                 if opts.Data != nil {
138                         src = bytes.NewReader(opts.Data)
139                 } else {
140                         src = opts.Reader
141                 }
142
143                 hashcheck := md5.New()
144                 n, err := io.Copy(io.MultiWriter(tmpfile, pipewriter, hashcheck), src)
145                 if err != nil {
146                         copyerr <- err
147                         cancel()
148                         return
149                 } else if opts.DataSize > 0 && opts.DataSize != int(n) {
150                         copyerr <- fmt.Errorf("block size %d did not match provided size %d", n, opts.DataSize)
151                         cancel()
152                         return
153                 }
154                 err = tmpfile.Close()
155                 if err != nil {
156                         // Don't rename tmpfile into place, but allow
157                         // the BlockWrite call to succeed if nothing
158                         // else goes wrong.
159                         return
160                 }
161                 hash := fmt.Sprintf("%x", hashcheck.Sum(nil))
162                 if opts.Hash != "" && opts.Hash != hash {
163                         // Even if the wrapped KeepGateway doesn't
164                         // notice a problem, this should count as an
165                         // error.
166                         copyerr <- fmt.Errorf("block hash %s did not match provided hash %s", hash, opts.Hash)
167                         cancel()
168                         return
169                 }
170                 cachefilename := cache.cacheFile(hash)
171                 err = cache.rename(tmpfilename, cachefilename)
172                 if err != nil {
173                         cache.debugf("BlockWrite: rename(%s, %s) failed: %s", tmpfilename, cachefilename, err)
174                 }
175         }()
176
177         // Write through to the wrapped KeepGateway from the pipe,
178         // instead of the original reader.
179         newopts := opts
180         if newopts.DataSize == 0 {
181                 newopts.DataSize = len(newopts.Data)
182         }
183         newopts.Reader = pipereader
184         newopts.Data = nil
185
186         resp, err := cache.KeepGateway.BlockWrite(ctx, newopts)
187         if len(copyerr) > 0 {
188                 // If the copy-to-pipe goroutine failed, that error
189                 // will be more helpful than the resulting "context
190                 // canceled" or "read [from pipereader] failed" error
191                 // seen by the wrapped KeepGateway.
192                 //
193                 // If the wrapped KeepGateway encounters an error
194                 // before all the data is copied into the pipe, it
195                 // stops reading from the pipe, which causes the
196                 // io.Copy() in the goroutine to block until our
197                 // deferred pipereader.Close() call runs. In that case
198                 // len(copyerr)==0 here, so the wrapped KeepGateway
199                 // error is the one we return to our caller.
200                 err = <-copyerr
201         }
202         return resp, err
203 }
204
205 // ReadAt reads the entire block from the wrapped KeepGateway into the
206 // cache if needed, and copies the requested portion into the provided
207 // slice.
208 func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
209         cache.gotidy()
210         cachefilename := cache.cacheFile(locator)
211         if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
212                 return n, err
213         }
214         f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
215         if err != nil {
216                 return 0, fmt.Errorf("ReadAt: open(%s) failed: %s", cachefilename, err)
217         }
218         defer f.Close()
219
220         err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
221         if err != nil {
222                 return 0, fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
223         }
224
225         size, err := f.Seek(0, io.SeekEnd)
226         if err != nil {
227                 return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
228         }
229         if size < int64(len(dst)+offset) {
230                 // The cache file seems to be truncated or empty
231                 // (possibly because we just created it). Wait for an
232                 // exclusive lock, then check again (in case someone
233                 // else is doing the same thing) before trying to
234                 // retrieve the entire block.
235                 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
236                 if err != nil {
237                         return 0, fmt.Errorf("flock(%s, lock_ex) failed: %w", cachefilename, err)
238                 }
239         }
240         size, err = f.Seek(0, io.SeekEnd)
241         if err != nil {
242                 return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
243         }
244         if size < int64(len(dst)+offset) {
245                 // The cache file is truncated or empty, and we own it
246                 // now. Fill it.
247                 _, err = f.Seek(0, io.SeekStart)
248                 if err != nil {
249                         return 0, fmt.Errorf("seek(%s, seek_start) failed: %w", cachefilename, err)
250                 }
251                 n, err := cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{Locator: locator, WriteTo: f})
252                 if err != nil {
253                         return 0, err
254                 }
255                 f.Truncate(int64(n))
256         }
257         return f.ReadAt(dst, int64(offset))
258 }
259
260 var quickReadAtLostRace = errors.New("quickReadAt: lost race")
261
262 // Remove the cache entry for the indicated cachefilename if it
263 // matches expect (quickReadAt() usage), or if expect is nil (tidy()
264 // usage).
265 //
266 // If expect is non-nil, close expect's filehandle.
267 //
268 // If expect is nil and a different cache entry is deleted, close its
269 // filehandle.
270 func (cache *DiskCache) deleteHeldopen(cachefilename string, expect *openFileEnt) {
271         needclose := expect
272
273         cache.heldopenLock.Lock()
274         found := cache.heldopen[cachefilename]
275         if found != nil && (expect == nil || expect == found) {
276                 delete(cache.heldopen, cachefilename)
277                 needclose = found
278         }
279         cache.heldopenLock.Unlock()
280
281         if needclose != nil {
282                 needclose.Lock()
283                 defer needclose.Unlock()
284                 if needclose.f != nil {
285                         needclose.f.Close()
286                         needclose.f = nil
287                 }
288         }
289 }
290
291 // quickReadAt attempts to use a cached-filehandle approach to read
292 // from the indicated file. The expectation is that the caller
293 // (ReadAt) will try a more robust approach when this fails, so
294 // quickReadAt doesn't try especially hard to ensure success in
295 // races. In particular, when there are concurrent calls, and one
296 // fails, that can cause others to fail too.
297 func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int) (int, error) {
298         isnew := false
299         cache.heldopenLock.Lock()
300         if cache.heldopenMax == 0 {
301                 // Choose a reasonable limit on open cache files based
302                 // on RLIMIT_NOFILE. Note Go automatically raises
303                 // softlimit to hardlimit, so it's typically 1048576,
304                 // not 1024.
305                 lim := syscall.Rlimit{}
306                 err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim)
307                 if err != nil {
308                         cache.heldopenMax = 256
309                 } else if lim.Cur > 40000 {
310                         cache.heldopenMax = 10000
311                 } else {
312                         cache.heldopenMax = int(lim.Cur / 4)
313                 }
314         }
315         heldopen := cache.heldopen[cachefilename]
316         if heldopen == nil {
317                 isnew = true
318                 heldopen = &openFileEnt{}
319                 if cache.heldopen == nil {
320                         cache.heldopen = make(map[string]*openFileEnt, cache.heldopenMax)
321                 } else if len(cache.heldopen) > cache.heldopenMax {
322                         // Rather than go to the trouble of tracking
323                         // last access time, just close all files, and
324                         // open again as needed. Even in the worst
325                         // pathological case, this causes one extra
326                         // open+close per read, which is not
327                         // especially bad (see benchmarks).
328                         go func(m map[string]*openFileEnt) {
329                                 for _, heldopen := range m {
330                                         heldopen.Lock()
331                                         defer heldopen.Unlock()
332                                         if heldopen.f != nil {
333                                                 heldopen.f.Close()
334                                                 heldopen.f = nil
335                                         }
336                                 }
337                         }(cache.heldopen)
338                         cache.heldopen = nil
339                 }
340                 cache.heldopen[cachefilename] = heldopen
341                 heldopen.Lock()
342         }
343         cache.heldopenLock.Unlock()
344
345         if isnew {
346                 // Open and flock the file, then call wg.Done() to
347                 // unblock any other goroutines that are waiting in
348                 // the !isnew case above.
349                 f, err := os.Open(cachefilename)
350                 if err == nil {
351                         err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
352                         if err == nil {
353                                 heldopen.f = f
354                         } else {
355                                 f.Close()
356                         }
357                 }
358                 if err != nil {
359                         heldopen.err = err
360                         go cache.deleteHeldopen(cachefilename, heldopen)
361                 }
362                 heldopen.Unlock()
363         }
364         // Acquire read lock to ensure (1) initialization is complete,
365         // if it's done by a different goroutine, and (2) any "delete
366         // old/unused entries" waits for our read to finish before
367         // closing the file.
368         heldopen.RLock()
369         defer heldopen.RUnlock()
370         if heldopen.err != nil {
371                 // Other goroutine encountered an error during setup
372                 return 0, heldopen.err
373         } else if heldopen.f == nil {
374                 // Other goroutine closed the file before we got RLock
375                 return 0, quickReadAtLostRace
376         }
377         n, err := heldopen.f.ReadAt(dst, int64(offset))
378         if err != nil {
379                 // wait for any concurrent users to finish, then
380                 // delete this cache entry in case reopening the
381                 // backing file helps.
382                 go cache.deleteHeldopen(cachefilename, heldopen)
383         }
384         return n, err
385 }
386
387 // BlockRead reads the entire block from the wrapped KeepGateway into
388 // the cache if needed, and writes it to the provided writer.
389 func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
390         cache.gotidy()
391         cachefilename := cache.cacheFile(opts.Locator)
392         f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
393         if err != nil {
394                 cache.debugf("BlockRead: open(%s) failed: %s", cachefilename, err)
395                 return cache.KeepGateway.BlockRead(ctx, opts)
396         }
397         defer f.Close()
398
399         i := strings.Index(opts.Locator, "+")
400         if i < 0 || i >= len(opts.Locator) {
401                 return 0, errors.New("invalid block locator: no size hint")
402         }
403         sizestr := opts.Locator[i+1:]
404         i = strings.Index(sizestr, "+")
405         if i > 0 {
406                 sizestr = sizestr[:i]
407         }
408         blocksize, err := strconv.ParseInt(sizestr, 10, 32)
409         if err != nil || blocksize < 0 {
410                 return 0, errors.New("invalid block locator: invalid size hint")
411         }
412
413         err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
414         if err != nil {
415                 return 0, err
416         }
417         filesize, err := f.Seek(0, io.SeekEnd)
418         if err != nil {
419                 return 0, err
420         }
421         _, err = f.Seek(0, io.SeekStart)
422         if err != nil {
423                 return 0, err
424         }
425         if filesize == blocksize {
426                 n, err := io.Copy(opts.WriteTo, f)
427                 return int(n), err
428         }
429         err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
430         if err != nil {
431                 return 0, err
432         }
433         opts.WriteTo = io.MultiWriter(f, opts.WriteTo)
434         n, err := cache.KeepGateway.BlockRead(ctx, opts)
435         if err != nil {
436                 return int(n), err
437         }
438         f.Truncate(int64(n))
439         return n, nil
440 }
441
442 // Start a tidy() goroutine, unless one is already running / recently
443 // finished.
444 func (cache *DiskCache) gotidy() {
445         // Return quickly if another tidy goroutine is running in this process.
446         n := atomic.AddInt32(&cache.tidying, 1)
447         if n != 1 || time.Now().Before(cache.tidyHoldUntil) {
448                 atomic.AddInt32(&cache.tidying, -1)
449                 return
450         }
451         go func() {
452                 cache.tidy()
453                 cache.tidyHoldUntil = time.Now().Add(tidyHoldDuration)
454                 atomic.AddInt32(&cache.tidying, -1)
455         }()
456 }
457
458 // Delete cache files as needed to control disk usage.
459 func (cache *DiskCache) tidy() {
460         maxsize := cache.MaxSize
461         if maxsize < 1 {
462                 if maxsize = atomic.LoadInt64(&cache.defaultMaxSize); maxsize == 0 {
463                         var stat unix.Statfs_t
464                         if nil == unix.Statfs(cache.Dir, &stat) {
465                                 maxsize = int64(stat.Bavail) * stat.Bsize / 10
466                         }
467                         atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
468                 }
469         }
470
471         // Bail if a tidy goroutine is running in a different process.
472         lockfile, err := cache.openFile(filepath.Join(cache.Dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
473         if err != nil {
474                 return
475         }
476         defer lockfile.Close()
477         err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
478         if err != nil {
479                 return
480         }
481
482         type entT struct {
483                 path  string
484                 atime time.Time
485                 size  int64
486         }
487         var ents []entT
488         var totalsize int64
489         filepath.Walk(cache.Dir, func(path string, info fs.FileInfo, err error) error {
490                 if err != nil {
491                         cache.debugf("tidy: skipping dir %s: %s", path, err)
492                         return nil
493                 }
494                 if info.IsDir() {
495                         return nil
496                 }
497                 if !strings.HasSuffix(path, cacheFileSuffix) && !strings.HasSuffix(path, tmpFileSuffix) {
498                         return nil
499                 }
500                 var atime time.Time
501                 if stat, ok := info.Sys().(*syscall.Stat_t); ok {
502                         // Access time is available (hopefully the
503                         // filesystem is not mounted with noatime)
504                         atime = time.Unix(stat.Atim.Sec, stat.Atim.Nsec)
505                 } else {
506                         // If access time isn't available we fall back
507                         // to sorting by modification time.
508                         atime = info.ModTime()
509                 }
510                 ents = append(ents, entT{path, atime, info.Size()})
511                 totalsize += info.Size()
512                 return nil
513         })
514         if cache.Logger != nil {
515                 cache.Logger.WithFields(logrus.Fields{
516                         "totalsize": totalsize,
517                         "maxsize":   maxsize,
518                 }).Debugf("DiskCache: checked current cache usage")
519         }
520
521         // If MaxSize wasn't specified and we failed to come up with a
522         // defaultSize above, use the larger of {current cache size, 1
523         // GiB} as the defaultSize for subsequent tidy() operations.
524         if maxsize == 0 {
525                 if totalsize < 1<<30 {
526                         atomic.StoreInt64(&cache.defaultMaxSize, 1<<30)
527                 } else {
528                         atomic.StoreInt64(&cache.defaultMaxSize, totalsize)
529                 }
530                 cache.debugf("found initial size %d, setting defaultMaxSize %d", totalsize, cache.defaultMaxSize)
531                 return
532         }
533
534         if totalsize <= maxsize {
535                 return
536         }
537
538         // Delete oldest entries until totalsize < maxsize.
539         sort.Slice(ents, func(i, j int) bool {
540                 return ents[i].atime.Before(ents[j].atime)
541         })
542         deleted := 0
543         for _, ent := range ents {
544                 os.Remove(ent.path)
545                 go cache.deleteHeldopen(ent.path, nil)
546                 deleted++
547                 totalsize -= ent.size
548                 if totalsize <= maxsize {
549                         break
550                 }
551         }
552
553         if cache.Logger != nil {
554                 cache.Logger.WithFields(logrus.Fields{
555                         "deleted":   deleted,
556                         "totalsize": totalsize,
557                 }).Debugf("DiskCache: remaining cache usage after deleting")
558         }
559 }