1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
19 "git.curoverse.com/arvados.git/sdk/go/keepclient"
20 "git.curoverse.com/arvados.git/sdk/go/manifest"
// ManifestStreamWriter accumulates the data of one manifest stream. It embeds
// *manifest.ManifestStream, whose Blocks and FileStreamSegments are appended
// to by goUpload and ManifestWriter.WalkFunc respectively.
// NOTE(review): the remaining fields (the methods in this file reference
// Block, uploader, finish, offset and ManifestWriter) are not visible in this
// excerpt.
28 type ManifestStreamWriter struct {
30 *manifest.ManifestStream
// IKeepClient is the storage interface used by this file to upload blocks.
37 type IKeepClient interface {
// PutHB stores buf under the given hash. goUpload passes the hex MD5 digest of
// the data as hash and records the returned string in the stream's block list.
// NOTE(review): the meaning of the int result is not visible here; goUpload
// discards it.
38 PutHB(hash string, buf []byte) (string, int, error)
// Write feeds p into the stream by wrapping it in a bytes.Reader and
// delegating to ReadFrom.
// NOTE(review): the return statement is elided from this excerpt; presumably
// the int64 count from ReadFrom is converted to int.
41 func (m *ManifestStreamWriter) Write(p []byte) (int, error) {
42 n, err := m.ReadFrom(bytes.NewReader(p))
// ReadFrom reads from r into the stream's current block buffer.
// NOTE(review): the surrounding loop, the handling of a full block and the
// return statements are elided from this excerpt.
46 func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) {
// Start a fresh block: a keepclient.BLOCKSIZE-byte buffer and a zero second
// field (presumably the write offset; Block is declared elsewhere).
52 m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
// Read directly into the unused tail of the block buffer.
54 count, err = r.Read(m.Block.data[m.Block.offset:])
56 m.Block.offset += int64(count)
// The block is full once offset reaches BLOCKSIZE.
// NOTE(review): the body of this branch is not visible; presumably the block
// is handed to the uploader.
57 if m.Block.offset == keepclient.BLOCKSIZE {
// goUpload consumes blocks from the stream's uploader channel until it is
// closed, uploading each one through the IKeepClient. WalkFunc starts it as a
// goroutine when a stream is created.
// NOTE(review): the declaration of errors, the condition guarding the two
// appends, and whatever happens after the loop are elided from this excerpt.
71 func (m *ManifestStreamWriter) goUpload() {
// Take a local copy of the channel; Finish sets m.uploader to nil after
// closing it.
73 uploader := m.uploader
75 for block := range uploader {
// Only the filled portion of the buffer (data[0:offset]) is hashed and
// uploaded.
76 hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
77 signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
// Collect the upload error (presumably only when err != nil).
79 errors = append(errors, err)
// Record the string returned by PutHB in the stream's block list.
81 m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
// ManifestWriter builds a manifest from a directory tree. WriteTree constructs
// it positionally from a keep client, the root path and an empty stream map.
// NOTE(review): fields other than Streams are not visible in this excerpt.
87 type ManifestWriter struct {
// Streams maps a directory name (as computed in WalkFunc) to the writer for
// that directory's stream.
90 Streams map[string]*ManifestStreamWriter
// WalkFunc is the callback WriteTree passes to filepath.Walk. For each visited
// path it resolves symlinks, filters on the target's file type, selects (or
// creates) the stream for the file's directory, copies the file contents into
// that stream, and records a file segment for it.
// NOTE(review): error checks, early returns and closing braces are elided
// from this excerpt.
93 func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) error {
98 targetPath, targetInfo := path, info
99 if info.Mode()&os.ModeSymlink != 0 {
100 // Update targetpath/info to reflect the symlink
101 // target, not the symlink itself
102 targetPath, err = filepath.EvalSymlinks(path)
106 targetInfo, err = os.Stat(targetPath)
108 return fmt.Errorf("stat symlink %q target %q: %s", path, targetPath, err)
// Any bit set in os.ModeType means the target is not a regular file.
112 if targetInfo.Mode()&os.ModeType != 0 {
113 // Skip directories, pipes, other non-regular files
// dir is the part of path between the strip prefix (plus one separator byte)
// and the final separator before the file name; it is only computed when the
// path is long enough to contain such a part.
118 if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
119 dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
// fn is the trailing len(info.Name()) bytes of path, i.e. the file name.
125 fn := path[(len(path) - len(info.Name())):]
// Lazily create the stream for this directory and start its upload goroutine.
127 if m.Streams[dir] == nil {
128 m.Streams[dir] = &ManifestStreamWriter{
130 &manifest.ManifestStream{StreamName: dir},
135 go m.Streams[dir].goUpload()
138 stream := m.Streams[dir]
// Remember where this file begins within the stream.
140 fileStart := stream.offset
// Opens the walked path itself rather than targetPath.
142 file, err := os.Open(path)
147 log.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
150 count, err = io.Copy(stream, file)
// Advance the stream position by the number of bytes copied.
155 stream.offset += count
// Record the file as a segment: start position, length and name.
157 stream.ManifestStream.FileStreamSegments = append(stream.ManifestStream.FileStreamSegments,
158 manifest.FileStreamSegment{uint64(fileStart), uint64(count), fn})
// Finish flushes every stream: it sends any pending block to the stream's
// uploader, closes the uploader channel, receives the collected upload errors
// from the stream's finish channel, and joins their messages into one error.
// NOTE(review): the declaration of errstring, the body of the nil check and
// the condition around the final return are elided from this excerpt.
163 func (m *ManifestWriter) Finish() error {
165 for _, stream := range m.Streams {
// A nil uploader marks a stream with nothing left to flush (presumably
// skipped).
166 if stream.uploader == nil {
// Hand the pending block, if any, to the uploader.
169 if stream.Block != nil {
170 stream.uploader <- stream.Block
// Closing the channel ends the range loop in goUpload.
172 close(stream.uploader)
173 stream.uploader = nil
// NOTE(review): this local shadows the errors package within its scope; the
// errors.New call below must sit outside that scope to compile.
175 errors := <-stream.finish
// Append each error message on its own line.
179 for _, r := range errors {
180 errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
184 return errors.New(errstring)
// ManifestText renders the collected streams as manifest text: for each stream
// it writes "./" plus the stream name, then the stream's blocks, then one
// "pos:len:name" token per file segment, then a newline.
// NOTE(review): the buffer declaration, key collection/ordering, the lookup of
// v, token separators and the return are elided from this excerpt.
190 func (m *ManifestWriter) ManifestText() string {
194 dirs := make([]string, len(m.Streams))
196 for k := range m.Streams {
202 for _, k := range dirs {
// Spaces in the stream name are escaped as \040; newlines are removed rather
// than escaped.
208 k = strings.Replace(k, " ", "\\040", -1)
209 k = strings.Replace(k, "\n", "", -1)
210 buf.WriteString("./" + k)
212 for _, b := range v.Blocks {
216 for _, f := range v.FileStreamSegments {
// File names get the same treatment as stream names.
218 name := strings.Replace(f.Name, " ", "\\040", -1)
219 name = strings.Replace(name, "\n", "", -1)
220 buf.WriteString(fmt.Sprintf("%d:%d:%s", f.SegPos, f.SegLen, name))
222 buf.WriteString("\n")
// WriteTree walks the directory tree at root with a ManifestWriter backed by
// kc and returns the resulting manifest text.
// NOTE(review): the handling of the Walk error and any call to Finish are
// elided from this excerpt.
227 func WriteTree(kc IKeepClient, root string) (manifest string, err error) {
// Positional fields: keep client, root path, empty stream map.
228 mw := ManifestWriter{kc, root, map[string]*ManifestStreamWriter{}}
229 err = filepath.Walk(root, mw.WalkFunc)
240 return mw.ManifestText(), nil