Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ByteStream consumer can write to interface{} #273

Merged
merged 1 commit into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 91 additions & 40 deletions bytestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,16 @@ type byteStreamOpts struct {
Close bool
}

// ByteStreamConsumer creates a consumer for byte streams,
// takes a Writer/BinaryUnmarshaler interface or binary slice by reference,
// and reads from the provided reader
// ByteStreamConsumer creates a consumer for byte streams.
//
// The consumer consumes from a provided reader into the data passed by reference.
//
// Supported output underlying types and interfaces, prioritized in this order:
// - io.ReaderFrom (for maximum control)
// - io.Writer (performs io.Copy)
// - encoding.BinaryUnmarshaler
// - *string
// - *[]byte
func ByteStreamConsumer(opts ...byteStreamOpt) Consumer {
var vals byteStreamOpts
for _, opt := range opts {
Expand All @@ -51,45 +58,70 @@ func ByteStreamConsumer(opts ...byteStreamOpt) Consumer {
if reader == nil {
return errors.New("ByteStreamConsumer requires a reader") // early exit
}
if data == nil {
return errors.New("nil destination for ByteStreamConsumer")
}

closer := defaultCloser
if vals.Close {
if cl, ok := reader.(io.Closer); ok {
if cl, isReaderCloser := reader.(io.Closer); isReaderCloser {
closer = cl.Close
}
}
defer func() {
_ = closer()
}()

if wrtr, ok := data.(io.Writer); ok {
_, err := io.Copy(wrtr, reader)
if readerFrom, isReaderFrom := data.(io.ReaderFrom); isReaderFrom {
_, err := readerFrom.ReadFrom(reader)
return err
}

buf := new(bytes.Buffer)
if writer, isDataWriter := data.(io.Writer); isDataWriter {
_, err := io.Copy(writer, reader)
return err
}

// buffers input before writing to data
var buf bytes.Buffer
_, err := buf.ReadFrom(reader)
if err != nil {
return err
}
b := buf.Bytes()

if bu, ok := data.(encoding.BinaryUnmarshaler); ok {
return bu.UnmarshalBinary(b)
}
switch destinationPointer := data.(type) {
case encoding.BinaryUnmarshaler:
return destinationPointer.UnmarshalBinary(b)
case *any:
switch (*destinationPointer).(type) {
case string:
*destinationPointer = string(b)

return nil

case []byte:
*destinationPointer = b

if data != nil {
if str, ok := data.(*string); ok {
*str = string(b)
return nil
}
}
default:
fredbi marked this conversation as resolved.
Show resolved Hide resolved
// check for the underlying type to be pointer to []byte or string,
if ptr := reflect.TypeOf(data); ptr.Kind() != reflect.Ptr {
return errors.New("destination must be a pointer")
}

if t := reflect.TypeOf(data); data != nil && t.Kind() == reflect.Ptr {
v := reflect.Indirect(reflect.ValueOf(data))
if t = v.Type(); t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Uint8 {
t := v.Type()

switch {
case t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Uint8:
v.SetBytes(b)
return nil

case t.Kind() == reflect.String:
v.SetString(string(b))
return nil
}
}

Expand All @@ -98,68 +130,87 @@ func ByteStreamConsumer(opts ...byteStreamOpt) Consumer {
})
}

// ByteStreamProducer creates a producer for byte streams,
// takes a Reader/BinaryMarshaler interface or binary slice,
// and writes to a writer (essentially a pipe)
// ByteStreamProducer creates a producer for byte streams.
//
// The producer takes input data then writes to an output writer (essentially as a pipe).
//
// Supported input underlying types and interfaces, prioritized in this order:
// - io.WriterTo (for maximum control)
// - io.Reader (performs io.Copy). A ReadCloser is closed before exiting.
// - encoding.BinaryMarshaler
// - error (writes as a string)
// - []byte
// - string
// - struct, other slices: writes as JSON
func ByteStreamProducer(opts ...byteStreamOpt) Producer {
var vals byteStreamOpts
for _, opt := range opts {
opt(&vals)
}

return ProducerFunc(func(writer io.Writer, data interface{}) error {
if writer == nil {
return errors.New("ByteStreamProducer requires a writer") // early exit
}
if data == nil {
return errors.New("nil destination for ByteStreamProducer")
}

closer := defaultCloser
if vals.Close {
if cl, ok := writer.(io.Closer); ok {
if cl, isWriterCloser := writer.(io.Closer); isWriterCloser {
closer = cl.Close
}
}
defer func() {
_ = closer()
}()

if rc, ok := data.(io.ReadCloser); ok {
if rc, isDataCloser := data.(io.ReadCloser); isDataCloser {
defer rc.Close()
}

if rdr, ok := data.(io.Reader); ok {
_, err := io.Copy(writer, rdr)
switch origin := data.(type) {
case io.WriterTo:
_, err := origin.WriteTo(writer)
return err

case io.Reader:
_, err := io.Copy(writer, origin)
return err
}

if bm, ok := data.(encoding.BinaryMarshaler); ok {
bytes, err := bm.MarshalBinary()
case encoding.BinaryMarshaler:
bytes, err := origin.MarshalBinary()
if err != nil {
return err
}

_, err = writer.Write(bytes)
return err
}

if data != nil {
if str, ok := data.(string); ok {
_, err := writer.Write([]byte(str))
return err
}

if e, ok := data.(error); ok {
_, err := writer.Write([]byte(e.Error()))
return err
}
case error:
_, err := writer.Write([]byte(origin.Error()))
return err

default:
v := reflect.Indirect(reflect.ValueOf(data))
if t := v.Type(); t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Uint8 {
t := v.Type()

switch {
case t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Uint8:
_, err := writer.Write(v.Bytes())
return err
}
if t := v.Type(); t.Kind() == reflect.Struct || t.Kind() == reflect.Slice {

case t.Kind() == reflect.String:
_, err := writer.Write([]byte(v.String()))
return err

case t.Kind() == reflect.Struct || t.Kind() == reflect.Slice:
b, err := swag.WriteJSON(data)
if err != nil {
return err
}

_, err = writer.Write(b)
return err
}
Expand Down
Loading
Loading