8015: Fix tests
[arvados.git] / services / crunch-run / upload.go
1 package main
2
3 // Originally based on sdk/go/crunchrunner/upload.go
4 //
5 // Unlike the original, which iterates over a directory tree and uploads each
6 // file sequentially, this version supports opening and writing multiple files
7 // in a collection simultaneously.
8 //
9 // Eventually this should move into the Arvados Go SDK for a more comprehensive
10 // implementation of Collections.
11
12 import (
13         "bytes"
14         "crypto/md5"
15         "errors"
16         "fmt"
17         "git.curoverse.com/arvados.git/sdk/go/keepclient"
18         "git.curoverse.com/arvados.git/sdk/go/manifest"
19         "io"
20         "log"
21         "os"
22         "path/filepath"
23         "strings"
24         "sync"
25 )
26
27 // Block is a data block in a manifest stream
28 type Block struct {
29         data   []byte
30         offset int64
31 }
32
33 // CollectionFileWriter is a Writer that permits writing to a file in a Keep Collection.
34 type CollectionFileWriter struct {
35         IKeepClient
36         *manifest.ManifestStream
37         offset uint64
38         length uint64
39         *Block
40         uploader chan *Block
41         finish   chan []error
42         fn       string
43 }
44
45 // Write to a file in a keep collection
46 func (m *CollectionFileWriter) Write(p []byte) (int, error) {
47         n, err := m.ReadFrom(bytes.NewReader(p))
48         return int(n), err
49 }
50
51 // ReadFrom a Reader and write to the Keep collection file.
52 func (m *CollectionFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
53         var total int64
54         var count int
55
56         for err == nil {
57                 if m.Block == nil {
58                         m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
59                 }
60                 count, err = r.Read(m.Block.data[m.Block.offset:])
61                 total += int64(count)
62                 m.Block.offset += int64(count)
63                 if m.Block.offset == keepclient.BLOCKSIZE {
64                         m.uploader <- m.Block
65                         m.Block = nil
66                 }
67         }
68
69         m.length += uint64(total)
70
71         if err == io.EOF {
72                 return total, nil
73         }
74         return total, err
75 }
76
77 // Close stops writing a file and adds it to the parent manifest.
78 func (m *CollectionFileWriter) Close() error {
79         m.ManifestStream.FileStreamSegments = append(m.ManifestStream.FileStreamSegments,
80                 manifest.FileStreamSegment{m.offset, m.length, m.fn})
81         return nil
82 }
83
84 func (m *CollectionFileWriter) NewFile(fn string) {
85         m.offset += m.length
86         m.length = 0
87         m.fn = fn
88 }
89
90 func (m *CollectionFileWriter) goUpload() {
91         var errors []error
92         uploader := m.uploader
93         finish := m.finish
94         for block := range uploader {
95                 hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
96                 signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
97                 if err != nil {
98                         errors = append(errors, err)
99                 } else {
100                         m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
101                 }
102         }
103         finish <- errors
104 }
105
106 // CollectionWriter makes implements creating new Keep collections by opening files
107 // and writing to them.
108 type CollectionWriter struct {
109         IKeepClient
110         Streams []*CollectionFileWriter
111         mtx     sync.Mutex
112 }
113
114 // Open a new file for writing in the Keep collection.
115 func (m *CollectionWriter) Open(path string) io.WriteCloser {
116         var dir string
117         var fn string
118
119         i := strings.Index(path, "/")
120         if i > -1 {
121                 dir = "./" + path[0:i]
122                 fn = path[i+1:]
123         } else {
124                 dir = "."
125                 fn = path
126         }
127
128         fw := &CollectionFileWriter{
129                 m.IKeepClient,
130                 &manifest.ManifestStream{StreamName: dir},
131                 0,
132                 0,
133                 nil,
134                 make(chan *Block),
135                 make(chan []error),
136                 fn}
137         go fw.goUpload()
138
139         m.mtx.Lock()
140         defer m.mtx.Unlock()
141         m.Streams = append(m.Streams, fw)
142
143         return fw
144 }
145
146 // Finish writing the collection, wait for all blocks to complete uploading.
147 func (m *CollectionWriter) Finish() error {
148         var errstring string
149         m.mtx.Lock()
150         defer m.mtx.Unlock()
151
152         for _, stream := range m.Streams {
153                 if stream.uploader == nil {
154                         continue
155                 }
156                 if stream.Block != nil {
157                         stream.uploader <- stream.Block
158                 }
159                 close(stream.uploader)
160                 stream.uploader = nil
161
162                 errors := <-stream.finish
163                 close(stream.finish)
164                 stream.finish = nil
165
166                 for _, r := range errors {
167                         errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
168                 }
169         }
170         if errstring != "" {
171                 return errors.New(errstring)
172         }
173         return nil
174 }
175
176 // ManifestText returns the manifest text of the collection.  Calls Finish()
177 // first to ensure that all blocks are written and that signed locators and
178 // available.
179 func (m *CollectionWriter) ManifestText() (mt string, err error) {
180         err = m.Finish()
181         if err != nil {
182                 return "", err
183         }
184
185         var buf bytes.Buffer
186
187         m.mtx.Lock()
188         defer m.mtx.Unlock()
189         for _, v := range m.Streams {
190                 k := v.StreamName
191                 if k == "." {
192                         buf.WriteString(".")
193                 } else {
194                         k = strings.Replace(k, " ", "\\040", -1)
195                         k = strings.Replace(k, "\n", "", -1)
196                         buf.WriteString("./" + k)
197                 }
198                 for _, b := range v.Blocks {
199                         buf.WriteString(" ")
200                         buf.WriteString(b)
201                 }
202                 for _, f := range v.FileStreamSegments {
203                         buf.WriteString(" ")
204                         name := strings.Replace(f.Name, " ", "\\040", -1)
205                         name = strings.Replace(name, "\n", "", -1)
206                         buf.WriteString(fmt.Sprintf("%v:%v:%v", f.SegPos, f.SegLen, name))
207                 }
208                 buf.WriteString("\n")
209         }
210         return buf.String(), nil
211 }
212
213 type WalkUpload struct {
214         kc          IKeepClient
215         stripPrefix string
216         streamMap   map[string]*CollectionFileWriter
217         status      *log.Logger
218 }
219
220 // WalkFunc walks a directory tree, uploads each file found and adds it to the
221 // CollectionWriter.
222 func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
223
224         if info.IsDir() {
225                 return nil
226         }
227
228         var dir string
229         if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
230                 dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
231         }
232         if dir == "" {
233                 dir = "."
234         }
235
236         fn := path[(len(path) - len(info.Name())):]
237
238         if m.streamMap[dir] == nil {
239                 m.streamMap[dir] = &CollectionFileWriter{
240                         m.kc,
241                         &manifest.ManifestStream{StreamName: dir},
242                         0,
243                         0,
244                         nil,
245                         make(chan *Block),
246                         make(chan []error),
247                         ""}
248                 go m.streamMap[dir].goUpload()
249         }
250
251         fileWriter := m.streamMap[dir]
252
253         // Reset the CollectionFileWriter for a new file
254         fileWriter.NewFile(fn)
255
256         file, err := os.Open(path)
257         if err != nil {
258                 return err
259         }
260         defer file.Close()
261
262         m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
263
264         _, err = io.Copy(fileWriter, file)
265         if err != nil {
266                 return err
267         }
268
269         // Commits the current file.  Legal to call this repeatedly.
270         fileWriter.Close()
271
272         return nil
273 }
274
275 func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) {
276         streamMap := make(map[string]*CollectionFileWriter)
277         wu := &WalkUpload{cw.IKeepClient, root, streamMap, status}
278         err = filepath.Walk(root, wu.WalkFunc)
279
280         if err != nil {
281                 return "", err
282         }
283
284         cw.mtx.Lock()
285         for _, st := range streamMap {
286                 cw.Streams = append(cw.Streams, st)
287         }
288         cw.mtx.Unlock()
289
290         return cw.ManifestText()
291 }