Merge branch '8784-dir-listings'
[arvados.git] / services / crunch-run / upload.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 // Originally based on sdk/go/crunchrunner/upload.go
8 //
9 // Unlike the original, which iterates over a directory tree and uploads each
10 // file sequentially, this version supports opening and writing multiple files
11 // in a collection simultaneously.
12 //
13 // Eventually this should move into the Arvados Go SDK for a more comprehensive
14 // implementation of Collections.
15
16 import (
17         "bytes"
18         "crypto/md5"
19         "errors"
20         "fmt"
21         "git.curoverse.com/arvados.git/sdk/go/keepclient"
22         "git.curoverse.com/arvados.git/sdk/go/manifest"
23         "io"
24         "log"
25         "os"
26         "path/filepath"
27         "strings"
28         "sync"
29 )
30
31 // Block is a data block in a manifest stream
32 type Block struct {
33         data   []byte
34         offset int64
35 }
36
37 // CollectionFileWriter is a Writer that permits writing to a file in a Keep Collection.
38 type CollectionFileWriter struct {
39         IKeepClient
40         *manifest.ManifestStream
41         offset uint64
42         length uint64
43         *Block
44         uploader chan *Block
45         finish   chan []error
46         fn       string
47 }
48
49 // Write to a file in a keep collection
50 func (m *CollectionFileWriter) Write(p []byte) (int, error) {
51         n, err := m.ReadFrom(bytes.NewReader(p))
52         return int(n), err
53 }
54
55 // ReadFrom a Reader and write to the Keep collection file.
56 func (m *CollectionFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
57         var total int64
58         var count int
59
60         for err == nil {
61                 if m.Block == nil {
62                         m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
63                 }
64                 count, err = r.Read(m.Block.data[m.Block.offset:])
65                 total += int64(count)
66                 m.Block.offset += int64(count)
67                 if m.Block.offset == keepclient.BLOCKSIZE {
68                         m.uploader <- m.Block
69                         m.Block = nil
70                 }
71         }
72
73         m.length += uint64(total)
74
75         if err == io.EOF {
76                 return total, nil
77         }
78         return total, err
79 }
80
81 // Close stops writing a file and adds it to the parent manifest.
82 func (m *CollectionFileWriter) Close() error {
83         m.ManifestStream.FileStreamSegments = append(m.ManifestStream.FileStreamSegments,
84                 manifest.FileStreamSegment{m.offset, m.length, m.fn})
85         return nil
86 }
87
88 func (m *CollectionFileWriter) NewFile(fn string) {
89         m.offset += m.length
90         m.length = 0
91         m.fn = fn
92 }
93
94 func (m *CollectionFileWriter) goUpload(workers chan struct{}) {
95         var mtx sync.Mutex
96         var wg sync.WaitGroup
97
98         var errors []error
99         uploader := m.uploader
100         finish := m.finish
101         for block := range uploader {
102                 mtx.Lock()
103                 m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, "")
104                 blockIndex := len(m.ManifestStream.Blocks) - 1
105                 mtx.Unlock()
106
107                 workers <- struct{}{} // wait for an available worker slot
108                 wg.Add(1)
109
110                 go func(block *Block, blockIndex int) {
111                         hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
112                         signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
113                         <-workers
114
115                         mtx.Lock()
116                         if err != nil {
117                                 errors = append(errors, err)
118                         } else {
119                                 m.ManifestStream.Blocks[blockIndex] = signedHash
120                         }
121                         mtx.Unlock()
122
123                         wg.Done()
124                 }(block, blockIndex)
125         }
126         wg.Wait()
127
128         finish <- errors
129 }
130
131 // CollectionWriter implements creating new Keep collections by opening files
132 // and writing to them.
133 type CollectionWriter struct {
134         MaxWriters int
135         IKeepClient
136         Streams []*CollectionFileWriter
137         workers chan struct{}
138         mtx     sync.Mutex
139 }
140
141 // Open a new file for writing in the Keep collection.
142 func (m *CollectionWriter) Open(path string) io.WriteCloser {
143         var dir string
144         var fn string
145
146         i := strings.Index(path, "/")
147         if i > -1 {
148                 dir = "./" + path[0:i]
149                 fn = path[i+1:]
150         } else {
151                 dir = "."
152                 fn = path
153         }
154
155         fw := &CollectionFileWriter{
156                 m.IKeepClient,
157                 &manifest.ManifestStream{StreamName: dir},
158                 0,
159                 0,
160                 nil,
161                 make(chan *Block),
162                 make(chan []error),
163                 fn}
164
165         m.mtx.Lock()
166         defer m.mtx.Unlock()
167         if m.workers == nil {
168                 if m.MaxWriters < 1 {
169                         m.MaxWriters = 2
170                 }
171                 m.workers = make(chan struct{}, m.MaxWriters)
172         }
173
174         go fw.goUpload(m.workers)
175
176         m.Streams = append(m.Streams, fw)
177
178         return fw
179 }
180
181 // Finish writing the collection, wait for all blocks to complete uploading.
182 func (m *CollectionWriter) Finish() error {
183         var errstring string
184         m.mtx.Lock()
185         defer m.mtx.Unlock()
186
187         for _, stream := range m.Streams {
188                 if stream.uploader == nil {
189                         continue
190                 }
191                 if stream.Block != nil {
192                         stream.uploader <- stream.Block
193                 }
194                 close(stream.uploader)
195                 stream.uploader = nil
196
197                 errors := <-stream.finish
198                 close(stream.finish)
199                 stream.finish = nil
200
201                 for _, r := range errors {
202                         errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
203                 }
204         }
205         if errstring != "" {
206                 return errors.New(errstring)
207         }
208         return nil
209 }
210
211 // ManifestText returns the manifest text of the collection.  Calls Finish()
212 // first to ensure that all blocks are written and that signed locators and
213 // available.
214 func (m *CollectionWriter) ManifestText() (mt string, err error) {
215         err = m.Finish()
216         if err != nil {
217                 return "", err
218         }
219
220         var buf bytes.Buffer
221
222         m.mtx.Lock()
223         defer m.mtx.Unlock()
224         for _, v := range m.Streams {
225                 if len(v.FileStreamSegments) == 0 {
226                         continue
227                 }
228                 k := v.StreamName
229                 if k == "." {
230                         buf.WriteString(".")
231                 } else {
232                         k = strings.Replace(k, " ", "\\040", -1)
233                         k = strings.Replace(k, "\n", "", -1)
234                         buf.WriteString("./" + k)
235                 }
236                 if len(v.Blocks) > 0 {
237                         for _, b := range v.Blocks {
238                                 buf.WriteString(" ")
239                                 buf.WriteString(b)
240                         }
241                 } else {
242                         buf.WriteString(" d41d8cd98f00b204e9800998ecf8427e+0")
243                 }
244                 for _, f := range v.FileStreamSegments {
245                         buf.WriteString(" ")
246                         name := strings.Replace(f.Name, " ", "\\040", -1)
247                         name = strings.Replace(name, "\n", "", -1)
248                         buf.WriteString(fmt.Sprintf("%v:%v:%v", f.SegPos, f.SegLen, name))
249                 }
250                 buf.WriteString("\n")
251         }
252         return buf.String(), nil
253 }
254
255 type WalkUpload struct {
256         MaxWriters  int
257         kc          IKeepClient
258         stripPrefix string
259         streamMap   map[string]*CollectionFileWriter
260         status      *log.Logger
261         workers     chan struct{}
262         mtx         sync.Mutex
263 }
264
265 // WalkFunc walks a directory tree, uploads each file found and adds it to the
266 // CollectionWriter.
267 func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
268
269         if info.IsDir() {
270                 return nil
271         }
272
273         var dir string
274         if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
275                 dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
276         }
277         if dir == "" {
278                 dir = "."
279         }
280
281         fn := path[(len(path) - len(info.Name())):]
282
283         if m.streamMap[dir] == nil {
284                 m.streamMap[dir] = &CollectionFileWriter{
285                         m.kc,
286                         &manifest.ManifestStream{StreamName: dir},
287                         0,
288                         0,
289                         nil,
290                         make(chan *Block),
291                         make(chan []error),
292                         ""}
293
294                 m.mtx.Lock()
295                 if m.workers == nil {
296                         if m.MaxWriters < 1 {
297                                 m.MaxWriters = 2
298                         }
299                         m.workers = make(chan struct{}, m.MaxWriters)
300                 }
301                 m.mtx.Unlock()
302
303                 go m.streamMap[dir].goUpload(m.workers)
304         }
305
306         fileWriter := m.streamMap[dir]
307
308         // Reset the CollectionFileWriter for a new file
309         fileWriter.NewFile(fn)
310
311         file, err := os.Open(path)
312         if err != nil {
313                 return err
314         }
315         defer file.Close()
316
317         m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
318
319         _, err = io.Copy(fileWriter, file)
320         if err != nil {
321                 return err
322         }
323
324         // Commits the current file.  Legal to call this repeatedly.
325         fileWriter.Close()
326
327         return nil
328 }
329
330 func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) {
331         streamMap := make(map[string]*CollectionFileWriter)
332         wu := &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
333         err = filepath.Walk(root, wu.WalkFunc)
334
335         if err != nil {
336                 return "", err
337         }
338
339         cw.mtx.Lock()
340         for _, st := range streamMap {
341                 cw.Streams = append(cw.Streams, st)
342         }
343         cw.mtx.Unlock()
344
345         return cw.ManifestText()
346 }