11308: Merge branch 'master' into 11308-python3
[arvados.git] / services / crunch-run / upload.go
1 package main
2
3 // Originally based on sdk/go/crunchrunner/upload.go
4 //
5 // Unlike the original, which iterates over a directory tree and uploads each
6 // file sequentially, this version supports opening and writing multiple files
7 // in a collection simultaneously.
8 //
9 // Eventually this should move into the Arvados Go SDK for a more comprehensive
10 // implementation of Collections.
11
12 import (
13         "bytes"
14         "crypto/md5"
15         "errors"
16         "fmt"
17         "git.curoverse.com/arvados.git/sdk/go/keepclient"
18         "git.curoverse.com/arvados.git/sdk/go/manifest"
19         "io"
20         "log"
21         "os"
22         "path/filepath"
23         "strings"
24         "sync"
25 )
26
27 // Block is a data block in a manifest stream
28 type Block struct {
29         data   []byte
30         offset int64
31 }
32
33 // CollectionFileWriter is a Writer that permits writing to a file in a Keep Collection.
34 type CollectionFileWriter struct {
35         IKeepClient
36         *manifest.ManifestStream
37         offset uint64
38         length uint64
39         *Block
40         uploader chan *Block
41         finish   chan []error
42         fn       string
43 }
44
45 // Write to a file in a keep collection
46 func (m *CollectionFileWriter) Write(p []byte) (int, error) {
47         n, err := m.ReadFrom(bytes.NewReader(p))
48         return int(n), err
49 }
50
51 // ReadFrom a Reader and write to the Keep collection file.
52 func (m *CollectionFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
53         var total int64
54         var count int
55
56         for err == nil {
57                 if m.Block == nil {
58                         m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
59                 }
60                 count, err = r.Read(m.Block.data[m.Block.offset:])
61                 total += int64(count)
62                 m.Block.offset += int64(count)
63                 if m.Block.offset == keepclient.BLOCKSIZE {
64                         m.uploader <- m.Block
65                         m.Block = nil
66                 }
67         }
68
69         m.length += uint64(total)
70
71         if err == io.EOF {
72                 return total, nil
73         }
74         return total, err
75 }
76
77 // Close stops writing a file and adds it to the parent manifest.
78 func (m *CollectionFileWriter) Close() error {
79         m.ManifestStream.FileStreamSegments = append(m.ManifestStream.FileStreamSegments,
80                 manifest.FileStreamSegment{m.offset, m.length, m.fn})
81         return nil
82 }
83
84 func (m *CollectionFileWriter) NewFile(fn string) {
85         m.offset += m.length
86         m.length = 0
87         m.fn = fn
88 }
89
90 func (m *CollectionFileWriter) goUpload(workers chan struct{}) {
91         var mtx sync.Mutex
92         var wg sync.WaitGroup
93
94         var errors []error
95         uploader := m.uploader
96         finish := m.finish
97         for block := range uploader {
98                 mtx.Lock()
99                 m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, "")
100                 blockIndex := len(m.ManifestStream.Blocks) - 1
101                 mtx.Unlock()
102
103                 workers <- struct{}{} // wait for an available worker slot
104                 wg.Add(1)
105
106                 go func(block *Block, blockIndex int) {
107                         hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
108                         signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
109                         <-workers
110
111                         mtx.Lock()
112                         if err != nil {
113                                 errors = append(errors, err)
114                         } else {
115                                 m.ManifestStream.Blocks[blockIndex] = signedHash
116                         }
117                         mtx.Unlock()
118
119                         wg.Done()
120                 }(block, blockIndex)
121         }
122         wg.Wait()
123
124         finish <- errors
125 }
126
127 // CollectionWriter implements creating new Keep collections by opening files
128 // and writing to them.
129 type CollectionWriter struct {
130         MaxWriters int
131         IKeepClient
132         Streams []*CollectionFileWriter
133         workers chan struct{}
134         mtx     sync.Mutex
135 }
136
137 // Open a new file for writing in the Keep collection.
138 func (m *CollectionWriter) Open(path string) io.WriteCloser {
139         var dir string
140         var fn string
141
142         i := strings.Index(path, "/")
143         if i > -1 {
144                 dir = "./" + path[0:i]
145                 fn = path[i+1:]
146         } else {
147                 dir = "."
148                 fn = path
149         }
150
151         fw := &CollectionFileWriter{
152                 m.IKeepClient,
153                 &manifest.ManifestStream{StreamName: dir},
154                 0,
155                 0,
156                 nil,
157                 make(chan *Block),
158                 make(chan []error),
159                 fn}
160
161         m.mtx.Lock()
162         defer m.mtx.Unlock()
163         if m.workers == nil {
164                 if m.MaxWriters < 1 {
165                         m.MaxWriters = 2
166                 }
167                 m.workers = make(chan struct{}, m.MaxWriters)
168         }
169
170         go fw.goUpload(m.workers)
171
172         m.Streams = append(m.Streams, fw)
173
174         return fw
175 }
176
177 // Finish writing the collection, wait for all blocks to complete uploading.
178 func (m *CollectionWriter) Finish() error {
179         var errstring string
180         m.mtx.Lock()
181         defer m.mtx.Unlock()
182
183         for _, stream := range m.Streams {
184                 if stream.uploader == nil {
185                         continue
186                 }
187                 if stream.Block != nil {
188                         stream.uploader <- stream.Block
189                 }
190                 close(stream.uploader)
191                 stream.uploader = nil
192
193                 errors := <-stream.finish
194                 close(stream.finish)
195                 stream.finish = nil
196
197                 for _, r := range errors {
198                         errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
199                 }
200         }
201         if errstring != "" {
202                 return errors.New(errstring)
203         }
204         return nil
205 }
206
207 // ManifestText returns the manifest text of the collection.  Calls Finish()
208 // first to ensure that all blocks are written and that signed locators and
209 // available.
210 func (m *CollectionWriter) ManifestText() (mt string, err error) {
211         err = m.Finish()
212         if err != nil {
213                 return "", err
214         }
215
216         var buf bytes.Buffer
217
218         m.mtx.Lock()
219         defer m.mtx.Unlock()
220         for _, v := range m.Streams {
221                 if len(v.FileStreamSegments) == 0 {
222                         continue
223                 }
224                 k := v.StreamName
225                 if k == "." {
226                         buf.WriteString(".")
227                 } else {
228                         k = strings.Replace(k, " ", "\\040", -1)
229                         k = strings.Replace(k, "\n", "", -1)
230                         buf.WriteString("./" + k)
231                 }
232                 if len(v.Blocks) > 0 {
233                         for _, b := range v.Blocks {
234                                 buf.WriteString(" ")
235                                 buf.WriteString(b)
236                         }
237                 } else {
238                         buf.WriteString(" d41d8cd98f00b204e9800998ecf8427e+0")
239                 }
240                 for _, f := range v.FileStreamSegments {
241                         buf.WriteString(" ")
242                         name := strings.Replace(f.Name, " ", "\\040", -1)
243                         name = strings.Replace(name, "\n", "", -1)
244                         buf.WriteString(fmt.Sprintf("%v:%v:%v", f.SegPos, f.SegLen, name))
245                 }
246                 buf.WriteString("\n")
247         }
248         return buf.String(), nil
249 }
250
251 type WalkUpload struct {
252         MaxWriters  int
253         kc          IKeepClient
254         stripPrefix string
255         streamMap   map[string]*CollectionFileWriter
256         status      *log.Logger
257         workers     chan struct{}
258         mtx         sync.Mutex
259 }
260
261 // WalkFunc walks a directory tree, uploads each file found and adds it to the
262 // CollectionWriter.
263 func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
264
265         if info.IsDir() {
266                 return nil
267         }
268
269         var dir string
270         if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
271                 dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
272         }
273         if dir == "" {
274                 dir = "."
275         }
276
277         fn := path[(len(path) - len(info.Name())):]
278
279         if m.streamMap[dir] == nil {
280                 m.streamMap[dir] = &CollectionFileWriter{
281                         m.kc,
282                         &manifest.ManifestStream{StreamName: dir},
283                         0,
284                         0,
285                         nil,
286                         make(chan *Block),
287                         make(chan []error),
288                         ""}
289
290                 m.mtx.Lock()
291                 if m.workers == nil {
292                         if m.MaxWriters < 1 {
293                                 m.MaxWriters = 2
294                         }
295                         m.workers = make(chan struct{}, m.MaxWriters)
296                 }
297                 m.mtx.Unlock()
298
299                 go m.streamMap[dir].goUpload(m.workers)
300         }
301
302         fileWriter := m.streamMap[dir]
303
304         // Reset the CollectionFileWriter for a new file
305         fileWriter.NewFile(fn)
306
307         file, err := os.Open(path)
308         if err != nil {
309                 return err
310         }
311         defer file.Close()
312
313         m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
314
315         _, err = io.Copy(fileWriter, file)
316         if err != nil {
317                 return err
318         }
319
320         // Commits the current file.  Legal to call this repeatedly.
321         fileWriter.Close()
322
323         return nil
324 }
325
326 func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) {
327         streamMap := make(map[string]*CollectionFileWriter)
328         wu := &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
329         err = filepath.Walk(root, wu.WalkFunc)
330
331         if err != nil {
332                 return "", err
333         }
334
335         cw.mtx.Lock()
336         for _, st := range streamMap {
337                 cw.Streams = append(cw.Streams, st)
338         }
339         cw.mtx.Unlock()
340
341         return cw.ManifestText()
342 }