Merge branch '12461-cache-race'
[arvados.git] / sdk / go / keepclient / collectionreader.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package keepclient
6
7 import (
8         "errors"
9         "fmt"
10         "io"
11         "os"
12
13         "git.curoverse.com/arvados.git/sdk/go/arvados"
14         "git.curoverse.com/arvados.git/sdk/go/manifest"
15 )
16
17 const (
18         // After reading a data block from Keep, cfReader slices it up
19         // and sends the slices to a buffered channel to be consumed
20         // by the caller via Read().
21         //
22         // dataSliceSize is the maximum size of the slices, and
23         // therefore the maximum number of bytes that will be returned
24         // by a single call to Read().
25         dataSliceSize = 1 << 20
26 )
27
28 // ErrNoManifest indicates the given collection has no manifest
29 // information (e.g., manifest_text was excluded by a "select"
30 // parameter when retrieving the collection record).
31 var ErrNoManifest = errors.New("Collection has no manifest")
32
33 // CollectionFileReader returns a Reader that reads content from a single file
34 // in the collection. The filename must be relative to the root of the
35 // collection.  A leading prefix of "/" or "./" in the filename is ignored.
36 func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (arvados.File, error) {
37         mText, ok := collection["manifest_text"].(string)
38         if !ok {
39                 return nil, ErrNoManifest
40         }
41         m := manifest.Manifest{Text: mText}
42         return kc.ManifestFileReader(m, filename)
43 }
44
45 func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) {
46         f := &file{
47                 kc: kc,
48         }
49         err := f.load(m, filename)
50         if err != nil {
51                 return nil, err
52         }
53         return f, nil
54 }
55
56 type file struct {
57         kc       *KeepClient
58         segments []*manifest.FileSegment
59         size     int64 // total file size
60         offset   int64 // current read offset
61
62         // current/latest segment accessed -- might or might not match pos
63         seg           *manifest.FileSegment
64         segStart      int64 // position of segment relative to file
65         segData       []byte
66         segNext       []*manifest.FileSegment
67         readaheadDone bool
68 }
69
70 // Close implements io.Closer.
71 func (f *file) Close() error {
72         f.kc = nil
73         f.segments = nil
74         f.segData = nil
75         return nil
76 }
77
78 // Read implements io.Reader.
79 func (f *file) Read(buf []byte) (int, error) {
80         if f.seg == nil || f.offset < f.segStart || f.offset >= f.segStart+int64(f.seg.Len) {
81                 // f.seg does not cover the current read offset
82                 // (f.pos).  Iterate over f.segments to find the one
83                 // that does.
84                 f.seg = nil
85                 f.segStart = 0
86                 f.segData = nil
87                 f.segNext = f.segments
88                 for len(f.segNext) > 0 {
89                         seg := f.segNext[0]
90                         f.segNext = f.segNext[1:]
91                         segEnd := f.segStart + int64(seg.Len)
92                         if segEnd > f.offset {
93                                 f.seg = seg
94                                 break
95                         }
96                         f.segStart = segEnd
97                 }
98                 f.readaheadDone = false
99         }
100         if f.seg == nil {
101                 return 0, io.EOF
102         }
103         if f.segData == nil {
104                 data, err := f.kc.cache().Get(f.kc, f.seg.Locator)
105                 if err != nil {
106                         return 0, err
107                 }
108                 if len(data) < f.seg.Offset+f.seg.Len {
109                         return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, len(data), f.seg.Locator)
110                 }
111                 f.segData = data[f.seg.Offset : f.seg.Offset+f.seg.Len]
112         }
113         // dataOff and dataLen denote a portion of f.segData
114         // corresponding to a portion of the file at f.offset.
115         dataOff := int(f.offset - f.segStart)
116         dataLen := f.seg.Len - dataOff
117
118         if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff+dataLen > len(f.segData)/16 {
119                 // If we have already read more than just the first
120                 // few bytes of this file, and we have already
121                 // consumed a noticeable portion of this segment, and
122                 // there's more data for this file in the next segment
123                 // ... then there's a good chance we are going to need
124                 // the data for that next segment soon. Start getting
125                 // it into the cache now.
126                 go f.kc.cache().Get(f.kc, f.segNext[0].Locator)
127                 f.readaheadDone = true
128         }
129
130         n := len(buf)
131         if n > dataLen {
132                 n = dataLen
133         }
134         copy(buf[:n], f.segData[dataOff:dataOff+n])
135         f.offset += int64(n)
136         return n, nil
137 }
138
139 // Seek implements io.Seeker.
140 func (f *file) Seek(offset int64, whence int) (int64, error) {
141         var want int64
142         switch whence {
143         case io.SeekStart:
144                 want = offset
145         case io.SeekCurrent:
146                 want = f.offset + offset
147         case io.SeekEnd:
148                 want = f.size + offset
149         default:
150                 return f.offset, fmt.Errorf("invalid whence %d", whence)
151         }
152         if want < 0 {
153                 return f.offset, fmt.Errorf("attempted seek to %d", want)
154         }
155         if want > f.size {
156                 want = f.size
157         }
158         f.offset = want
159         return f.offset, nil
160 }
161
162 // Size returns the file size in bytes.
163 func (f *file) Size() int64 {
164         return f.size
165 }
166
167 func (f *file) load(m manifest.Manifest, path string) error {
168         f.segments = nil
169         f.size = 0
170         for seg := range m.FileSegmentIterByName(path) {
171                 f.segments = append(f.segments, seg)
172                 f.size += int64(seg.Len)
173         }
174         if f.segments == nil {
175                 return os.ErrNotExist
176         }
177         return nil
178 }