Merge branch '11470-update-task-fields'
[arvados.git] / sdk / go / keepclient / collectionreader.go
1 package keepclient
2
3 import (
4         "errors"
5         "fmt"
6         "io"
7         "os"
8
9         "git.curoverse.com/arvados.git/sdk/go/manifest"
10 )
11
12 // A Reader implements, io.Reader, io.Seeker, and io.Closer, and has a
13 // Len() method that returns the total number of bytes available to
14 // read.
15 type Reader interface {
16         io.Reader
17         io.Seeker
18         io.Closer
19         Len() uint64
20 }
21
22 const (
23         // After reading a data block from Keep, cfReader slices it up
24         // and sends the slices to a buffered channel to be consumed
25         // by the caller via Read().
26         //
27         // dataSliceSize is the maximum size of the slices, and
28         // therefore the maximum number of bytes that will be returned
29         // by a single call to Read().
30         dataSliceSize = 1 << 20
31 )
32
33 // ErrNoManifest indicates the given collection has no manifest
34 // information (e.g., manifest_text was excluded by a "select"
35 // parameter when retrieving the collection record).
36 var ErrNoManifest = errors.New("Collection has no manifest")
37
38 // CollectionFileReader returns a Reader that reads content from a single file
39 // in the collection. The filename must be relative to the root of the
40 // collection.  A leading prefix of "/" or "./" in the filename is ignored.
41 func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (Reader, error) {
42         mText, ok := collection["manifest_text"].(string)
43         if !ok {
44                 return nil, ErrNoManifest
45         }
46         m := manifest.Manifest{Text: mText}
47         return kc.ManifestFileReader(m, filename)
48 }
49
50 func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (Reader, error) {
51         f := &file{
52                 kc: kc,
53         }
54         err := f.load(m, filename)
55         if err != nil {
56                 return nil, err
57         }
58         return f, nil
59 }
60
61 type file struct {
62         kc       *KeepClient
63         segments []*manifest.FileSegment
64         size     int64 // total file size
65         offset   int64 // current read offset
66
67         // current/latest segment accessed -- might or might not match pos
68         seg           *manifest.FileSegment
69         segStart      int64 // position of segment relative to file
70         segData       []byte
71         segNext       []*manifest.FileSegment
72         readaheadDone bool
73 }
74
75 // Close implements io.Closer.
76 func (f *file) Close() error {
77         f.kc = nil
78         f.segments = nil
79         f.segData = nil
80         return nil
81 }
82
83 // Read implements io.Reader.
84 func (f *file) Read(buf []byte) (int, error) {
85         if f.seg == nil || f.offset < f.segStart || f.offset >= f.segStart+int64(f.seg.Len) {
86                 // f.seg does not cover the current read offset
87                 // (f.pos).  Iterate over f.segments to find the one
88                 // that does.
89                 f.seg = nil
90                 f.segStart = 0
91                 f.segData = nil
92                 f.segNext = f.segments
93                 for len(f.segNext) > 0 {
94                         seg := f.segNext[0]
95                         f.segNext = f.segNext[1:]
96                         segEnd := f.segStart + int64(seg.Len)
97                         if segEnd > f.offset {
98                                 f.seg = seg
99                                 break
100                         }
101                         f.segStart = segEnd
102                 }
103                 f.readaheadDone = false
104         }
105         if f.seg == nil {
106                 return 0, io.EOF
107         }
108         if f.segData == nil {
109                 data, err := f.kc.cache().Get(f.kc, f.seg.Locator)
110                 if err != nil {
111                         return 0, err
112                 }
113                 if len(data) < f.seg.Offset+f.seg.Len {
114                         return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, len(data), f.seg.Locator)
115                 }
116                 f.segData = data[f.seg.Offset : f.seg.Offset+f.seg.Len]
117         }
118         // dataOff and dataLen denote a portion of f.segData
119         // corresponding to a portion of the file at f.offset.
120         dataOff := int(f.offset - f.segStart)
121         dataLen := f.seg.Len - dataOff
122
123         if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff+dataLen > len(f.segData)/16 {
124                 // If we have already read more than just the first
125                 // few bytes of this file, and we have already
126                 // consumed a noticeable portion of this segment, and
127                 // there's more data for this file in the next segment
128                 // ... then there's a good chance we are going to need
129                 // the data for that next segment soon. Start getting
130                 // it into the cache now.
131                 go f.kc.cache().Get(f.kc, f.segNext[0].Locator)
132                 f.readaheadDone = true
133         }
134
135         n := len(buf)
136         if n > dataLen {
137                 n = dataLen
138         }
139         copy(buf[:n], f.segData[dataOff:dataOff+n])
140         f.offset += int64(n)
141         return n, nil
142 }
143
144 // Seek implements io.Seeker.
145 func (f *file) Seek(offset int64, whence int) (int64, error) {
146         var want int64
147         switch whence {
148         case io.SeekStart:
149                 want = offset
150         case io.SeekCurrent:
151                 want = f.offset + offset
152         case io.SeekEnd:
153                 want = f.size + offset
154         default:
155                 return f.offset, fmt.Errorf("invalid whence %d", whence)
156         }
157         if want < 0 {
158                 return f.offset, fmt.Errorf("attempted seek to %d", want)
159         }
160         if want > f.size {
161                 want = f.size
162         }
163         f.offset = want
164         return f.offset, nil
165 }
166
167 // Len returns the file size in bytes.
168 func (f *file) Len() uint64 {
169         return uint64(f.size)
170 }
171
172 func (f *file) load(m manifest.Manifest, path string) error {
173         f.segments = nil
174         f.size = 0
175         for seg := range m.FileSegmentIterByName(path) {
176                 f.segments = append(f.segments, seg)
177                 f.size += int64(seg.Len)
178         }
179         if f.segments == nil {
180                 return os.ErrNotExist
181         }
182         return nil
183 }