Skip to content

Commit

Permalink
Fixed streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
balazsgrill committed Oct 26, 2024
1 parent f493e56 commit 8505410
Show file tree
Hide file tree
Showing 15 changed files with 110 additions and 27 deletions.
28 changes: 28 additions & 0 deletions bindings/proxy/client/exception.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package client

import (
"io"
"io/fs"
"os"

"github.com/balazsgrill/potatodrive/bindings/proxy"
)

func eurap(op string, err error) error {
if err == nil {
return nil
}
if e, ok := err.(*proxy.FilesystemException); ok {
if e.EOF {
return io.EOF
}
if e.Isnotexists {
return &fs.PathError{
Op: op,
Path: e.Message,
Err: os.ErrNotExist,
}
}
}
return err
}
5 changes: 1 addition & 4 deletions bindings/proxy/client/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@ func (f *file) Read(p []byte) (n int, err error) {
// ReadAt implements afero.File.
func (f *file) ReadAt(p []byte, off int64) (n int, err error) {
data, err := f.fs.client.FreadAt(context.Background(), f.handle, int64(len(p)), off)
if err != nil {
return 0, err
}
return copy(p, data), nil
return copy(p, data), eurap("readat", err)
}

// Readdir implements afero.File.
Expand Down
3 changes: 2 additions & 1 deletion bindings/proxy/client/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,5 +97,6 @@ func (f *filesystemClient) Rename(oldname string, newname string) error {

// Stat implements afero.Fs.
func (f *filesystemClient) Stat(name string) (fs.FileInfo, error) {
return f.client.Stat(context.Background(), name)
fi, err := f.client.Stat(context.Background(), name)
return fi, eurap("stat", err)
}
2 changes: 2 additions & 0 deletions bindings/proxy/proxy.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ typedef i64 Timestamp

exception FilesystemException {
1: string message
2: bool isnotexists
3: bool eof
}

struct FileInfo {
Expand Down
19 changes: 19 additions & 0 deletions bindings/proxy/server/exception.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package server

import (
"io"
"os"

"github.com/balazsgrill/potatodrive/bindings/proxy"
)

func ewrap(err error) error {
if err == nil {
return nil
}
return &proxy.FilesystemException{
Message: err.Error(),
Isnotexists: os.IsNotExist(err),
EOF: err == io.EOF,
}
}
2 changes: 1 addition & 1 deletion bindings/proxy/server/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (fs *FilesystemServer) FreadAt(ctx context.Context, file proxy.FileHandle,
}
buffer := make([]byte, bufferSize)
n, err := f.ReadAt(buffer, offset)
return buffer[:n], err
return buffer[:n], ewrap(err)
}

// Freaddir implements proxy.Filesystem.
Expand Down
4 changes: 2 additions & 2 deletions bindings/proxy/server/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ func wrapFileInfo(file os.FileInfo) *proxy.FileInfo {
func (fs *FilesystemServer) Stat(ctx context.Context, name string) (_r *proxy.FileInfo, _err error) {
file, err := fs.fs.Stat(name)
if err != nil {
return nil, err
return nil, ewrap(err)
}
return wrapFileInfo(file), nil
}

func (fs *FilesystemServer) Chmod(ctx context.Context, name string, mode proxy.FileMode) error {
return fs.fs.Chmod(name, os.FileMode(mode))
return ewrap(fs.fs.Chmod(name, os.FileMode(mode)))
}
2 changes: 1 addition & 1 deletion cmd/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func main() {
Logger: icon.Logger,
StateCallback: func(state core.ConnectionState) {
if state.LastSyncError != nil {
icon.Logger.Err(err).Msgf("%s is offline %v", keyname, err)
icon.Logger.Err(state.LastSyncError).Msgf("%s is offline %v", keyname, state.LastSyncError)
}
},
FileStateCallback: func(fss core.FileSyncState) {
Expand Down
5 changes: 5 additions & 0 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"net/http"
"os"

"github.com/rs/zerolog/log"

"github.com/balazsgrill/potatodrive/bindings/proxy/server"
)

Expand Down Expand Up @@ -34,6 +36,7 @@ func main() {
os.Exit(1)
}

log.Info().Str("version", Version).Msg("Starting PotatoDrive Proxy")
config := loadConfig(*configfile)

mux := http.NewServeMux()
Expand All @@ -43,13 +46,15 @@ func main() {
fmt.Printf("Error creating handler: %v\n", err)
os.Exit(1)
}
log.Info().Msgf("Serving %s on %s", c.Directory, pattern)
mux.HandleFunc(pattern, handler)
}

httpserver := http.Server{
Addr: *addr,
Handler: mux,
}
log.Info().Msg("Listening on " + *addr)
httpserver.ListenAndServe()
}

Expand Down
7 changes: 5 additions & 2 deletions core/cfapi/filesystem/fetchdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/balazsgrill/potatodrive/core/cfapi"
)

const BUFFER_SIZE int64 = 1024 * 1024
const BUFFER_SIZE int64 = 100 * 1024

func (instance *VirtualizationInstance) callback_getFilePath(info *cfapi.CF_CALLBACK_INFO) string {
return core.GetString(info.VolumeDosName) + core.GetString(info.NormalizedPath)
Expand Down Expand Up @@ -101,7 +101,10 @@ func (instance *VirtualizationInstance) fetchData(info *cfapi.CF_CALLBACK_INFO,
var n int
var count int64
for count < length {
n, err = file.ReadAt(tb.buffer[tb.count:], byteOffset+count)
instance.Logger.Debug().Msgf("Reading %d bytes", length-count)
// last read may be partial
n, err = file.ReadAt(tb.buffer[tb.count:min(BUFFER_SIZE, length-count)], byteOffset+count)
instance.Logger.Debug().Msgf("Received %d bytes (%v)", n, err)
count += int64(n)
tb.count += int64(n)
if err == io.EOF {
Expand Down
20 changes: 16 additions & 4 deletions core/cfapi/filesystem/synclocaltoremote.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package filesystem

import (
"bytes"
"fmt"
"io/fs"
"os"
"path/filepath"
Expand Down Expand Up @@ -44,6 +45,8 @@ func (instance *VirtualizationInstance) isDeletedRemotely(remotepath string, loc
}

func (instance *VirtualizationInstance) syncLocalToRemote() error {
instance.lock.Lock()
defer instance.lock.Unlock()
return filepath.Walk(instance.rootPath, func(localpath string, localinfo fs.FileInfo, err error) error {
instance.Logger.Debug().Msgf("Syncing local file '%s'", localpath)
if os.IsNotExist(err) {
Expand All @@ -54,15 +57,15 @@ func (instance *VirtualizationInstance) syncLocalToRemote() error {
}

path := instance.path_localToRemote(localpath)
if strings.HasPrefix(path, ".") {
return filepath.SkipDir
}
if localinfo.IsDir() {
if dir, err := afero.IsDir(instance.fs, path); dir {
return err
}
return instance.fs.MkdirAll(path, 0777)
}
if strings.HasPrefix(path, ".") {
return nil
}

localstate, err := getPlaceholderState(localpath)
if err != nil {
Expand All @@ -79,7 +82,16 @@ func (instance *VirtualizationInstance) syncLocalToRemote() error {
// local file is a hydrated placeholder, but not in sync, upload it if local is newer

remoteinfo, err := instance.fs.Stat(path)
localisnewer := os.IsNotExist(err) || (localinfo.ModTime().UTC().Unix() > remoteinfo.ModTime().UTC().Unix())
var localisnewer bool
if os.IsNotExist(err) {
localisnewer = false
} else if err != nil {
return fmt.Errorf("syncLocalToRemote.1 %w", err)
} else if remoteinfo == nil {
return fmt.Errorf("syncLocalToRemote.2 NPE")
} else {
localisnewer = (localinfo.ModTime().UTC().Unix() > remoteinfo.ModTime().UTC().Unix())
}

if localisnewer {
// TODO Add file to queue instead of doing it here
Expand Down
26 changes: 17 additions & 9 deletions core/cfapi/filesystem/syncremotetolocal.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
)

func (instance *VirtualizationInstance) syncRemoteToLocal() error {
instance.lock.Lock()
defer instance.lock.Unlock()
return utils.Walk(instance.fs, "", func(path string, remoteinfo fs.FileInfo, err error) error {
instance.Logger.Debug().Msgf("Syncing remote file '%s'", path)
if os.IsNotExist(err) {
Expand All @@ -22,19 +24,25 @@ func (instance *VirtualizationInstance) syncRemoteToLocal() error {
}
if err != nil {
instance.Logger.Err(err).Send()
return err
return fmt.Errorf("syncRemoteToLocal.1 %w", err)
}

filename := instance.path_getNameRemote(path)
if strings.HasPrefix(filename, ".") {
if strings.HasPrefix(filename, ".") && len(filename) > 1 {
// do not skip "."
if remoteinfo.IsDir() {
return filepath.SkipDir
}
return nil
}

localpath := instance.path_remoteToLocal(path)
placeholderstate, err := getPlaceholderState(localpath)
instance.Logger.Debug().Msgf("Placeholder state for '%s' is %x", localpath, placeholderstate)
if os.IsNotExist(err) {
if remoteinfo.IsDir() {
// local dir does not exist, create it
instance.Logger.Debug().Msgf("Creating local dir '%s'", localpath)
return os.MkdirAll(localpath, 0777)
} else {
localdir := filepath.Dir(localpath)
Expand All @@ -43,17 +51,17 @@ func (instance *VirtualizationInstance) syncRemoteToLocal() error {
var EntriesProcessed uint32
hr := cfapi.CfCreatePlaceholders(core.GetPointer(localdir), &placeholder, 1, cfapi.CF_CREATE_FLAG_NONE, &EntriesProcessed)
if hr != 0 {
return core.ErrorByCode(hr)
return core.ErrorByCodeWithContext("syncRemoteToLocal:CfCreatePlaceholders", hr)
}
if EntriesProcessed != 1 {
return fmt.Errorf("unexpected number of entries processed: %d", EntriesProcessed)
return fmt.Errorf("syncRemoteToLocal: unexpected number of entries processed: %d", EntriesProcessed)
}
// done here, return
return nil
}
}
if err != nil {
return err
return fmt.Errorf("syncRemoteToLocal.2 %w", err)
}

insync := (placeholderstate & cfapi.CF_PLACEHOLDER_STATE_IN_SYNC) != 0
Expand All @@ -67,7 +75,7 @@ func (instance *VirtualizationInstance) syncRemoteToLocal() error {
var handle syscall.Handle
hr := cfapi.CfOpenFileWithOplock(core.GetPointer(localpath), cfapi.CF_OPEN_FILE_FLAG_WRITE_ACCESS|cfapi.CF_OPEN_FILE_FLAG_EXCLUSIVE, &handle)
if hr != 0 {
return core.ErrorByCode(hr)
return core.ErrorByCodeWithContext("syncRemoteToLocal:CfOpenFileWithOplock", hr)
}
defer cfapi.CfCloseHandle(handle)
placeholder := getPlaceholder(remoteinfo)
Expand All @@ -77,22 +85,22 @@ func (instance *VirtualizationInstance) syncRemoteToLocal() error {
instance.Logger.Info().Msgf("Converting to placeholder '%s'", path)
hr = cfapi.CfConvertToPlaceholder(handle, placeholder.FileIdentity, placeholder.FileIdentityLength, cfapi.CF_CONVERT_FLAG_NONE, 0, 0)
if hr != 0 {
return core.ErrorByCode(hr)
return core.ErrorByCodeWithContext("syncRemoteToLocal:CfConvertToPlaceholder", hr)
}
}
if !insync {
// updating a placeholder only works if it is marked as in-sync
hr = cfapi.CfSetInSyncState(handle, cfapi.CF_IN_SYNC_STATE_IN_SYNC, cfapi.CF_SET_IN_SYNC_FLAG_NONE, nil)
if hr != 0 {
return core.ErrorByCode(hr)
return core.ErrorByCodeWithContext("syncRemoteToLocal:CfSetInSyncState", hr)
}
}
var fileRange cfapi.CF_FILE_RANGE
fileRange.StartingOffset = 0
fileRange.Length = localinfo.Size()
hr = cfapi.CfUpdatePlaceholder(handle, &placeholder.FsMetadata, placeholder.FileIdentity, placeholder.FileIdentityLength, &fileRange, 1, cfapi.CF_UPDATE_FLAG_CLEAR_IN_SYNC|cfapi.CF_UPDATE_FLAG_DEHYDRATE, nil, 0)
if hr != 0 {
return core.ErrorByCode(hr)
return core.ErrorByCodeWithContext("syncRemoteToLocal:CfUpdatePlaceholder", hr)
}
instance.NotifyFileState(localpath, core.FileSyncStateDirty)

Expand Down
10 changes: 9 additions & 1 deletion core/errorcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,16 @@ func ErrorByCode(result uintptr) error {
message := make([]uint16, 256)
_, err := windows.FormatMessage(windows.FORMAT_MESSAGE_IGNORE_INSERTS|windows.FORMAT_MESSAGE_FROM_SYSTEM, 0, uint32(result), 0, message, nil)
if err != nil {
return fmt.Errorf("can't extract message of %x: %v", result, err)
return fmt.Errorf("can't extract message of %x: %w", result, err)
}
return fmt.Errorf("error result: %x - %s", result, windows.UTF16ToString(message))
}
}

func ErrorByCodeWithContext(context string, result uintptr) error {
err := ErrorByCode(result)
if err != nil {
return fmt.Errorf("%s: %w", context, err)
}
return nil
}
2 changes: 1 addition & 1 deletion core/projfs/filesystem/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ func (instance *VirtualizationInstance) GetFileData(callbackData *projfs.PRJ_CAL
var n int
var count uint32
for count < length {
n, err = file.ReadAt(buffer[count:], int64(byteOffset+uint64(count)))
n, err = file.ReadAt(buffer[count:min(len(buffer), int(length)-int(count))], int64(byteOffset+uint64(count)))
count += uint32(n)
if err == io.EOF {
err = nil
Expand Down
2 changes: 1 addition & 1 deletion test/minio/minio_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestDownloadingLargeFile(t *testing.T) {
defer instance.Close()

// generate data and upload it
data := generateTestData(2*1024*1024, "2megabytesof2megabytes")
data := generateTestData(11*1024*1024, "11megabytesof2megabytes")
inputfile := filepath.Join(instance.tempdir, "inputfile.dat")
err := os.WriteFile(inputfile, data, 0644)
if err != nil {
Expand Down

0 comments on commit 8505410

Please sign in to comment.