Skip to content

Commit

Permalink
added web streaming support (#905)
Browse files Browse the repository at this point in the history
* added web streaming support

* adding web streaming in wasm

* adding web streaming in wasm

* changed stdin to localpath ffmpeg
  • Loading branch information
Manish-210 authored Apr 28, 2023
1 parent 6fb50cd commit f0d0889
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 30 deletions.
12 changes: 6 additions & 6 deletions mobilesdk/zbox/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,11 @@ func (a *Allocation) DownloadThumbnail(remotePath, localPath string, statusCb St
// - thumbnailPath: the local full path of thumbnail
// - encrypt: the file should be ecnrypted or not on uploading
// - statusCb: callback of status
func (a *Allocation) RepairFile(workdir, localPath, remotePath, thumbnailPath string, encrypt bool, statusCb StatusCallbackMocked) error {
func (a *Allocation) RepairFile(workdir, localPath, remotePath, thumbnailPath string, encrypt bool, webStreaming bool, statusCb StatusCallbackMocked) error {
if a == nil || a.sdkAllocation == nil {
return ErrInvalidAllocation
}
return a.sdkAllocation.StartChunkedUpload(workdir, localPath, remotePath, &StatusCallbackWrapped{Callback: statusCb}, true, true, thumbnailPath, encrypt)
return a.sdkAllocation.StartChunkedUpload(workdir, localPath, remotePath, &StatusCallbackWrapped{Callback: statusCb}, true, true, thumbnailPath, encrypt, webStreaming)
}

// UploadFile - upload file/thumbnail from local path to remote path
Expand All @@ -158,11 +158,11 @@ func (a *Allocation) RepairFile(workdir, localPath, remotePath, thumbnailPath st
// - thumbnailPath: the local full path of thumbnail
// - encrypt: the file should be ecnrypted or not on uploading
// - statusCb: callback of status
func (a *Allocation) UploadFile(workdir, localPath, remotePath, thumbnailPath string, encrypt bool, statusCb StatusCallbackMocked) error {
func (a *Allocation) UploadFile(workdir, localPath, remotePath, thumbnailPath string, encrypt bool, webStreaming bool, statusCb StatusCallbackMocked) error {
if a == nil || a.sdkAllocation == nil {
return ErrInvalidAllocation
}
return a.sdkAllocation.StartChunkedUpload(workdir, localPath, remotePath, &StatusCallbackWrapped{Callback: statusCb}, false, false, thumbnailPath, encrypt)
return a.sdkAllocation.StartChunkedUpload(workdir, localPath, remotePath, &StatusCallbackWrapped{Callback: statusCb}, false, false, thumbnailPath, encrypt, webStreaming)
}

// UploadFile - update file/thumbnail from local path to remote path
Expand All @@ -173,12 +173,12 @@ func (a *Allocation) UploadFile(workdir, localPath, remotePath, thumbnailPath st
// - thumbnailPath: the local full path of thumbnail
// - encrypt: the file should be ecnrypted or not on uploading
// - statusCb: callback of status
func (a *Allocation) UpdateFile(workdir, localPath, remotePath, thumbnailPath string, encrypt bool, statusCb StatusCallbackMocked) error {
func (a *Allocation) UpdateFile(workdir, localPath, remotePath, thumbnailPath string, encrypt bool, webStreaming bool, statusCb StatusCallbackMocked) error {
if a == nil || a.sdkAllocation == nil {
return ErrInvalidAllocation
}

return a.sdkAllocation.StartChunkedUpload(workdir, localPath, remotePath, &StatusCallbackWrapped{Callback: statusCb}, true, false, thumbnailPath, encrypt)
return a.sdkAllocation.StartChunkedUpload(workdir, localPath, remotePath, &StatusCallbackWrapped{Callback: statusCb}, true, false, thumbnailPath, encrypt, webStreaming)
}

// DeleteFile - delete file from remote path
Expand Down
12 changes: 6 additions & 6 deletions mobilesdk/zbox/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,13 @@ func DownloadThumbnail(allocationID, remotePath, localPath string, statusCb Stat
//
// ## Outputs
// - error
func RepairFile(allocationID, workdir, localPath, remotePath, thumbnailPath string, encrypt bool, statusCb StatusCallbackMocked) error {
func RepairFile(allocationID, workdir, localPath, remotePath, thumbnailPath string, encrypt bool, webStreaming bool, statusCb StatusCallbackMocked) error {
a, err := getAllocation(allocationID)
if err != nil {
return err

}
return a.StartChunkedUpload(workdir, localPath, remotePath, &StatusCallbackWrapped{Callback: statusCb}, true, true, thumbnailPath, encrypt)
return a.StartChunkedUpload(workdir, localPath, remotePath, &StatusCallbackWrapped{Callback: statusCb}, true, true, thumbnailPath, encrypt, webStreaming)
}

// UploadFile - upload file/thumbnail from local path to remote path
Expand All @@ -198,12 +198,12 @@ func RepairFile(allocationID, workdir, localPath, remotePath, thumbnailPath stri
//
// ## Outputs
// - error
func UploadFile(allocationID, workdir, localPath, remotePath, thumbnailPath string, encrypt bool, statusCb StatusCallbackMocked) error {
func UploadFile(allocationID, workdir, localPath, remotePath, thumbnailPath string, encrypt bool, webStreaming bool, statusCb StatusCallbackMocked) error {
a, err := getAllocation(allocationID)
if err != nil {
return err
}
return a.StartChunkedUpload(workdir, localPath, remotePath, &StatusCallbackWrapped{Callback: statusCb}, false, false, thumbnailPath, encrypt)
return a.StartChunkedUpload(workdir, localPath, remotePath, &StatusCallbackWrapped{Callback: statusCb}, false, false, thumbnailPath, encrypt, webStreaming)
}

// UploadFile - update file/thumbnail from local path to remote path
Expand All @@ -217,14 +217,14 @@ func UploadFile(allocationID, workdir, localPath, remotePath, thumbnailPath stri
//
// ## Ouputs
// - error
func UpdateFile(allocationID, workdir, localPath, remotePath, thumbnailPath string, encrypt bool, statusCb StatusCallbackMocked) error {
func UpdateFile(allocationID, workdir, localPath, remotePath, thumbnailPath string, encrypt bool, webStreaming bool, statusCb StatusCallbackMocked) error {
a, err := getAllocation(allocationID)

if err != nil {
return err
}

return a.StartChunkedUpload(workdir, localPath, remotePath, &StatusCallbackWrapped{Callback: statusCb}, true, false, thumbnailPath, encrypt)
return a.StartChunkedUpload(workdir, localPath, remotePath, &StatusCallbackWrapped{Callback: statusCb}, true, false, thumbnailPath, encrypt, webStreaming)
}

// DeleteFile - delete file from remote path
Expand Down
10 changes: 6 additions & 4 deletions wasmsdk/blobber.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ type BulkUploadOption struct {

ThumbnailBytes jsbridge.Bytes `json:"thumbnailBytes,omitempty"`
Encrypt bool `json:"encrypt,omitempty"`
webstreaming bool `json:"webstreaming,omitempty"`
IsUpdate bool `json:"isUpdate,omitempty"`
IsRepair bool `json:"isRepair,omitempty"`

Expand Down Expand Up @@ -409,6 +410,7 @@ func bulkUpload(jsonBulkUploadOptions string) ([]BulkUploadResult, error) {
o.ReadChunkFuncName,
o.FileSize,
o.ThumbnailBytes.Buffer,
o.webstreaming,
o.Encrypt,
o.IsUpdate,
o.IsRepair,
Expand All @@ -433,7 +435,7 @@ func bulkUpload(jsonBulkUploadOptions string) ([]BulkUploadResult, error) {
return results, nil
}

func uploadWithJsFuncs(allocationID, remotePath string, readChunkFuncName string, fileSize int64, thumbnailBytes []byte, encrypt, isUpdate, isRepair bool, numBlocks int, callbackFuncName string) (bool, error) {
func uploadWithJsFuncs(allocationID, remotePath string, readChunkFuncName string, fileSize int64, thumbnailBytes []byte, webStreaming, encrypt, isUpdate, isRepair bool, numBlocks int, callbackFuncName string) (bool, error) {

if len(allocationID) == 0 {
return false, RequiredArg("allocationID")
Expand Down Expand Up @@ -493,7 +495,7 @@ func uploadWithJsFuncs(allocationID, remotePath string, readChunkFuncName string
numBlocks = 100
}

ChunkedUpload, err := sdk.CreateChunkedUpload("/", allocationObj, fileMeta, fileReader, isUpdate, isRepair,
ChunkedUpload, err := sdk.CreateChunkedUpload("/", allocationObj, fileMeta, fileReader, isUpdate, isRepair, webStreaming,
sdk.WithThumbnail(thumbnailBytes),
sdk.WithEncrypt(encrypt),
sdk.WithStatusCallback(statusBar),
Expand All @@ -519,7 +521,7 @@ func uploadWithJsFuncs(allocationID, remotePath string, readChunkFuncName string
}

// upload upload file
func upload(allocationID, remotePath string, fileBytes, thumbnailBytes []byte, encrypt, isUpdate, isRepair bool, numBlocks int) (*FileCommandResponse, error) {
func upload(allocationID, remotePath string, fileBytes, thumbnailBytes []byte, webStreaming, encrypt, isUpdate, isRepair bool, numBlocks int) (*FileCommandResponse, error) {
if len(allocationID) == 0 {
return nil, RequiredArg("allocationID")
}
Expand Down Expand Up @@ -572,7 +574,7 @@ func upload(allocationID, remotePath string, fileBytes, thumbnailBytes []byte, e
numBlocks = 100
}

ChunkedUpload, err := sdk.CreateChunkedUpload("/", allocationObj, fileMeta, fileReader, isUpdate, isRepair,
ChunkedUpload, err := sdk.CreateChunkedUpload("/", allocationObj, fileMeta, fileReader, isUpdate, isRepair, webStreaming,
sdk.WithThumbnail(thumbnailBytes),
sdk.WithEncrypt(encrypt),
sdk.WithStatusCallback(statusBar),
Expand Down
2 changes: 2 additions & 0 deletions wasmsdk/demo/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ <h2>please download zcn.wasm from https://github.com/0chain/gosdk/releases/lates
file: file,
thumbnailBytes: await readBytes(file),//only for demo, don't upload original file as thumbnail in production
encrypt: false,
webstreaming: false,
isUpdate: false,
isRepair: false,
numBlocks: 100,
Expand Down Expand Up @@ -435,6 +436,7 @@ <h2>please download zcn.wasm from https://github.com/0chain/gosdk/releases/lates
file: file,
thumbnailBytes: await readBytes(file),//only for demo, don't upload original file as thumbnail in production
encrypt: true,
webstreaming: false,
isUpdate: false,
isRepair: false,
numBlocks: 100,
Expand Down
1 change: 1 addition & 0 deletions wasmsdk/demo/zcn.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ async function bulkUpload(options) {
fileSize: obj.file.size,
thumbnailBytes:obj.thumbnailBytes?obj.thumbnailBytes.toString():"",
encrypt:obj.encrypt,
encrypt:obj.webstreaming,
isUpdate:obj.isUpdate,
isRepair:obj.isRepair,
numBlocks:obj.numBlocks,
Expand Down
21 changes: 11 additions & 10 deletions zboxcore/sdk/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 +315,14 @@ func (a *Allocation) CanRename() bool {
func (a *Allocation) UpdateFile(workdir, localpath string, remotepath string,
status StatusCallback) error {

return a.StartChunkedUpload(workdir, localpath, remotepath, status, true, false, "", false)
return a.StartChunkedUpload(workdir, localpath, remotepath, status, true, false, "", false, false)
}

// UploadFile [Deprecated]please use CreateChunkedUpload
func (a *Allocation) UploadFile(workdir, localpath string, remotepath string,
status StatusCallback) error {

return a.StartChunkedUpload(workdir, localpath, remotepath, status, false, false, "", false)
return a.StartChunkedUpload(workdir, localpath, remotepath, status, false, false, "", false, false)
}

func (a *Allocation) CreateDir(remotePath string) error {
Expand Down Expand Up @@ -364,15 +364,15 @@ func (a *Allocation) RepairFile(localpath string, remotepath string,

idr, _ := homedir.Dir()
return a.StartChunkedUpload(idr, localpath, remotepath, status, false, true,
"", false)
"", false, false)
}

// UpdateFileWithThumbnail [Deprecated]please use CreateChunkedUpload
func (a *Allocation) UpdateFileWithThumbnail(workdir, localpath string, remotepath string,
thumbnailpath string, status StatusCallback) error {

return a.StartChunkedUpload(workdir, localpath, remotepath, status, true, false,
thumbnailpath, false)
thumbnailpath, false, false)
}

// UploadFileWithThumbnail [Deprecated]please use CreateChunkedUpload
Expand All @@ -381,29 +381,29 @@ func (a *Allocation) UploadFileWithThumbnail(workdir string, localpath string,
status StatusCallback) error {

return a.StartChunkedUpload(workdir, localpath, remotepath, status, false, false,
thumbnailpath, false)
thumbnailpath, false, false)
}

// EncryptAndUpdateFile [Deprecated]please use CreateChunkedUpload
func (a *Allocation) EncryptAndUpdateFile(workdir string, localpath string, remotepath string,
status StatusCallback) error {

return a.StartChunkedUpload(workdir, localpath, remotepath, status, true, false, "", true)
return a.StartChunkedUpload(workdir, localpath, remotepath, status, true, false, "", true, false)
}

// EncryptAndUploadFile [Deprecated]please use CreateChunkedUpload
func (a *Allocation) EncryptAndUploadFile(workdir string, localpath string, remotepath string,
status StatusCallback) error {

return a.StartChunkedUpload(workdir, localpath, remotepath, status, false, false, "", true)
return a.StartChunkedUpload(workdir, localpath, remotepath, status, false, false, "", true, false)
}

// EncryptAndUpdateFileWithThumbnail [Deprecated]please use CreateChunkedUpload
func (a *Allocation) EncryptAndUpdateFileWithThumbnail(workdir string, localpath string,
remotepath string, thumbnailpath string, status StatusCallback) error {

return a.StartChunkedUpload(workdir, localpath, remotepath, status, true, false,
thumbnailpath, true)
thumbnailpath, true, false)
}

// EncryptAndUploadFileWithThumbnail [Deprecated]please use CreateChunkedUpload
Expand All @@ -424,6 +424,7 @@ func (a *Allocation) EncryptAndUploadFileWithThumbnail(
false,
thumbnailpath,
true,
false,
)
}

Expand All @@ -434,7 +435,7 @@ func (a *Allocation) StartChunkedUpload(workdir, localPath string,
isRepair bool,
thumbnailPath string,
encryption bool,

webStreaming bool,
) error {

if !a.isInitialized() {
Expand Down Expand Up @@ -496,7 +497,7 @@ func (a *Allocation) StartChunkedUpload(workdir, localPath string,
ChunkedUpload, err := CreateChunkedUpload(workdir,
a, fileMeta, fileReader,
isUpdate, isRepair,
options...)
webStreaming, options...)
if err != nil {
return err
}
Expand Down
16 changes: 14 additions & 2 deletions zboxcore/sdk/chunked_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func CreateChunkedUpload(
workdir string, allocationObj *Allocation,
fileMeta FileMeta, fileReader io.Reader,
isUpdate, isRepair bool,
opts ...ChunkedUploadOption,
webStreaming bool, opts ...ChunkedUploadOption,
) (*ChunkedUpload, error) {

if allocationObj == nil {
Expand All @@ -99,6 +99,15 @@ func CreateChunkedUpload(
return nil, thrown.Throw(constants.ErrFileOptionNotPermitted, "file_option_not_permitted ")
}

if webStreaming {
newFileReader, newFileMeta, err := TranscodeWebStreaming(fileReader, fileMeta)
if err != nil {
return nil, thrown.New("upload_failed", err.Error())
}
fileMeta = *newFileMeta
fileReader = newFileReader
}

err := ValidateRemoteFileName(fileMeta.RemoteName)
if err != nil {
return nil, err
Expand Down Expand Up @@ -156,6 +165,7 @@ func CreateChunkedUpload(
chunkSize: DefaultChunkSize,
chunkNumber: 1,
encryptOnUpload: false,
webStreaming: false,

consensus: consensus,
uploadTimeOut: DefaultUploadTimeOut,
Expand Down Expand Up @@ -217,7 +227,7 @@ func CreateChunkedUpload(

su.fileHasher = CreateHasher(getShardSize(su.fileMeta.ActualSize, su.allocationObj.DataShards, su.encryptOnUpload))

// encrypt option has been chaned.upload it from scratch
// encrypt option has been changed. upload it from scratch
// chunkSize has been changed. upload it from scratch
if su.progress.EncryptOnUpload != su.encryptOnUpload || su.progress.ChunkSize != su.chunkSize {
su.progress = su.createUploadProgress()
Expand Down Expand Up @@ -318,6 +328,8 @@ type ChunkedUpload struct {

// encryptOnUpload encrypt data on upload or not.
encryptOnUpload bool
// webStreaming whether data has to be encoded.
webStreaming bool
// chunkSize how much bytes a chunk has. 64KB is default value.
chunkSize int64
// chunkNumber the number of chunks in a http upload request. 1 is default value
Expand Down
2 changes: 1 addition & 1 deletion zboxcore/sdk/chunked_upload_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func BenchmarkChunkedUpload(b *testing.B) {
RemotePath: "/test.txt",
}

chunkedUpload, err := CreateChunkedUpload("/tmp", a, fileMeta, reader, false, false)
chunkedUpload, err := CreateChunkedUpload("/tmp", a, fileMeta, reader, false, false, false)
if err != nil {
b.Fatal(err)
return
Expand Down
63 changes: 63 additions & 0 deletions zboxcore/sdk/chunked_upload_web_streaming.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package sdk

import (
"bufio"
"bytes"
"io"
"os/exec"
"path"
"strings"

thrown "github.com/0chain/errors"
"github.com/0chain/gosdk/zboxcore/logger"
)

// Converting the video file to fmp4 format for web streaming
func TranscodeWebStreaming(fileReader io.Reader, fileMeta FileMeta) (io.Reader, *FileMeta, error) {
var stdOut bytes.Buffer
var stdErr bytes.Buffer

args := []string{"-i", fileMeta.Path, "-g", "30", "-f", "mp4", "-movflags", "frag_keyframe+empty_moov", "pipe:1"}
cmdFfmpeg := exec.Command("ffmpeg", args...)

cmdFfmpeg.Stdout = bufio.NewWriter(&stdOut)
cmdFfmpeg.Stderr = bufio.NewWriter(&stdErr)

err := cmdFfmpeg.Run()

if err != nil {
logger.Logger.Error(err)
return nil, nil, thrown.New("Transcoding Failed: ", err.Error())
}

trascodedBufSlice := stdOut.Bytes()
transcodedFileReader := bytes.NewReader(trascodedBufSlice)

remoteName, remotePath := getRemoteNameAndRemotePath(fileMeta.RemoteName, fileMeta.RemotePath)

transcodedFileMeta := &FileMeta{
MimeType: "video/fmp4",
Path: fileMeta.Path,
ThumbnailPath: fileMeta.ThumbnailPath,
ActualHash: fileMeta.ActualHash,
ActualSize: int64(len(trascodedBufSlice)),
ActualThumbnailSize: fileMeta.ActualThumbnailSize,
ActualThumbnailHash: fileMeta.ActualThumbnailHash,
RemoteName: remoteName,
RemotePath: remotePath,
}

return transcodedFileReader, transcodedFileMeta, nil
}

func getRemoteNameAndRemotePath(remoteName string, remotePath string) (string, string) {
newRemotePath, newRemoteName := path.Split(remotePath)
newRemoteNameSlice := strings.Split(newRemoteName, ".")
if len(newRemoteNameSlice) > 0 {
newRemoteNameSlice = newRemoteNameSlice[:len(newRemoteNameSlice)-1]
}
newRemoteNameWithoutType := strings.Join(newRemoteNameSlice, ".")
newRemoteName = "raw." + newRemoteNameWithoutType + ".mp4"
newRemotePath = newRemotePath + newRemoteName
return newRemoteName, newRemotePath
}
2 changes: 1 addition & 1 deletion zboxcore/sdk/live_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (lu *LiveUpload) createClipsUpload(clipsIndex int, reader LiveUploadReader)
RemotePath: lu.liveMeta.RemotePath + "/" + reader.GetClipsFileName(clipsIndex),
}

return CreateChunkedUpload(lu.homedir, lu.allocationObj, fileMeta, reader, false, false,
return CreateChunkedUpload(lu.homedir, lu.allocationObj, fileMeta, reader, false, false, false,
WithEncrypt(lu.encryptOnUpload),
WithStatusCallback(lu.statusCallback()))
}

0 comments on commit f0d0889

Please sign in to comment.