13804: Update comments for comments for "consecutive_idle_count"
[arvados.git] / services / crunch-run / upload.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 // Originally based on sdk/go/crunchrunner/upload.go
8 //
9 // Unlike the original, which iterates over a directory tree and uploads each
10 // file sequentially, this version supports opening and writing multiple files
11 // in a collection simultaneously.
12 //
13 // Eventually this should move into the Arvados Go SDK for a more comprehensive
14 // implementation of Collections.
15
16 import (
17         "bytes"
18         "crypto/md5"
19         "errors"
20         "fmt"
21         "io"
22         "log"
23         "os"
24         "path/filepath"
25         "strings"
26         "sync"
27
28         "git.curoverse.com/arvados.git/sdk/go/keepclient"
29         "git.curoverse.com/arvados.git/sdk/go/manifest"
30 )
31
32 // Block is a data block in a manifest stream
33 type Block struct {
34         data   []byte
35         offset int64
36 }
37
38 // CollectionFileWriter is a Writer that permits writing to a file in a Keep Collection.
39 type CollectionFileWriter struct {
40         IKeepClient
41         *manifest.ManifestStream
42         offset uint64
43         length uint64
44         *Block
45         uploader chan *Block
46         finish   chan []error
47         fn       string
48 }
49
50 // Write to a file in a keep collection
51 func (m *CollectionFileWriter) Write(p []byte) (int, error) {
52         n, err := m.ReadFrom(bytes.NewReader(p))
53         return int(n), err
54 }
55
56 // ReadFrom a Reader and write to the Keep collection file.
57 func (m *CollectionFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
58         var total int64
59         var count int
60
61         for err == nil {
62                 if m.Block == nil {
63                         m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
64                 }
65                 count, err = r.Read(m.Block.data[m.Block.offset:])
66                 total += int64(count)
67                 m.Block.offset += int64(count)
68                 if m.Block.offset == keepclient.BLOCKSIZE {
69                         m.uploader <- m.Block
70                         m.Block = nil
71                 }
72         }
73
74         m.length += uint64(total)
75
76         if err == io.EOF {
77                 return total, nil
78         }
79         return total, err
80 }
81
82 // Close stops writing a file and adds it to the parent manifest.
83 func (m *CollectionFileWriter) Close() error {
84         m.ManifestStream.FileStreamSegments = append(m.ManifestStream.FileStreamSegments,
85                 manifest.FileStreamSegment{m.offset, m.length, m.fn})
86         return nil
87 }
88
89 func (m *CollectionFileWriter) NewFile(fn string) {
90         m.offset += m.length
91         m.length = 0
92         m.fn = fn
93 }
94
95 func (m *CollectionFileWriter) goUpload(workers chan struct{}) {
96         var mtx sync.Mutex
97         var wg sync.WaitGroup
98
99         var errors []error
100         uploader := m.uploader
101         finish := m.finish
102         for block := range uploader {
103                 mtx.Lock()
104                 m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, "")
105                 blockIndex := len(m.ManifestStream.Blocks) - 1
106                 mtx.Unlock()
107
108                 workers <- struct{}{} // wait for an available worker slot
109                 wg.Add(1)
110
111                 go func(block *Block, blockIndex int) {
112                         hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
113                         signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
114                         <-workers
115
116                         mtx.Lock()
117                         if err != nil {
118                                 errors = append(errors, err)
119                         } else {
120                                 m.ManifestStream.Blocks[blockIndex] = signedHash
121                         }
122                         mtx.Unlock()
123
124                         wg.Done()
125                 }(block, blockIndex)
126         }
127         wg.Wait()
128
129         finish <- errors
130 }
131
132 // CollectionWriter implements creating new Keep collections by opening files
133 // and writing to them.
134 type CollectionWriter struct {
135         MaxWriters int
136         IKeepClient
137         Streams []*CollectionFileWriter
138         workers chan struct{}
139         mtx     sync.Mutex
140 }
141
142 // Open a new file for writing in the Keep collection.
143 func (m *CollectionWriter) Open(path string) io.WriteCloser {
144         var dir string
145         var fn string
146
147         i := strings.Index(path, "/")
148         if i > -1 {
149                 dir = "./" + path[0:i]
150                 fn = path[i+1:]
151         } else {
152                 dir = "."
153                 fn = path
154         }
155
156         fw := &CollectionFileWriter{
157                 m.IKeepClient,
158                 &manifest.ManifestStream{StreamName: dir},
159                 0,
160                 0,
161                 nil,
162                 make(chan *Block),
163                 make(chan []error),
164                 fn}
165
166         m.mtx.Lock()
167         defer m.mtx.Unlock()
168         if m.workers == nil {
169                 if m.MaxWriters < 1 {
170                         m.MaxWriters = 2
171                 }
172                 m.workers = make(chan struct{}, m.MaxWriters)
173         }
174
175         go fw.goUpload(m.workers)
176
177         m.Streams = append(m.Streams, fw)
178
179         return fw
180 }
181
182 // Finish writing the collection, wait for all blocks to complete uploading.
183 func (m *CollectionWriter) Finish() error {
184         var errstring string
185         m.mtx.Lock()
186         defer m.mtx.Unlock()
187
188         for _, stream := range m.Streams {
189                 if stream.uploader == nil {
190                         continue
191                 }
192                 if stream.Block != nil {
193                         stream.uploader <- stream.Block
194                         stream.Block = nil
195                 }
196                 close(stream.uploader)
197                 stream.uploader = nil
198
199                 errors := <-stream.finish
200                 close(stream.finish)
201                 stream.finish = nil
202
203                 for _, r := range errors {
204                         errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
205                 }
206         }
207         if errstring != "" {
208                 return errors.New(errstring)
209         }
210         return nil
211 }
212
213 // ManifestText returns the manifest text of the collection.  Calls Finish()
214 // first to ensure that all blocks are written and that signed locators and
215 // available.
216 func (m *CollectionWriter) ManifestText() (mt string, err error) {
217         err = m.Finish()
218         if err != nil {
219                 return "", err
220         }
221
222         var buf bytes.Buffer
223
224         m.mtx.Lock()
225         defer m.mtx.Unlock()
226         for _, v := range m.Streams {
227                 if len(v.FileStreamSegments) == 0 {
228                         continue
229                 }
230                 k := v.StreamName
231                 if k == "." {
232                         buf.WriteString(".")
233                 } else {
234                         k = strings.Replace(k, " ", "\\040", -1)
235                         k = strings.Replace(k, "\n", "", -1)
236                         buf.WriteString("./" + k)
237                 }
238                 if len(v.Blocks) > 0 {
239                         for _, b := range v.Blocks {
240                                 buf.WriteString(" ")
241                                 buf.WriteString(b)
242                         }
243                 } else {
244                         buf.WriteString(" d41d8cd98f00b204e9800998ecf8427e+0")
245                 }
246                 for _, f := range v.FileStreamSegments {
247                         buf.WriteString(" ")
248                         name := strings.Replace(f.Name, " ", "\\040", -1)
249                         name = strings.Replace(name, "\n", "", -1)
250                         buf.WriteString(fmt.Sprintf("%v:%v:%v", f.SegPos, f.SegLen, name))
251                 }
252                 buf.WriteString("\n")
253         }
254         return buf.String(), nil
255 }
256
257 type WalkUpload struct {
258         MaxWriters  int
259         kc          IKeepClient
260         stripPrefix string
261         streamMap   map[string]*CollectionFileWriter
262         status      *log.Logger
263         workers     chan struct{}
264         mtx         sync.Mutex
265 }
266
267 func (m *WalkUpload) UploadFile(path string, sourcePath string) error {
268         var dir string
269         basename := filepath.Base(path)
270         if len(path) > (len(m.stripPrefix) + len(basename) + 1) {
271                 dir = path[len(m.stripPrefix)+1 : (len(path) - len(basename) - 1)]
272         }
273         if dir == "" {
274                 dir = "."
275         }
276
277         fn := path[(len(path) - len(basename)):]
278
279         info, err := os.Stat(sourcePath)
280         if err != nil {
281                 return err
282         }
283         file, err := os.Open(sourcePath)
284         if err != nil {
285                 return err
286         }
287         defer file.Close()
288
289         if m.streamMap[dir] == nil {
290                 m.streamMap[dir] = &CollectionFileWriter{
291                         m.kc,
292                         &manifest.ManifestStream{StreamName: dir},
293                         0,
294                         0,
295                         nil,
296                         make(chan *Block),
297                         make(chan []error),
298                         ""}
299
300                 m.mtx.Lock()
301                 if m.workers == nil {
302                         if m.MaxWriters < 1 {
303                                 m.MaxWriters = 2
304                         }
305                         m.workers = make(chan struct{}, m.MaxWriters)
306                 }
307                 m.mtx.Unlock()
308
309                 go m.streamMap[dir].goUpload(m.workers)
310         }
311
312         fileWriter := m.streamMap[dir]
313
314         // Reset the CollectionFileWriter for a new file
315         fileWriter.NewFile(fn)
316
317         m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
318
319         _, err = io.Copy(fileWriter, file)
320         if err != nil {
321                 m.status.Printf("Uh oh")
322                 return err
323         }
324
325         // Commits the current file.  Legal to call this repeatedly.
326         fileWriter.Close()
327
328         return nil
329 }
330
331 func (cw *CollectionWriter) BeginUpload(root string, status *log.Logger) *WalkUpload {
332         streamMap := make(map[string]*CollectionFileWriter)
333         return &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
334 }
335
336 func (cw *CollectionWriter) EndUpload(wu *WalkUpload) {
337         cw.mtx.Lock()
338         for _, st := range wu.streamMap {
339                 cw.Streams = append(cw.Streams, st)
340         }
341         cw.mtx.Unlock()
342 }