7816: Crunch2 executor prototype work in progress.
[arvados.git] / services / crunch-exec / upload.go
1 package main
2
3 import (
4         "bytes"
5         "crypto/md5"
6         "errors"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/keepclient"
9         "git.curoverse.com/arvados.git/sdk/go/manifest"
10         "io"
11         "strings"
12 )
13
// Block is a fixed-capacity buffer that accumulates written data until it
// is handed to the uploader.
type Block struct {
	// data is the backing buffer; only data[:offset] holds written bytes.
	data []byte
	// offset is the number of bytes of data filled so far.
	offset int64
}
18
19 type CollectionFileWriter struct {
20         IKeepClient
21         *manifest.ManifestStream
22         offset uint64
23         length uint64
24         *Block
25         uploader chan *Block
26         finish   chan []error
27         fn       string
28 }
29
30 func (m *CollectionFileWriter) Write(p []byte) (int, error) {
31         n, err := m.ReadFrom(bytes.NewReader(p))
32         return int(n), err
33 }
34
35 func (m *CollectionFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
36         var total int64
37         var count int
38
39         for err == nil {
40                 if m.Block == nil {
41                         m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
42                 }
43                 count, err = r.Read(m.Block.data[m.Block.offset:])
44                 total += int64(count)
45                 m.Block.offset += int64(count)
46                 if m.Block.offset == keepclient.BLOCKSIZE {
47                         m.uploader <- m.Block
48                         m.Block = nil
49                 }
50         }
51
52         m.length += uint64(total)
53
54         if err == io.EOF {
55                 return total, nil
56         } else {
57                 return total, err
58         }
59 }
60
61 func (m *CollectionFileWriter) Close() error {
62         m.ManifestStream.FileStreamSegments = append(m.ManifestStream.FileStreamSegments,
63                 manifest.FileStreamSegment{m.offset, m.length, m.fn})
64         return nil
65 }
66
67 func (m *CollectionFileWriter) goUpload() {
68         var errors []error
69         uploader := m.uploader
70         finish := m.finish
71         for block := range uploader {
72                 hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
73                 signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
74                 if err != nil {
75                         errors = append(errors, err)
76                 } else {
77                         m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
78                 }
79         }
80         finish <- errors
81 }
82
83 type CollectionWriter struct {
84         IKeepClient
85         Streams []*CollectionFileWriter
86 }
87
88 func (m *CollectionWriter) Open(path string) io.WriteCloser {
89         var dir string
90         var fn string
91
92         i := strings.Index(path, "/")
93         if i > -1 {
94                 dir = "./" + path[0:i]
95                 fn = path[i+1:]
96         } else {
97                 dir = "."
98                 fn = path
99         }
100
101         fw := &CollectionFileWriter{
102                 m.IKeepClient,
103                 &manifest.ManifestStream{StreamName: dir},
104                 0,
105                 0,
106                 nil,
107                 make(chan *Block),
108                 make(chan []error),
109                 fn}
110         go fw.goUpload()
111
112         m.Streams = append(m.Streams, fw)
113
114         return fw
115 }
116
117 func (m *CollectionWriter) Finish() error {
118         var errstring string
119         for _, stream := range m.Streams {
120                 if stream.uploader == nil {
121                         continue
122                 }
123                 if stream.Block != nil {
124                         stream.uploader <- stream.Block
125                 }
126                 close(stream.uploader)
127                 stream.uploader = nil
128
129                 errors := <-stream.finish
130                 close(stream.finish)
131                 stream.finish = nil
132
133                 for _, r := range errors {
134                         errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
135                 }
136         }
137         if errstring != "" {
138                 return errors.New(errstring)
139         } else {
140                 return nil
141         }
142 }
143
144 func (m *CollectionWriter) ManifestText() (mt string, err error) {
145         err = m.Finish()
146         if err != nil {
147                 return "", err
148         }
149
150         var buf bytes.Buffer
151
152         for _, v := range m.Streams {
153                 k := v.StreamName
154                 if k == "." {
155                         buf.WriteString(".")
156                 } else {
157                         k = strings.Replace(k, " ", "\\040", -1)
158                         k = strings.Replace(k, "\n", "", -1)
159                         buf.WriteString("./" + k)
160                 }
161                 for _, b := range v.Blocks {
162                         buf.WriteString(" ")
163                         buf.WriteString(b)
164                 }
165                 for _, f := range v.FileStreamSegments {
166                         buf.WriteString(" ")
167                         name := strings.Replace(f.Name, " ", "\\040", -1)
168                         name = strings.Replace(name, "\n", "", -1)
169                         buf.WriteString(fmt.Sprintf("%v:%v:%v", f.SegPos, f.SegLen, name))
170                 }
171                 buf.WriteString("\n")
172         }
173         return buf.String(), nil
174 }