7582: Uploader passes tests
[arvados.git] / sdk / go / crunchrunner / upload.go
1 package main
2
import (
	"bytes"
	"crypto/md5"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sort"
	"strings"

	"git.curoverse.com/arvados.git/sdk/go/keepclient"
	"git.curoverse.com/arvados.git/sdk/go/manifest"
)
14
// Block is a staging buffer that accumulates stream data until it is handed
// to the uploader goroutine.
type Block struct {
	// data is the backing buffer; ReadFrom allocates it with
	// keepclient.BLOCKSIZE bytes.
	data   []byte
	// offset is the number of bytes of data filled so far. ReadFrom writes
	// new bytes at data[offset:], and goUpload uploads data[0:offset].
	offset int64
}
19
// ManifestStreamWriter collects the contents of the files belonging to one
// manifest stream, cutting the data into blocks that a companion goroutine
// (goUpload) uploads through the owning ManifestWriter's IKeepClient.
//
// NOTE: WalkFunc builds this struct with a positional composite literal, so
// the field order below must not change.
type ManifestStreamWriter struct {
	// Owning writer; goUpload reaches the Keep client through it.
	*ManifestWriter
	// Stream record that receives block locators (Blocks) and file
	// segment tokens (Files).
	*manifest.ManifestStream
	// Total bytes written to this stream so far; WalkFunc uses it as the
	// start position of the next file.
	offset int64
	// Block currently being filled, or nil when the next read must
	// allocate a fresh one.
	*Block
	// Blocks ready for upload are sent here; Finish closes the channel
	// and sets the field to nil.
	uploader chan *Block
	// goUpload sends its accumulated upload errors here once uploader
	// has been closed.
	finish   chan []error
}
28
// IKeepClient is the subset of Keep client behavior the uploader needs. It
// is an interface so that WriteTree can be handed any implementation.
type IKeepClient interface {
	// PutHB stores buf under the given hash. goUpload passes the hex MD5
	// of buf as hash and records the returned string in the manifest as
	// the block's locator. The int result is ignored by this file
	// (presumably a replica count -- confirm against keepclient).
	PutHB(hash string, buf []byte) (string, int, error)
}
32
33 func (m *ManifestStreamWriter) Write(p []byte) (n int, err error) {
34         // Needed to conform to Writer interface, but not implemented
35         // because io.Copy will actually use ReadFrom instead.
36         return 0, nil
37 }
38
39 func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) {
40         var total int64
41         var count int
42
43         for err == nil {
44                 if m.Block == nil {
45                         m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
46                 }
47                 count, err = r.Read(m.Block.data[m.Block.offset:])
48                 total += int64(count)
49                 m.Block.offset += int64(count)
50                 if count > 0 {
51                         if m.Block.offset == keepclient.BLOCKSIZE {
52                                 m.uploader <- m.Block
53                                 m.Block = nil
54                         }
55                 }
56         }
57
58         return total, err
59 }
60
61 func (m *ManifestStreamWriter) goUpload() {
62         var errors []error
63         uploader := m.uploader
64         finish := m.finish
65         for true {
66                 select {
67                 case block, valid := <-uploader:
68                         if !valid {
69                                 finish <- errors
70                                 return
71                         }
72                         hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
73                         signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
74                         if err != nil {
75                                 errors = append(errors, err)
76                         } else {
77                                 m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
78                         }
79                 }
80         }
81 }
82
// ManifestWriter builds a manifest for a directory tree: WalkFunc feeds it
// files, Finish flushes the uploads, and ManifestText renders the result.
//
// NOTE: WriteTree builds this struct with a positional composite literal, so
// the field order below must not change.
type ManifestWriter struct {
	// Client used to store blocks.
	IKeepClient
	// Root of the walked tree; WalkFunc removes this prefix from each
	// path to obtain the stream name.
	stripPrefix string
	// One stream writer per directory, keyed by the directory's path
	// relative to stripPrefix ("" for the root itself).
	Streams     map[string]*ManifestStreamWriter
}
88
89 func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) error {
90         if info.IsDir() {
91                 return nil
92         }
93
94         var dir string
95         if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
96                 dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
97         }
98
99         fn := path[(len(path) - len(info.Name())):]
100
101         if m.Streams[dir] == nil {
102                 m.Streams[dir] = &ManifestStreamWriter{
103                         m,
104                         &manifest.ManifestStream{StreamName: dir},
105                         0,
106                         nil,
107                         make(chan *Block),
108                         make(chan []error)}
109                 go m.Streams[dir].goUpload()
110         }
111
112         stream := m.Streams[dir]
113
114         fileStart := stream.offset
115
116         file, err := os.Open(path)
117         if err != nil {
118                 return err
119         }
120
121         var count int64
122         count, err = io.Copy(stream, file)
123         if err != nil && err != io.EOF {
124                 return err
125         }
126
127         stream.offset += count
128
129         stream.ManifestStream.Files = append(stream.ManifestStream.Files,
130                 fmt.Sprintf("%v:%v:%v", fileStart, count, fn))
131
132         return nil
133 }
134
135 func (m *ManifestWriter) Finish() error {
136         var errstring string
137         for _, v := range m.Streams {
138                 if v.uploader != nil {
139                         if v.Block != nil {
140                                 v.uploader <- v.Block
141                         }
142                         close(v.uploader)
143                         v.uploader = nil
144
145                         errors := <-v.finish
146                         close(v.finish)
147                         v.finish = nil
148
149                         if errors != nil {
150                                 for _, r := range errors {
151                                         errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
152                                 }
153                         }
154                 }
155         }
156         if errstring != "" {
157                 return errors.New(errstring)
158         } else {
159                 return nil
160         }
161 }
162
163 func (m *ManifestWriter) ManifestText() string {
164         m.Finish()
165         var buf bytes.Buffer
166         for k, v := range m.Streams {
167                 if k == "" {
168                         buf.WriteString(".")
169                 } else {
170                         buf.WriteString("./" + k)
171                 }
172                 for _, b := range v.Blocks {
173                         buf.WriteString(" ")
174                         buf.WriteString(b)
175                 }
176                 for _, f := range v.Files {
177                         buf.WriteString(" ")
178                         buf.WriteString(f)
179                 }
180                 buf.WriteString("\n")
181         }
182         return buf.String()
183 }
184
185 func WriteTree(kc IKeepClient, root string) (manifest string, err error) {
186         mw := ManifestWriter{kc, root, map[string]*ManifestStreamWriter{}}
187         err = filepath.Walk(root, mw.WalkFunc)
188
189         if err != nil {
190                 return "", err
191         }
192
193         err = mw.Finish()
194         if err != nil {
195                 return "", err
196         }
197
198         return mw.ManifestText(), nil
199 }