12183: Improve error handling if unable to delete symlinks marked for deletion.
[arvados.git] / services / crunch-run / upload.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 // Originally based on sdk/go/crunchrunner/upload.go
8 //
9 // Unlike the original, which iterates over a directory tree and uploads each
10 // file sequentially, this version supports opening and writing multiple files
11 // in a collection simultaneously.
12 //
13 // Eventually this should move into the Arvados Go SDK for a more comprehensive
14 // implementation of Collections.
15
16 import (
17         "bytes"
18         "crypto/md5"
19         "errors"
20         "fmt"
21         "io"
22         "io/ioutil"
23         "log"
24         "os"
25         "path/filepath"
26         "strings"
27         "sync"
28
29         "git.curoverse.com/arvados.git/sdk/go/keepclient"
30         "git.curoverse.com/arvados.git/sdk/go/manifest"
31 )
32
33 // Block is a data block in a manifest stream
34 type Block struct {
35         data   []byte
36         offset int64
37 }
38
39 // CollectionFileWriter is a Writer that permits writing to a file in a Keep Collection.
40 type CollectionFileWriter struct {
41         IKeepClient
42         *manifest.ManifestStream
43         offset uint64
44         length uint64
45         *Block
46         uploader chan *Block
47         finish   chan []error
48         fn       string
49 }
50
51 // Write to a file in a keep collection
52 func (m *CollectionFileWriter) Write(p []byte) (int, error) {
53         n, err := m.ReadFrom(bytes.NewReader(p))
54         return int(n), err
55 }
56
57 // ReadFrom a Reader and write to the Keep collection file.
58 func (m *CollectionFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
59         var total int64
60         var count int
61
62         for err == nil {
63                 if m.Block == nil {
64                         m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
65                 }
66                 count, err = r.Read(m.Block.data[m.Block.offset:])
67                 total += int64(count)
68                 m.Block.offset += int64(count)
69                 if m.Block.offset == keepclient.BLOCKSIZE {
70                         m.uploader <- m.Block
71                         m.Block = nil
72                 }
73         }
74
75         m.length += uint64(total)
76
77         if err == io.EOF {
78                 return total, nil
79         }
80         return total, err
81 }
82
83 // Close stops writing a file and adds it to the parent manifest.
84 func (m *CollectionFileWriter) Close() error {
85         m.ManifestStream.FileStreamSegments = append(m.ManifestStream.FileStreamSegments,
86                 manifest.FileStreamSegment{m.offset, m.length, m.fn})
87         return nil
88 }
89
90 func (m *CollectionFileWriter) NewFile(fn string) {
91         m.offset += m.length
92         m.length = 0
93         m.fn = fn
94 }
95
96 func (m *CollectionFileWriter) goUpload(workers chan struct{}) {
97         var mtx sync.Mutex
98         var wg sync.WaitGroup
99
100         var errors []error
101         uploader := m.uploader
102         finish := m.finish
103         for block := range uploader {
104                 mtx.Lock()
105                 m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, "")
106                 blockIndex := len(m.ManifestStream.Blocks) - 1
107                 mtx.Unlock()
108
109                 workers <- struct{}{} // wait for an available worker slot
110                 wg.Add(1)
111
112                 go func(block *Block, blockIndex int) {
113                         hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
114                         signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
115                         <-workers
116
117                         mtx.Lock()
118                         if err != nil {
119                                 errors = append(errors, err)
120                         } else {
121                                 m.ManifestStream.Blocks[blockIndex] = signedHash
122                         }
123                         mtx.Unlock()
124
125                         wg.Done()
126                 }(block, blockIndex)
127         }
128         wg.Wait()
129
130         finish <- errors
131 }
132
133 // CollectionWriter implements creating new Keep collections by opening files
134 // and writing to them.
135 type CollectionWriter struct {
136         MaxWriters int
137         IKeepClient
138         Streams []*CollectionFileWriter
139         workers chan struct{}
140         mtx     sync.Mutex
141 }
142
143 // Open a new file for writing in the Keep collection.
144 func (m *CollectionWriter) Open(path string) io.WriteCloser {
145         var dir string
146         var fn string
147
148         i := strings.Index(path, "/")
149         if i > -1 {
150                 dir = "./" + path[0:i]
151                 fn = path[i+1:]
152         } else {
153                 dir = "."
154                 fn = path
155         }
156
157         fw := &CollectionFileWriter{
158                 m.IKeepClient,
159                 &manifest.ManifestStream{StreamName: dir},
160                 0,
161                 0,
162                 nil,
163                 make(chan *Block),
164                 make(chan []error),
165                 fn}
166
167         m.mtx.Lock()
168         defer m.mtx.Unlock()
169         if m.workers == nil {
170                 if m.MaxWriters < 1 {
171                         m.MaxWriters = 2
172                 }
173                 m.workers = make(chan struct{}, m.MaxWriters)
174         }
175
176         go fw.goUpload(m.workers)
177
178         m.Streams = append(m.Streams, fw)
179
180         return fw
181 }
182
183 // Finish writing the collection, wait for all blocks to complete uploading.
184 func (m *CollectionWriter) Finish() error {
185         var errstring string
186         m.mtx.Lock()
187         defer m.mtx.Unlock()
188
189         for _, stream := range m.Streams {
190                 if stream.uploader == nil {
191                         continue
192                 }
193                 if stream.Block != nil {
194                         stream.uploader <- stream.Block
195                 }
196                 close(stream.uploader)
197                 stream.uploader = nil
198
199                 errors := <-stream.finish
200                 close(stream.finish)
201                 stream.finish = nil
202
203                 for _, r := range errors {
204                         errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
205                 }
206         }
207         if errstring != "" {
208                 return errors.New(errstring)
209         }
210         return nil
211 }
212
213 // ManifestText returns the manifest text of the collection.  Calls Finish()
214 // first to ensure that all blocks are written and that signed locators and
215 // available.
216 func (m *CollectionWriter) ManifestText() (mt string, err error) {
217         err = m.Finish()
218         if err != nil {
219                 return "", err
220         }
221
222         var buf bytes.Buffer
223
224         m.mtx.Lock()
225         defer m.mtx.Unlock()
226         for _, v := range m.Streams {
227                 if len(v.FileStreamSegments) == 0 {
228                         continue
229                 }
230                 k := v.StreamName
231                 if k == "." {
232                         buf.WriteString(".")
233                 } else {
234                         k = strings.Replace(k, " ", "\\040", -1)
235                         k = strings.Replace(k, "\n", "", -1)
236                         buf.WriteString("./" + k)
237                 }
238                 if len(v.Blocks) > 0 {
239                         for _, b := range v.Blocks {
240                                 buf.WriteString(" ")
241                                 buf.WriteString(b)
242                         }
243                 } else {
244                         buf.WriteString(" d41d8cd98f00b204e9800998ecf8427e+0")
245                 }
246                 for _, f := range v.FileStreamSegments {
247                         buf.WriteString(" ")
248                         name := strings.Replace(f.Name, " ", "\\040", -1)
249                         name = strings.Replace(name, "\n", "", -1)
250                         buf.WriteString(fmt.Sprintf("%v:%v:%v", f.SegPos, f.SegLen, name))
251                 }
252                 buf.WriteString("\n")
253         }
254         return buf.String(), nil
255 }
256
257 type WalkUpload struct {
258         MaxWriters  int
259         kc          IKeepClient
260         stripPrefix string
261         streamMap   map[string]*CollectionFileWriter
262         status      *log.Logger
263         workers     chan struct{}
264         mtx         sync.Mutex
265 }
266
267 // WalkFunc walks a directory tree, uploads each file found and adds it to the
268 // CollectionWriter.
269 func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
270         if err != nil {
271                 return err
272         }
273
274         targetPath, targetInfo := path, info
275         if info.Mode()&os.ModeSymlink != 0 {
276                 // Update targetpath/info to reflect the symlink
277                 // target, not the symlink itself
278                 targetPath, err = filepath.EvalSymlinks(path)
279                 if err != nil {
280                         return err
281                 }
282                 targetInfo, err = os.Stat(targetPath)
283                 if err != nil {
284                         return fmt.Errorf("stat symlink %q target %q: %s", path, targetPath, err)
285                 }
286                 if targetInfo.IsDir() {
287                         // Symlinks to directories don't get walked, so do it
288                         // here.  We've previously checked that they stay in
289                         // the output directory and don't result in an endless
290                         // loop.
291                         var rd []os.FileInfo
292                         rd, err = ioutil.ReadDir(path)
293                         if err != nil {
294                                 return err
295                         }
296                         for _, ent := range rd {
297                                 err = filepath.Walk(filepath.Join(path, ent.Name()), m.WalkFunc)
298                         }
299                 }
300         }
301
302         if targetInfo.Mode()&os.ModeType != 0 {
303                 // Skip directories, pipes, other non-regular files
304                 return nil
305         }
306
307         var dir string
308         if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
309                 dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
310         }
311         if dir == "" {
312                 dir = "."
313         }
314
315         fn := path[(len(path) - len(info.Name())):]
316
317         if m.streamMap[dir] == nil {
318                 m.streamMap[dir] = &CollectionFileWriter{
319                         m.kc,
320                         &manifest.ManifestStream{StreamName: dir},
321                         0,
322                         0,
323                         nil,
324                         make(chan *Block),
325                         make(chan []error),
326                         ""}
327
328                 m.mtx.Lock()
329                 if m.workers == nil {
330                         if m.MaxWriters < 1 {
331                                 m.MaxWriters = 2
332                         }
333                         m.workers = make(chan struct{}, m.MaxWriters)
334                 }
335                 m.mtx.Unlock()
336
337                 go m.streamMap[dir].goUpload(m.workers)
338         }
339
340         fileWriter := m.streamMap[dir]
341
342         // Reset the CollectionFileWriter for a new file
343         fileWriter.NewFile(fn)
344
345         file, err := os.Open(path)
346         if err != nil {
347                 return err
348         }
349         defer file.Close()
350
351         m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
352
353         _, err = io.Copy(fileWriter, file)
354         if err != nil {
355                 return err
356         }
357
358         // Commits the current file.  Legal to call this repeatedly.
359         fileWriter.Close()
360
361         return nil
362 }
363
364 func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) {
365         streamMap := make(map[string]*CollectionFileWriter)
366         wu := &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
367         err = filepath.Walk(root, wu.WalkFunc)
368
369         if err != nil {
370                 return "", err
371         }
372
373         cw.mtx.Lock()
374         for _, st := range streamMap {
375                 cw.Streams = append(cw.Streams, st)
376         }
377         cw.mtx.Unlock()
378
379         return cw.ManifestText()
380 }