diff --git a/bindings/proxy/client/exception.go b/bindings/proxy/client/exception.go new file mode 100644 index 0000000..fa4f496 --- /dev/null +++ b/bindings/proxy/client/exception.go @@ -0,0 +1,28 @@ +package client + +import ( + "io" + "io/fs" + "os" + + "github.com/balazsgrill/potatodrive/bindings/proxy" +) + +func eurap(op string, err error) error { + if err == nil { + return nil + } + if e, ok := err.(*proxy.FilesystemException); ok { + if e.EOF { + return io.EOF + } + if e.Isnotexists { + return &fs.PathError{ + Op: op, + Path: e.Message, + Err: os.ErrNotExist, + } + } + } + return err +} diff --git a/bindings/proxy/client/file.go b/bindings/proxy/client/file.go index 98bb0be..3d9a959 100644 --- a/bindings/proxy/client/file.go +++ b/bindings/proxy/client/file.go @@ -36,10 +36,7 @@ func (f *file) Read(p []byte) (n int, err error) { // ReadAt implements afero.File. func (f *file) ReadAt(p []byte, off int64) (n int, err error) { data, err := f.fs.client.FreadAt(context.Background(), f.handle, int64(len(p)), off) - if err != nil { - return 0, err - } - return copy(p, data), nil + return copy(p, data), eurap("readat", err) } // Readdir implements afero.File. diff --git a/bindings/proxy/client/filesystem.go b/bindings/proxy/client/filesystem.go index b42d807..adf5b42 100644 --- a/bindings/proxy/client/filesystem.go +++ b/bindings/proxy/client/filesystem.go @@ -97,5 +97,6 @@ func (f *filesystemClient) Rename(oldname string, newname string) error { // Stat implements afero.Fs. func (f *filesystemClient) Stat(name string) (fs.FileInfo, error) { - return f.client.Stat(context.Background(), name) + fi, err := f.client.Stat(context.Background(), name) + return fi, eurap("stat", err) } diff --git a/bindings/proxy/proxy.thrift b/bindings/proxy/proxy.thrift index d10f152..e407a4b 100644 --- a/bindings/proxy/proxy.thrift +++ b/bindings/proxy/proxy.thrift @@ -4,6 +4,8 @@ typedef i64 Timestamp exception FilesystemException { 1: string message + 2: bool isnotexists + 3: bool eof } struct FileInfo { diff --git a/bindings/proxy/server/exception.go b/bindings/proxy/server/exception.go new file mode 100644 index 0000000..82f0848 --- /dev/null +++ b/bindings/proxy/server/exception.go @@ -0,0 +1,19 @@ +package server + +import ( + "io" + "os" + + "github.com/balazsgrill/potatodrive/bindings/proxy" +) + +func ewrap(err error) error { + if err == nil { + return nil + } + return &proxy.FilesystemException{ + Message: err.Error(), + Isnotexists: os.IsNotExist(err), + EOF: err == io.EOF, + } +} diff --git a/bindings/proxy/server/file.go b/bindings/proxy/server/file.go index f14414e..13cc54b 100644 --- a/bindings/proxy/server/file.go +++ b/bindings/proxy/server/file.go @@ -45,7 +45,7 @@ func (fs *FilesystemServer) FreadAt(ctx context.Context, file proxy.FileHandle, } buffer := make([]byte, bufferSize) n, err := f.ReadAt(buffer, offset) - return buffer[:n], err + return buffer[:n], ewrap(err) } // Freaddir implements proxy.Filesystem. diff --git a/bindings/proxy/server/filesystem.go b/bindings/proxy/server/filesystem.go index 0253f27..39dfe82 100644 --- a/bindings/proxy/server/filesystem.go +++ b/bindings/proxy/server/filesystem.go @@ -103,11 +103,11 @@ func wrapFileInfo(file os.FileInfo) *proxy.FileInfo { func (fs *FilesystemServer) Stat(ctx context.Context, name string) (_r *proxy.FileInfo, _err error) { file, err := fs.fs.Stat(name) if err != nil { - return nil, err + return nil, ewrap(err) } return wrapFileInfo(file), nil } func (fs *FilesystemServer) Chmod(ctx context.Context, name string, mode proxy.FileMode) error { - return fs.fs.Chmod(name, os.FileMode(mode)) + return ewrap(fs.fs.Chmod(name, os.FileMode(mode))) } diff --git a/cmd/main/main.go b/cmd/main/main.go index b9ecce9..8a98126 100644 --- a/cmd/main/main.go +++ b/cmd/main/main.go @@ -47,7 +47,7 @@ func main() { Logger: icon.Logger, StateCallback: func(state core.ConnectionState) { if state.LastSyncError != nil { - icon.Logger.Err(err).Msgf("%s is offline %v", keyname, err) + icon.Logger.Err(state.LastSyncError).Msgf("%s is offline %v", keyname, state.LastSyncError) } }, FileStateCallback: func(fss core.FileSyncState) { diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index 0bc6a39..04000ff 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -7,6 +7,8 @@ import ( "net/http" "os" + "github.com/rs/zerolog/log" + "github.com/balazsgrill/potatodrive/bindings/proxy/server" ) @@ -34,6 +36,7 @@ func main() { os.Exit(1) } + log.Info().Str("version", Version).Msg("Starting PotatoDrive Proxy") config := loadConfig(*configfile) mux := http.NewServeMux() @@ -43,6 +46,7 @@ func main() { fmt.Printf("Error creating handler: %v\n", err) os.Exit(1) } + log.Info().Msgf("Serving %s on %s", c.Directory, pattern) mux.HandleFunc(pattern, handler) } @@ -50,6 +54,7 @@ func main() { Addr: *addr, Handler: mux, } + log.Info().Msg("Listening on " + *addr) httpserver.ListenAndServe() } diff --git a/core/cfapi/filesystem/fetchdata.go b/core/cfapi/filesystem/fetchdata.go index 05df678..23becd8 100644 --- a/core/cfapi/filesystem/fetchdata.go +++ b/core/cfapi/filesystem/fetchdata.go @@ -11,7 +11,7 @@ import ( "github.com/balazsgrill/potatodrive/core/cfapi" ) -const BUFFER_SIZE int64 = 1024 * 1024 +const BUFFER_SIZE int64 = 100 * 1024 func (instance *VirtualizationInstance) callback_getFilePath(info *cfapi.CF_CALLBACK_INFO) string { return core.GetString(info.VolumeDosName) + core.GetString(info.NormalizedPath) @@ -101,7 +101,10 @@ func (instance *VirtualizationInstance) fetchData(info *cfapi.CF_CALLBACK_INFO, var n int var count int64 for count < length { - n, err = file.ReadAt(tb.buffer[tb.count:], byteOffset+count) + instance.Logger.Debug().Msgf("Reading %d bytes", length-count) + // last read may be partial + n, err = file.ReadAt(tb.buffer[tb.count:min(BUFFER_SIZE, length-count)], byteOffset+count) + instance.Logger.Debug().Msgf("Received %d bytes (%v)", n, err) count += int64(n) tb.count += int64(n) if err == io.EOF { diff --git a/core/cfapi/filesystem/synclocaltoremote.go b/core/cfapi/filesystem/synclocaltoremote.go index 3855be3..2059cbe 100644 --- a/core/cfapi/filesystem/synclocaltoremote.go +++ b/core/cfapi/filesystem/synclocaltoremote.go @@ -2,6 +2,7 @@ package filesystem import ( "bytes" + "fmt" "io/fs" "os" "path/filepath" @@ -44,6 +45,8 @@ func (instance *VirtualizationInstance) isDeletedRemotely(remotepath string, loc } func (instance *VirtualizationInstance) syncLocalToRemote() error { + instance.lock.Lock() + defer instance.lock.Unlock() return filepath.Walk(instance.rootPath, func(localpath string, localinfo fs.FileInfo, err error) error { instance.Logger.Debug().Msgf("Syncing local file '%s'", localpath) if os.IsNotExist(err) { @@ -54,15 +57,15 @@ func (instance *VirtualizationInstance) syncLocalToRemote() error { } path := instance.path_localToRemote(localpath) + if strings.HasPrefix(path, ".") { + return filepath.SkipDir + } if localinfo.IsDir() { if dir, err := afero.IsDir(instance.fs, path); dir { return err } return instance.fs.MkdirAll(path, 0777) } - if strings.HasPrefix(path, ".") { - return nil - } localstate, err := getPlaceholderState(localpath) if err != nil { @@ -79,7 +82,16 @@ func (instance *VirtualizationInstance) syncLocalToRemote() error { // local file is a hydrated placeholder, but not in sync, upload it if local is newer remoteinfo, err := instance.fs.Stat(path) - localisnewer := os.IsNotExist(err) || (localinfo.ModTime().UTC().Unix() > remoteinfo.ModTime().UTC().Unix()) + var localisnewer bool + if os.IsNotExist(err) { + localisnewer = false + } else if err != nil { + return fmt.Errorf("syncLocalToRemote.1 %w", err) + } else if remoteinfo == nil { + return fmt.Errorf("syncLocalToRemote.2 NPE") + } else { + localisnewer = (localinfo.ModTime().UTC().Unix() > remoteinfo.ModTime().UTC().Unix()) + } if localisnewer { // TODO Add file to queue instead of doing it here diff --git a/core/cfapi/filesystem/syncremotetolocal.go b/core/cfapi/filesystem/syncremotetolocal.go index 7ca2017..27978d3 100644 --- a/core/cfapi/filesystem/syncremotetolocal.go +++ b/core/cfapi/filesystem/syncremotetolocal.go @@ -14,6 +14,8 @@ import ( ) func (instance *VirtualizationInstance) syncRemoteToLocal() error { + instance.lock.Lock() + defer instance.lock.Unlock() return utils.Walk(instance.fs, "", func(path string, remoteinfo fs.FileInfo, err error) error { instance.Logger.Debug().Msgf("Syncing remote file '%s'", path) if os.IsNotExist(err) { @@ -22,19 +24,25 @@ func (instance *VirtualizationInstance) syncRemoteToLocal() error { } if err != nil { instance.Logger.Err(err).Send() - return err + return fmt.Errorf("syncRemoteToLocal.1 %w", err) } filename := instance.path_getNameRemote(path) - if strings.HasPrefix(filename, ".") { + if strings.HasPrefix(filename, ".") && len(filename) > 1 { + // do not skip "." + if remoteinfo.IsDir() { + return filepath.SkipDir + } return nil } + localpath := instance.path_remoteToLocal(path) placeholderstate, err := getPlaceholderState(localpath) instance.Logger.Debug().Msgf("Placeholder state for '%s' is %x", localpath, placeholderstate) if os.IsNotExist(err) { if remoteinfo.IsDir() { // local dir does not exist, create it + instance.Logger.Debug().Msgf("Creating local dir '%s'", localpath) return os.MkdirAll(localpath, 0777) } else { localdir := filepath.Dir(localpath) @@ -43,17 +51,17 @@ func (instance *VirtualizationInstance) syncRemoteToLocal() error { var EntriesProcessed uint32 hr := cfapi.CfCreatePlaceholders(core.GetPointer(localdir), &placeholder, 1, cfapi.CF_CREATE_FLAG_NONE, &EntriesProcessed) if hr != 0 { - return core.ErrorByCode(hr) + return core.ErrorByCodeWithContext("syncRemoteToLocal:CfCreatePlaceholders", hr) } if EntriesProcessed != 1 { - return fmt.Errorf("unexpected number of entries processed: %d", EntriesProcessed) + return fmt.Errorf("syncRemoteToLocal: unexpected number of entries processed: %d", EntriesProcessed) } // done here, return return nil } } if err != nil { - return err + return fmt.Errorf("syncRemoteToLocal.2 %w", err) } insync := (placeholderstate & cfapi.CF_PLACEHOLDER_STATE_IN_SYNC) != 0 @@ -67,7 +75,7 @@ func (instance *VirtualizationInstance) syncRemoteToLocal() error { var handle syscall.Handle hr := cfapi.CfOpenFileWithOplock(core.GetPointer(localpath), cfapi.CF_OPEN_FILE_FLAG_WRITE_ACCESS|cfapi.CF_OPEN_FILE_FLAG_EXCLUSIVE, &handle) if hr != 0 { - return core.ErrorByCode(hr) + return core.ErrorByCodeWithContext("syncRemoteToLocal:CfOpenFileWithOplock", hr) } defer cfapi.CfCloseHandle(handle) placeholder := getPlaceholder(remoteinfo) @@ -77,14 +85,14 @@ func (instance *VirtualizationInstance) syncRemoteToLocal() error { instance.Logger.Info().Msgf("Converting to placeholder '%s'", path) hr = cfapi.CfConvertToPlaceholder(handle, placeholder.FileIdentity, placeholder.FileIdentityLength, cfapi.CF_CONVERT_FLAG_NONE, 0, 0) if hr != 0 { - return core.ErrorByCode(hr) + return core.ErrorByCodeWithContext("syncRemoteToLocal:CfConvertToPlaceholder", hr) } } if !insync { // updating a placeholder only works if it is marked as in-sync hr = cfapi.CfSetInSyncState(handle, cfapi.CF_IN_SYNC_STATE_IN_SYNC, cfapi.CF_SET_IN_SYNC_FLAG_NONE, nil) if hr != 0 { - return core.ErrorByCode(hr) + return core.ErrorByCodeWithContext("syncRemoteToLocal:CfSetInSyncState", hr) } } var fileRange cfapi.CF_FILE_RANGE @@ -92,7 +100,7 @@ func (instance *VirtualizationInstance) syncRemoteToLocal() error { fileRange.Length = localinfo.Size() hr = cfapi.CfUpdatePlaceholder(handle, &placeholder.FsMetadata, placeholder.FileIdentity, placeholder.FileIdentityLength, &fileRange, 1, cfapi.CF_UPDATE_FLAG_CLEAR_IN_SYNC|cfapi.CF_UPDATE_FLAG_DEHYDRATE, nil, 0) if hr != 0 { - return core.ErrorByCode(hr) + return core.ErrorByCodeWithContext("syncRemoteToLocal:CfUpdatePlaceholder", hr) } instance.NotifyFileState(localpath, core.FileSyncStateDirty) diff --git a/core/errorcode.go b/core/errorcode.go index 258a4b1..32fe020 100644 --- a/core/errorcode.go +++ b/core/errorcode.go @@ -13,8 +13,16 @@ func ErrorByCode(result uintptr) error { message := make([]uint16, 256) _, err := windows.FormatMessage(windows.FORMAT_MESSAGE_IGNORE_INSERTS|windows.FORMAT_MESSAGE_FROM_SYSTEM, 0, uint32(result), 0, message, nil) if err != nil { - return fmt.Errorf("can't extract message of %x: %v", result, err) + return fmt.Errorf("can't extract message of %x: %w", result, err) } return fmt.Errorf("error result: %x - %s", result, windows.UTF16ToString(message)) } } + +func ErrorByCodeWithContext(context string, result uintptr) error { + err := ErrorByCode(result) + if err != nil { + return fmt.Errorf("%s: %w", context, err) + } + return nil +} diff --git a/core/projfs/filesystem/filesystem.go b/core/projfs/filesystem/filesystem.go index e5b397c..edc32ab 100644 --- a/core/projfs/filesystem/filesystem.go +++ b/core/projfs/filesystem/filesystem.go @@ -532,7 +532,7 @@ func (instance *VirtualizationInstance) GetFileData(callbackData *projfs.PRJ_CAL var n int var count uint32 for count < length { - n, err = file.ReadAt(buffer[count:], int64(byteOffset+uint64(count))) + n, err = file.ReadAt(buffer[count:min(len(buffer), int(length)-int(count))], int64(byteOffset+uint64(count))) count += uint32(n) if err == io.EOF { err = nil diff --git a/test/minio/minio_test.go b/test/minio/minio_test.go index a1bf43e..fec0f86 100644 --- a/test/minio/minio_test.go +++ b/test/minio/minio_test.go @@ -124,7 +124,7 @@ func TestDownloadingLargeFile(t *testing.T) { defer instance.Close() // generate data and upload it - data := generateTestData(2*1024*1024, "2megabytesof2megabytes") + data := generateTestData(11*1024*1024, "11megabytesof2megabytes") inputfile := filepath.Join(instance.tempdir, "inputfile.dat") err := os.WriteFile(inputfile, data, 0644) if err != nil {