8784: Fix test for latest firefox.
[arvados.git] / sdk / go / keepclient / collectionreader.go
1 package keepclient
2
3 import (
4         "errors"
5         "fmt"
6         "io"
7         "os"
8
9         "git.curoverse.com/arvados.git/sdk/go/arvados"
10         "git.curoverse.com/arvados.git/sdk/go/manifest"
11 )
12
13 const (
14         // After reading a data block from Keep, cfReader slices it up
15         // and sends the slices to a buffered channel to be consumed
16         // by the caller via Read().
17         //
18         // dataSliceSize is the maximum size of the slices, and
19         // therefore the maximum number of bytes that will be returned
20         // by a single call to Read().
21         dataSliceSize = 1 << 20
22 )
23
24 // ErrNoManifest indicates the given collection has no manifest
25 // information (e.g., manifest_text was excluded by a "select"
26 // parameter when retrieving the collection record).
27 var ErrNoManifest = errors.New("Collection has no manifest")
28
29 // CollectionFileReader returns a Reader that reads content from a single file
30 // in the collection. The filename must be relative to the root of the
31 // collection.  A leading prefix of "/" or "./" in the filename is ignored.
32 func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, filename string) (arvados.File, error) {
33         mText, ok := collection["manifest_text"].(string)
34         if !ok {
35                 return nil, ErrNoManifest
36         }
37         m := manifest.Manifest{Text: mText}
38         return kc.ManifestFileReader(m, filename)
39 }
40
41 func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) {
42         f := &file{
43                 kc: kc,
44         }
45         err := f.load(m, filename)
46         if err != nil {
47                 return nil, err
48         }
49         return f, nil
50 }
51
52 type file struct {
53         kc       *KeepClient
54         segments []*manifest.FileSegment
55         size     int64 // total file size
56         offset   int64 // current read offset
57
58         // current/latest segment accessed -- might or might not match pos
59         seg           *manifest.FileSegment
60         segStart      int64 // position of segment relative to file
61         segData       []byte
62         segNext       []*manifest.FileSegment
63         readaheadDone bool
64 }
65
66 // Close implements io.Closer.
67 func (f *file) Close() error {
68         f.kc = nil
69         f.segments = nil
70         f.segData = nil
71         return nil
72 }
73
74 // Read implements io.Reader.
75 func (f *file) Read(buf []byte) (int, error) {
76         if f.seg == nil || f.offset < f.segStart || f.offset >= f.segStart+int64(f.seg.Len) {
77                 // f.seg does not cover the current read offset
78                 // (f.pos).  Iterate over f.segments to find the one
79                 // that does.
80                 f.seg = nil
81                 f.segStart = 0
82                 f.segData = nil
83                 f.segNext = f.segments
84                 for len(f.segNext) > 0 {
85                         seg := f.segNext[0]
86                         f.segNext = f.segNext[1:]
87                         segEnd := f.segStart + int64(seg.Len)
88                         if segEnd > f.offset {
89                                 f.seg = seg
90                                 break
91                         }
92                         f.segStart = segEnd
93                 }
94                 f.readaheadDone = false
95         }
96         if f.seg == nil {
97                 return 0, io.EOF
98         }
99         if f.segData == nil {
100                 data, err := f.kc.cache().Get(f.kc, f.seg.Locator)
101                 if err != nil {
102                         return 0, err
103                 }
104                 if len(data) < f.seg.Offset+f.seg.Len {
105                         return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, len(data), f.seg.Locator)
106                 }
107                 f.segData = data[f.seg.Offset : f.seg.Offset+f.seg.Len]
108         }
109         // dataOff and dataLen denote a portion of f.segData
110         // corresponding to a portion of the file at f.offset.
111         dataOff := int(f.offset - f.segStart)
112         dataLen := f.seg.Len - dataOff
113
114         if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff+dataLen > len(f.segData)/16 {
115                 // If we have already read more than just the first
116                 // few bytes of this file, and we have already
117                 // consumed a noticeable portion of this segment, and
118                 // there's more data for this file in the next segment
119                 // ... then there's a good chance we are going to need
120                 // the data for that next segment soon. Start getting
121                 // it into the cache now.
122                 go f.kc.cache().Get(f.kc, f.segNext[0].Locator)
123                 f.readaheadDone = true
124         }
125
126         n := len(buf)
127         if n > dataLen {
128                 n = dataLen
129         }
130         copy(buf[:n], f.segData[dataOff:dataOff+n])
131         f.offset += int64(n)
132         return n, nil
133 }
134
135 // Seek implements io.Seeker.
136 func (f *file) Seek(offset int64, whence int) (int64, error) {
137         var want int64
138         switch whence {
139         case io.SeekStart:
140                 want = offset
141         case io.SeekCurrent:
142                 want = f.offset + offset
143         case io.SeekEnd:
144                 want = f.size + offset
145         default:
146                 return f.offset, fmt.Errorf("invalid whence %d", whence)
147         }
148         if want < 0 {
149                 return f.offset, fmt.Errorf("attempted seek to %d", want)
150         }
151         if want > f.size {
152                 want = f.size
153         }
154         f.offset = want
155         return f.offset, nil
156 }
157
158 // Size returns the file size in bytes.
159 func (f *file) Size() int64 {
160         return f.size
161 }
162
163 func (f *file) load(m manifest.Manifest, path string) error {
164         f.segments = nil
165         f.size = 0
166         for seg := range m.FileSegmentIterByName(path) {
167                 f.segments = append(f.segments, seg)
168                 f.size += int64(seg.Len)
169         }
170         if f.segments == nil {
171                 return os.ErrNotExist
172         }
173         return nil
174 }