Merge branch '9187-requeued-containers' closes #9187
[arvados.git] / sdk / go / crunchrunner / upload.go
1 package main
2
3 import (
4         "bytes"
5         "crypto/md5"
6         "errors"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/keepclient"
9         "git.curoverse.com/arvados.git/sdk/go/manifest"
10         "io"
11         "log"
12         "os"
13         "path/filepath"
14         "sort"
15         "strings"
16 )
17
18 type Block struct {
19         data   []byte
20         offset int64
21 }
22
23 type ManifestStreamWriter struct {
24         *ManifestWriter
25         *manifest.ManifestStream
26         offset int64
27         *Block
28         uploader chan *Block
29         finish   chan []error
30 }
31
32 type IKeepClient interface {
33         PutHB(hash string, buf []byte) (string, int, error)
34 }
35
36 func (m *ManifestStreamWriter) Write(p []byte) (int, error) {
37         n, err := m.ReadFrom(bytes.NewReader(p))
38         return int(n), err
39 }
40
41 func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) {
42         var total int64
43         var count int
44
45         for err == nil {
46                 if m.Block == nil {
47                         m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
48                 }
49                 count, err = r.Read(m.Block.data[m.Block.offset:])
50                 total += int64(count)
51                 m.Block.offset += int64(count)
52                 if m.Block.offset == keepclient.BLOCKSIZE {
53                         m.uploader <- m.Block
54                         m.Block = nil
55                 }
56         }
57
58         if err == io.EOF {
59                 return total, nil
60         } else {
61                 return total, err
62         }
63
64 }
65
66 func (m *ManifestStreamWriter) goUpload() {
67         var errors []error
68         uploader := m.uploader
69         finish := m.finish
70         for block := range uploader {
71                 hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
72                 signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
73                 if err != nil {
74                         errors = append(errors, err)
75                 } else {
76                         m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
77                 }
78         }
79         finish <- errors
80 }
81
82 type ManifestWriter struct {
83         IKeepClient
84         stripPrefix string
85         Streams     map[string]*ManifestStreamWriter
86 }
87
88 func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) error {
89         if info.IsDir() {
90                 return nil
91         }
92
93         var dir string
94         if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
95                 dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
96         }
97         if dir == "" {
98                 dir = "."
99         }
100
101         fn := path[(len(path) - len(info.Name())):]
102
103         if m.Streams[dir] == nil {
104                 m.Streams[dir] = &ManifestStreamWriter{
105                         m,
106                         &manifest.ManifestStream{StreamName: dir},
107                         0,
108                         nil,
109                         make(chan *Block),
110                         make(chan []error)}
111                 go m.Streams[dir].goUpload()
112         }
113
114         stream := m.Streams[dir]
115
116         fileStart := stream.offset
117
118         file, err := os.Open(path)
119         if err != nil {
120                 return err
121         }
122
123         log.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
124
125         var count int64
126         count, err = io.Copy(stream, file)
127         if err != nil {
128                 return err
129         }
130
131         stream.offset += count
132
133         stream.ManifestStream.FileStreamSegments = append(stream.ManifestStream.FileStreamSegments,
134                 manifest.FileStreamSegment{uint64(fileStart), uint64(count), fn})
135
136         return nil
137 }
138
139 func (m *ManifestWriter) Finish() error {
140         var errstring string
141         for _, stream := range m.Streams {
142                 if stream.uploader == nil {
143                         continue
144                 }
145                 if stream.Block != nil {
146                         stream.uploader <- stream.Block
147                 }
148                 close(stream.uploader)
149                 stream.uploader = nil
150
151                 errors := <-stream.finish
152                 close(stream.finish)
153                 stream.finish = nil
154
155                 for _, r := range errors {
156                         errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
157                 }
158         }
159         if errstring != "" {
160                 return errors.New(errstring)
161         } else {
162                 return nil
163         }
164 }
165
166 func (m *ManifestWriter) ManifestText() string {
167         m.Finish()
168         var buf bytes.Buffer
169
170         dirs := make([]string, len(m.Streams))
171         i := 0
172         for k := range m.Streams {
173                 dirs[i] = k
174                 i++
175         }
176         sort.Strings(dirs)
177
178         for _, k := range dirs {
179                 v := m.Streams[k]
180
181                 if k == "." {
182                         buf.WriteString(".")
183                 } else {
184                         k = strings.Replace(k, " ", "\\040", -1)
185                         k = strings.Replace(k, "\n", "", -1)
186                         buf.WriteString("./" + k)
187                 }
188                 for _, b := range v.Blocks {
189                         buf.WriteString(" ")
190                         buf.WriteString(b)
191                 }
192                 for _, f := range v.FileStreamSegments {
193                         buf.WriteString(" ")
194                         name := strings.Replace(f.Name, " ", "\\040", -1)
195                         name = strings.Replace(name, "\n", "", -1)
196                         buf.WriteString(fmt.Sprintf("%d:%d:%s", f.SegPos, f.SegLen, name))
197                 }
198                 buf.WriteString("\n")
199         }
200         return buf.String()
201 }
202
203 func WriteTree(kc IKeepClient, root string) (manifest string, err error) {
204         mw := ManifestWriter{kc, root, map[string]*ManifestStreamWriter{}}
205         err = filepath.Walk(root, mw.WalkFunc)
206
207         if err != nil {
208                 return "", err
209         }
210
211         err = mw.Finish()
212         if err != nil {
213                 return "", err
214         }
215
216         return mw.ManifestText(), nil
217 }