Merge branch '8784-dir-listings'
[arvados.git] / sdk / go / crunchrunner / upload.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package main
6
7 import (
8         "bytes"
9         "crypto/md5"
10         "errors"
11         "fmt"
12         "git.curoverse.com/arvados.git/sdk/go/keepclient"
13         "git.curoverse.com/arvados.git/sdk/go/manifest"
14         "io"
15         "log"
16         "os"
17         "path/filepath"
18         "sort"
19         "strings"
20 )
21
// Block accumulates data for a single Keep block before it is handed to a
// stream's goUpload goroutine over the stream's uploader channel.
22 type Block struct {
23         data   []byte // backing storage; ReadFrom allocates keepclient.BLOCKSIZE bytes
24         offset int64 // number of bytes filled so far; only data[:offset] is uploaded
25 }
26
// ManifestStreamWriter collects the data for one manifest stream (one
// directory of the walked tree). Bytes written to it are packed into Blocks;
// each full block is sent to this stream's goUpload goroutine, which records
// the returned locator in the embedded ManifestStream.
27 type ManifestStreamWriter struct {
28         *ManifestWriter // owning writer; its IKeepClient is used by goUpload
29         *manifest.ManifestStream // stream name, block locators and file segments built so far
30         offset int64 // total bytes written to this stream; a new file's segment starts here
31         *Block // current partially filled block, or nil when none is pending
32         uploader chan *Block // blocks awaiting upload; Finish sends the last one and closes it
33         finish   chan []error // goUpload sends its collected upload errors here when done
34 }
35
// IKeepClient is the only Keep client operation this file needs. goUpload
// calls it for every block; keeping it an interface presumably lets a fake
// client be substituted (NOTE(review): confirm against the tests).
36 type IKeepClient interface {
        // PutHB stores buf under hash (goUpload passes the hex MD5 of buf) and
        // returns a locator, which goUpload appends to the stream's Blocks.
        // goUpload ignores the int result.
37         PutHB(hash string, buf []byte) (string, int, error)
38 }
39
40 func (m *ManifestStreamWriter) Write(p []byte) (int, error) {
41         n, err := m.ReadFrom(bytes.NewReader(p))
42         return int(n), err
43 }
44
45 func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) {
46         var total int64
47         var count int
48
49         for err == nil {
50                 if m.Block == nil {
51                         m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
52                 }
53                 count, err = r.Read(m.Block.data[m.Block.offset:])
54                 total += int64(count)
55                 m.Block.offset += int64(count)
56                 if m.Block.offset == keepclient.BLOCKSIZE {
57                         m.uploader <- m.Block
58                         m.Block = nil
59                 }
60         }
61
62         if err == io.EOF {
63                 return total, nil
64         } else {
65                 return total, err
66         }
67
68 }
69
70 func (m *ManifestStreamWriter) goUpload() {
71         var errors []error
72         uploader := m.uploader
73         finish := m.finish
74         for block := range uploader {
75                 hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
76                 signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
77                 if err != nil {
78                         errors = append(errors, err)
79                 } else {
80                         m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
81                 }
82         }
83         finish <- errors
84 }
85
// ManifestWriter builds a manifest for a directory tree. It keeps one
// ManifestStreamWriter per directory and uploads file data through the
// embedded IKeepClient.
86 type ManifestWriter struct {
87         IKeepClient // client used by every stream's goUpload to store blocks
88         stripPrefix string // walk root; WalkFunc removes it from walked paths to form stream names
89         Streams     map[string]*ManifestStreamWriter // keyed by stream name; "." is the root directory
90 }
91
92 func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) error {
93         if info.IsDir() {
94                 return nil
95         }
96
97         var dir string
98         if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
99                 dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
100         }
101         if dir == "" {
102                 dir = "."
103         }
104
105         fn := path[(len(path) - len(info.Name())):]
106
107         if m.Streams[dir] == nil {
108                 m.Streams[dir] = &ManifestStreamWriter{
109                         m,
110                         &manifest.ManifestStream{StreamName: dir},
111                         0,
112                         nil,
113                         make(chan *Block),
114                         make(chan []error)}
115                 go m.Streams[dir].goUpload()
116         }
117
118         stream := m.Streams[dir]
119
120         fileStart := stream.offset
121
122         file, err := os.Open(path)
123         if err != nil {
124                 return err
125         }
126
127         log.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
128
129         var count int64
130         count, err = io.Copy(stream, file)
131         if err != nil {
132                 return err
133         }
134
135         stream.offset += count
136
137         stream.ManifestStream.FileStreamSegments = append(stream.ManifestStream.FileStreamSegments,
138                 manifest.FileStreamSegment{uint64(fileStart), uint64(count), fn})
139
140         return nil
141 }
142
143 func (m *ManifestWriter) Finish() error {
144         var errstring string
145         for _, stream := range m.Streams {
146                 if stream.uploader == nil {
147                         continue
148                 }
149                 if stream.Block != nil {
150                         stream.uploader <- stream.Block
151                 }
152                 close(stream.uploader)
153                 stream.uploader = nil
154
155                 errors := <-stream.finish
156                 close(stream.finish)
157                 stream.finish = nil
158
159                 for _, r := range errors {
160                         errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
161                 }
162         }
163         if errstring != "" {
164                 return errors.New(errstring)
165         } else {
166                 return nil
167         }
168 }
169
170 func (m *ManifestWriter) ManifestText() string {
171         m.Finish()
172         var buf bytes.Buffer
173
174         dirs := make([]string, len(m.Streams))
175         i := 0
176         for k := range m.Streams {
177                 dirs[i] = k
178                 i++
179         }
180         sort.Strings(dirs)
181
182         for _, k := range dirs {
183                 v := m.Streams[k]
184
185                 if k == "." {
186                         buf.WriteString(".")
187                 } else {
188                         k = strings.Replace(k, " ", "\\040", -1)
189                         k = strings.Replace(k, "\n", "", -1)
190                         buf.WriteString("./" + k)
191                 }
192                 for _, b := range v.Blocks {
193                         buf.WriteString(" ")
194                         buf.WriteString(b)
195                 }
196                 for _, f := range v.FileStreamSegments {
197                         buf.WriteString(" ")
198                         name := strings.Replace(f.Name, " ", "\\040", -1)
199                         name = strings.Replace(name, "\n", "", -1)
200                         buf.WriteString(fmt.Sprintf("%d:%d:%s", f.SegPos, f.SegLen, name))
201                 }
202                 buf.WriteString("\n")
203         }
204         return buf.String()
205 }
206
207 func WriteTree(kc IKeepClient, root string) (manifest string, err error) {
208         mw := ManifestWriter{kc, root, map[string]*ManifestStreamWriter{}}
209         err = filepath.Walk(root, mw.WalkFunc)
210
211         if err != nil {
212                 return "", err
213         }
214
215         err = mw.Finish()
216         if err != nil {
217                 return "", err
218         }
219
220         return mw.ManifestText(), nil
221 }