7816: Rename to crunch-run
[arvados.git] / services / crunch-run / upload.go
1 package main
2
3 // Originally based on sdk/go/crunchrunner/upload.go
4 //
5 // Unlike the original, which iterates over a directory tree and uploads each
6 // file sequentially, this version supports opening and writing multiple files
7 // in a collection simultaneously.
8 //
9 // Eventually this should move into the Arvados Go SDK for a more comprehensive
10 // implementation of Collections.
11
12 import (
13         "bytes"
14         "crypto/md5"
15         "errors"
16         "fmt"
17         "git.curoverse.com/arvados.git/sdk/go/keepclient"
18         "git.curoverse.com/arvados.git/sdk/go/manifest"
19         "io"
20         "strings"
21 )
22
// Block is a data block in a manifest stream: a byte buffer that a writer
// fills incrementally, together with the number of bytes filled so far.
type Block struct {
        // data is the backing buffer. ReadFrom allocates it with
        // keepclient.BLOCKSIZE bytes.
        data   []byte
        // offset is the number of valid bytes in data, i.e. the position at
        // which the next read into the buffer will land. Only data[0:offset]
        // is uploaded.
        offset int64
}
28
// CollectionFileWriter is a Writer that permits writing to a file in a Keep Collection.
//
// Incoming data is buffered in the embedded Block. Full blocks are sent over
// the uploader channel to the goUpload goroutine, which stores them through
// the embedded IKeepClient and appends the returned locators to the embedded
// ManifestStream.
type CollectionFileWriter struct {
        IKeepClient
        *manifest.ManifestStream
        // offset is the segment position recorded for this file when Close is
        // called. Open initializes it to 0.
        offset uint64
        // length is the running total of bytes accepted by ReadFrom; it is the
        // segment length recorded by Close.
        length uint64
        // Block is the partially filled block currently being written, or nil
        // when the previous block has just been handed to the uploader.
        *Block
        // uploader carries blocks to the goUpload goroutine. Finish closes it
        // and sets it to nil.
        uploader chan *Block
        // finish carries the list of upload errors from goUpload back to
        // Finish once uploader has been closed and drained.
        finish   chan []error
        // fn is the file name recorded in the manifest segment by Close.
        fn       string
}
40
41 // Write to a file in a keep collection
42 func (m *CollectionFileWriter) Write(p []byte) (int, error) {
43         n, err := m.ReadFrom(bytes.NewReader(p))
44         return int(n), err
45 }
46
47 // ReadFrom a Reader and write to the Keep collection file.
48 func (m *CollectionFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
49         var total int64
50         var count int
51
52         for err == nil {
53                 if m.Block == nil {
54                         m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
55                 }
56                 count, err = r.Read(m.Block.data[m.Block.offset:])
57                 total += int64(count)
58                 m.Block.offset += int64(count)
59                 if m.Block.offset == keepclient.BLOCKSIZE {
60                         m.uploader <- m.Block
61                         m.Block = nil
62                 }
63         }
64
65         m.length += uint64(total)
66
67         if err == io.EOF {
68                 return total, nil
69         }
70         return total, err
71 }
72
73 // Close stops writing a file and adds it to the parent manifest.
74 func (m *CollectionFileWriter) Close() error {
75         m.ManifestStream.FileStreamSegments = append(m.ManifestStream.FileStreamSegments,
76                 manifest.FileStreamSegment{m.offset, m.length, m.fn})
77         return nil
78 }
79
80 func (m *CollectionFileWriter) goUpload() {
81         var errors []error
82         uploader := m.uploader
83         finish := m.finish
84         for block := range uploader {
85                 hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
86                 signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
87                 if err != nil {
88                         errors = append(errors, err)
89                 } else {
90                         m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
91                 }
92         }
93         finish <- errors
94 }
95
// CollectionWriter implements creating new Keep collections by opening files
// and writing to them.
type CollectionWriter struct {
        IKeepClient
        // Streams holds one CollectionFileWriter per call to Open, in the
        // order the files were opened. ManifestText emits them in this order.
        Streams []*CollectionFileWriter
}
102
103 // Open a new file for writing in the Keep collection.
104 func (m *CollectionWriter) Open(path string) io.WriteCloser {
105         var dir string
106         var fn string
107
108         i := strings.Index(path, "/")
109         if i > -1 {
110                 dir = "./" + path[0:i]
111                 fn = path[i+1:]
112         } else {
113                 dir = "."
114                 fn = path
115         }
116
117         fw := &CollectionFileWriter{
118                 m.IKeepClient,
119                 &manifest.ManifestStream{StreamName: dir},
120                 0,
121                 0,
122                 nil,
123                 make(chan *Block),
124                 make(chan []error),
125                 fn}
126         go fw.goUpload()
127
128         m.Streams = append(m.Streams, fw)
129
130         return fw
131 }
132
133 // Finish writing the collection, wait for all blocks to complete uploading.
134 func (m *CollectionWriter) Finish() error {
135         var errstring string
136         for _, stream := range m.Streams {
137                 if stream.uploader == nil {
138                         continue
139                 }
140                 if stream.Block != nil {
141                         stream.uploader <- stream.Block
142                 }
143                 close(stream.uploader)
144                 stream.uploader = nil
145
146                 errors := <-stream.finish
147                 close(stream.finish)
148                 stream.finish = nil
149
150                 for _, r := range errors {
151                         errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
152                 }
153         }
154         if errstring != "" {
155                 return errors.New(errstring)
156         }
157         return nil
158 }
159
160 // ManifestText returns the manifest text of the collection.  Calls Finish()
161 // first to ensure that all blocks are written and that signed locators and
162 // available.
163 func (m *CollectionWriter) ManifestText() (mt string, err error) {
164         err = m.Finish()
165         if err != nil {
166                 return "", err
167         }
168
169         var buf bytes.Buffer
170
171         for _, v := range m.Streams {
172                 k := v.StreamName
173                 if k == "." {
174                         buf.WriteString(".")
175                 } else {
176                         k = strings.Replace(k, " ", "\\040", -1)
177                         k = strings.Replace(k, "\n", "", -1)
178                         buf.WriteString("./" + k)
179                 }
180                 for _, b := range v.Blocks {
181                         buf.WriteString(" ")
182                         buf.WriteString(b)
183                 }
184                 for _, f := range v.FileStreamSegments {
185                         buf.WriteString(" ")
186                         name := strings.Replace(f.Name, " ", "\\040", -1)
187                         name = strings.Replace(name, "\n", "", -1)
188                         buf.WriteString(fmt.Sprintf("%v:%v:%v", f.SegPos, f.SegLen, name))
189                 }
190                 buf.WriteString("\n")
191         }
192         return buf.String(), nil
193 }