8015: Finished initial implementation of input and output mounts. Needs tests.
[arvados.git] / services / crunch-run / upload.go
1 package main
2
3 // Originally based on sdk/go/crunchrunner/upload.go
4 //
5 // Unlike the original, which iterates over a directory tree and uploads each
6 // file sequentially, this version supports opening and writing multiple files
7 // in a collection simultaneously.
8 //
9 // Eventually this should move into the Arvados Go SDK for a more comprehensive
10 // implementation of Collections.
11
import (
	"bytes"
	"crypto/md5"
	"errors"
	"fmt"
	"io"
	"log"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"sync"

	"git.curoverse.com/arvados.git/sdk/go/keepclient"
	"git.curoverse.com/arvados.git/sdk/go/manifest"
)
24
// Block is one data block of a manifest stream while it is being assembled
// before upload.
type Block struct {
	// data is the block's buffer; only its first offset bytes hold content.
	data []byte

	// offset counts the bytes of data filled so far (data[:offset] is what
	// gets uploaded and hashed).
	offset int64
}
30
31 // CollectionFileWriter is a Writer that permits writing to a file in a Keep Collection.
32 type CollectionFileWriter struct {
33         IKeepClient
34         *manifest.ManifestStream
35         offset uint64
36         length uint64
37         *Block
38         uploader chan *Block
39         finish   chan []error
40         fn       string
41 }
42
43 // Write to a file in a keep collection
44 func (m *CollectionFileWriter) Write(p []byte) (int, error) {
45         n, err := m.ReadFrom(bytes.NewReader(p))
46         return int(n), err
47 }
48
49 // ReadFrom a Reader and write to the Keep collection file.
50 func (m *CollectionFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
51         var total int64
52         var count int
53
54         for err == nil {
55                 if m.Block == nil {
56                         m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
57                 }
58                 count, err = r.Read(m.Block.data[m.Block.offset:])
59                 total += int64(count)
60                 m.Block.offset += int64(count)
61                 if m.Block.offset == keepclient.BLOCKSIZE {
62                         m.uploader <- m.Block
63                         m.Block = nil
64                 }
65         }
66
67         m.length += uint64(total)
68
69         if err == io.EOF {
70                 return total, nil
71         }
72         return total, err
73 }
74
75 // Close stops writing a file and adds it to the parent manifest.
76 func (m *CollectionFileWriter) Close() error {
77         m.ManifestStream.FileStreamSegments = append(m.ManifestStream.FileStreamSegments,
78                 manifest.FileStreamSegment{m.offset, m.length, m.fn})
79         return nil
80 }
81
82 func (m *CollectionFileWriter) NewFile(fn string) {
83         m.offset += m.length
84         m.length = 0
85         m.fn = fn
86 }
87
88 func (m *CollectionFileWriter) goUpload() {
89         var errors []error
90         uploader := m.uploader
91         finish := m.finish
92         for block := range uploader {
93                 hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
94                 signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
95                 if err != nil {
96                         errors = append(errors, err)
97                 } else {
98                         m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
99                 }
100         }
101         finish <- errors
102 }
103
104 // CollectionWriter makes implements creating new Keep collections by opening files
105 // and writing to them.
106 type CollectionWriter struct {
107         IKeepClient
108         Streams []*CollectionFileWriter
109         mtx     sync.Mutex
110 }
111
112 // Open a new file for writing in the Keep collection.
113 func (m *CollectionWriter) Open(path string) io.WriteCloser {
114         var dir string
115         var fn string
116
117         i := strings.Index(path, "/")
118         if i > -1 {
119                 dir = "./" + path[0:i]
120                 fn = path[i+1:]
121         } else {
122                 dir = "."
123                 fn = path
124         }
125
126         fw := &CollectionFileWriter{
127                 m.IKeepClient,
128                 &manifest.ManifestStream{StreamName: dir},
129                 0,
130                 0,
131                 nil,
132                 make(chan *Block),
133                 make(chan []error),
134                 fn}
135         go fw.goUpload()
136
137         m.mtx.Lock()
138         defer m.mtx.Unlock()
139         m.Streams = append(m.Streams, fw)
140
141         return fw
142 }
143
144 // Finish writing the collection, wait for all blocks to complete uploading.
145 func (m *CollectionWriter) Finish() error {
146         var errstring string
147         m.mtx.Lock()
148         defer m.mtx.Unlock()
149
150         for _, stream := range m.Streams {
151                 if stream.uploader == nil {
152                         continue
153                 }
154                 if stream.Block != nil {
155                         stream.uploader <- stream.Block
156                 }
157                 close(stream.uploader)
158                 stream.uploader = nil
159
160                 errors := <-stream.finish
161                 close(stream.finish)
162                 stream.finish = nil
163
164                 for _, r := range errors {
165                         errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
166                 }
167         }
168         if errstring != "" {
169                 return errors.New(errstring)
170         }
171         return nil
172 }
173
174 // ManifestText returns the manifest text of the collection.  Calls Finish()
175 // first to ensure that all blocks are written and that signed locators and
176 // available.
177 func (m *CollectionWriter) ManifestText() (mt string, err error) {
178         err = m.Finish()
179         if err != nil {
180                 return "", err
181         }
182
183         var buf bytes.Buffer
184
185         m.mtx.Lock()
186         defer m.mtx.Unlock()
187         for _, v := range m.Streams {
188                 k := v.StreamName
189                 if k == "." {
190                         buf.WriteString(".")
191                 } else {
192                         k = strings.Replace(k, " ", "\\040", -1)
193                         k = strings.Replace(k, "\n", "", -1)
194                         buf.WriteString("./" + k)
195                 }
196                 for _, b := range v.Blocks {
197                         buf.WriteString(" ")
198                         buf.WriteString(b)
199                 }
200                 for _, f := range v.FileStreamSegments {
201                         buf.WriteString(" ")
202                         name := strings.Replace(f.Name, " ", "\\040", -1)
203                         name = strings.Replace(name, "\n", "", -1)
204                         buf.WriteString(fmt.Sprintf("%v:%v:%v", f.SegPos, f.SegLen, name))
205                 }
206                 buf.WriteString("\n")
207         }
208         return buf.String(), nil
209 }
210
211 // WalkFunc walks a directory tree, uploads each file found and adds it to the
212 // CollectionWriter.
213 func (m *CollectionWriter) WalkFunc(path string,
214         info os.FileInfo,
215         err error,
216         stripPrefix string,
217         streamMap map[string]*manifest.ManifestStream,
218         status log.Logger) error {
219
220         if info.IsDir() {
221                 return nil
222         }
223
224         var dir string
225         if len(path) > (len(stripPrefix) + len(info.Name()) + 1) {
226                 dir = path[len(stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
227         }
228         if dir == "" {
229                 dir = "."
230         }
231
232         fn := path[(len(path) - len(info.Name())):]
233
234         if streamMap[dir] == nil {
235                 streamMap[dir] = &CollectionFileWriter{
236                         m.IKeepClient,
237                         &manifest.ManifestStream{StreamName: dir},
238                         0,
239                         0,
240                         nil,
241                         make(chan *Block),
242                         make(chan []error),
243                         ""}
244                 go streamMap[dir].goUpload()
245         }
246
247         fileWriter := streamMap[dir]
248
249         // Reset the CollectionFileWriter for a new file
250         fileWriter.NewFile(fn)
251
252         file, err := os.Open(path)
253         if err != nil {
254                 return err
255         }
256         defer file.Close()
257
258         status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
259
260         var count int64
261         count, err = io.Copy(fileWriter, file)
262         if err != nil {
263                 return err
264         }
265
266         // Commits the current file.  Legal to call this repeatedly.
267         fileWriter.Close()
268
269         return nil
270 }
271
272 func (cw *CollectionWriter) WriteTree(root string, status log.Logger) (manifest string, err error) {
273         streamMap := make(map[string]*ManifestStreamWriter)
274         err = filepath.Walk(root, func(path string, info os.FileInfo, err error) {
275                 return cw.WalkFunc(path,
276                         info,
277                         err,
278                         root,
279                         streamMap,
280                         status)
281         })
282
283         if err != nil {
284                 return "", err
285         }
286
287         cw.mtx.Lock()
288         for _, st := range streamMap {
289                 cw.Streams = append(cw.Streams, st)
290         }
291         cw.mtx.Unlock()
292
293         return mw.ManifestText()
294 }