Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YUNIKORN-2249] Add compression option to getQueueApplication API #757

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions pkg/webservice/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package webservice

import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -59,6 +61,7 @@
GroupNameMissing = "Group name is missing"
ApplicationDoesNotExists = "Application not found"
NodeDoesNotExists = "Node not found"
UnsupportedCompType = "Compression type not support"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "not supported"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be here at all. As I've repeatedly indicated we should never fail with an error, just fall back to no compression.

)

var allowedActiveStatusMsg string
Expand Down Expand Up @@ -153,6 +156,10 @@
w.Header().Set("Access-Control-Allow-Headers", "X-Requested-With,Content-Type,Accept,Origin")
}

// writeHeader sets the response header key to val on w, replacing any values
// previously stored under that key.
func writeHeader(w http.ResponseWriter, key, val string) {
	headers := w.Header()
	headers.Set(key, val)
}

func buildJSONErrorResponse(w http.ResponseWriter, detail string, code int) {
w.WriteHeader(code)
errorInfo := dao.NewYAPIError(nil, code, detail)
Expand Down Expand Up @@ -455,7 +462,7 @@
}

if err := json.NewEncoder(w).Encode(result); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)

Check warning on line 465 in pkg/webservice/handlers.go

View check run for this annotation

Codecov / codecov/patch

pkg/webservice/handlers.go#L465

Added line #L465 was not covered by tests
}
}

Expand Down Expand Up @@ -668,7 +675,7 @@
}
queueDao := queue.GetPartitionQueueDAOInfo(r.URL.Query().Has("subtree"))
if err := json.NewEncoder(w).Encode(queueDao); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)

Check warning on line 678 in pkg/webservice/handlers.go

View check run for this annotation

Codecov / codecov/patch

pkg/webservice/handlers.go#L678

Added line #L678 was not covered by tests
}
}

Expand Down Expand Up @@ -745,9 +752,13 @@
for _, app := range queue.GetCopyOfApps() {
appsDao = append(appsDao, getApplicationDAO(app))
}
if checkHeader(r.Header, "Accept-Encoding", "gzip") {
compress(w, appsDao)
return

Check warning on line 757 in pkg/webservice/handlers.go

View check run for this annotation

Codecov / codecov/patch

pkg/webservice/handlers.go#L756-L757

Added lines #L756 - L757 were not covered by tests
}

if err := json.NewEncoder(w).Encode(appsDao); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)

Check warning on line 761 in pkg/webservice/handlers.go

View check run for this annotation

Codecov / codecov/patch

pkg/webservice/handlers.go#L761

Added line #L761 was not covered by tests
}
}

Expand Down Expand Up @@ -1175,8 +1186,8 @@
if err := enc.Encode(dao.YunikornID{
InstanceUUID: schedulerContext.GetUUID(),
}); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
return

Check warning on line 1190 in pkg/webservice/handlers.go

View check run for this annotation

Codecov / codecov/patch

pkg/webservice/handlers.go#L1189-L1190

Added lines #L1189 - L1190 were not covered by tests
}
f.Flush()

Expand Down Expand Up @@ -1207,12 +1218,62 @@
}

if err := enc.Encode(e); err != nil {
log.Log(log.REST).Error("Marshalling error",
zap.String("host", r.Host))
buildJSONErrorResponse(w, err.Error(), http.StatusOK) // status code is 200 at this point, cannot be changed
return

Check warning on line 1224 in pkg/webservice/handlers.go

View check run for this annotation

Codecov / codecov/patch

pkg/webservice/handlers.go#L1221-L1224

Added lines #L1221 - L1224 were not covered by tests
}
f.Flush()
}
}
}

func checkHeader(h http.Header, key string, value string) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we return error if users use unsupported compression?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may not be necessary. Skipping it is a solution when dealing with unsupported compression types in requests. Do you think it's essential for users to require this one?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please take a look https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/406

It seems to me following the standard error can avoid the misunderstanding in the future.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me following the standard error can avoid the misunderstanding in the future.

NO. We should never return an error to the client if it requests an encoding we don't understand. Accept-Encoding: identity is the default, which means the identity encoding is always allowed. Therefore, if an unacceptable encoding is requested, we simply send the response uncompressed and without a Content-Encoding: gzip header.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NO. We should never return an error to the client if it requests an encoding we don't understand. Accept-Encoding: identity is the default, which means the identity encoding is always allowed. Therefore, if an unacceptable encoding is requested, we simply send the response uncompressed and without a Content-Encoding: gzip header.

That is an acceptable way to me, but I'd like to have more discussion for my own education :)

Should we support full representation of Accept-Encoding ( weight and coding )? If yes, we need to consider the Accept-Encoding: gzip;q=1.0, identity; q=0.

Or we ignore the weight and only check the existence of gzip from the Accept-Encoding. This is the solution adopted by this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a common saying in network protocol design: Be liberal in what you accept, strict in what you produce. In other words, we can get away with just checking for the substring gzip in Accept-Encoding, and produce exactly Content-Encoding: gzip in that case. If we choose not to compress due to size, or gzip was not requested, then we use the standard identity version. Weights are not really necessary; yes, they are part of the spec, but the client is only giving its preference; we do not have to honor it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see a very simple function like: GetCompressedWriter(headers, writer) (writer) that checks for the gzip header and wraps the given writer with a gzip-compressed one, else returns the original writer. Then in any endpoint we want to (potentially) compress, we just replace our writer with that one instead.

Copy link
Contributor

@wilfred-s wilfred-s Mar 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That will cause a leak as the gzip writer must be closed for it not to leak. Calling close on the http.ResponseWriter is not possible so we need more. Probably the easiest solution is to use the same solution as we have for the loggingHandler(). We wrap the compression choice in a handler function, which then gets wrapped in the logging handler. That means we have it all in one place and expand on it with compressor pooling or other things in the future.

Example code, which is not complete but gives some idea on how we can close the compressor. That can be expanded to use a sync.Pool to not recreate the zip writer each time and just reset it before use.

// gzipResponseWriter is an http.ResponseWriter whose body bytes are routed
// through Writer, while header and status handling stay with the embedded
// ResponseWriter.
type gzipResponseWriter struct {
	io.Writer
	http.ResponseWriter
}

// Write sends b to the wrapped Writer. It has to be declared explicitly: both
// embedded fields provide a Write method at the same depth, so without it the
// selector would be ambiguous and the type would not satisfy
// http.ResponseWriter.
func (w gzipResponseWriter) Write(b []byte) (int, error) {
	return w.Writer.Write(b)
}

// makeGzipHandler wraps fn so that its response body is gzip-compressed when
// the request's Accept-Encoding header mentions gzip. Any other request is
// passed to fn untouched; an unknown or missing encoding is never an error.
func makeGzipHandler(fn http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		// The body differs depending on Accept-Encoding in both branches, so
		// caches must key on it. Add (not Set) keeps any existing Vary values.
		w.Header().Add("Vary", "Accept-Encoding")

		// Content codings are case-insensitive and the header may be repeated,
		// so look at every header line, lower-cased.
		accepted := strings.ToLower(strings.Join(r.Header.Values("Accept-Encoding"), ","))
		if !strings.Contains(accepted, "gzip") {
			fn(w, r)
			return
		}
		w.Header().Set("Content-Encoding", "gzip")
		// The compressed length is not known up front.
		w.Header().Del("Content-Length")
		gz := gzip.NewWriter(w)
		// Close flushes the remaining compressed data and the gzip footer. By
		// then the status and headers may already be sent, so a failure here
		// cannot be reported to the client and is deliberately dropped.
		defer func() { _ = gz.Close() }()
		fn(gzipResponseWriter{Writer: gz, ResponseWriter: w}, r)
	}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks to everyone for all the advice. I have got some questions about what @wilfred-s said.
In the provided example, the compress handler would be wrapped within the logging handler. So we first see which API is being called, and then decide whether to use the gzip handler, since we haven't decided to compress all APIs yet? Is there any misunderstanding?

values := h.Values(key)
for _, v := range values {
v2 := strings.Split(v, ",")
for _, item := range v2 {
item = strings.TrimSpace(item)
if item == value {
return true

Check warning on line 1238 in pkg/webservice/handlers.go

View check run for this annotation

Codecov / codecov/patch

pkg/webservice/handlers.go#L1234-L1238

Added lines #L1234 - L1238 were not covered by tests
}
}
}
return false
}

func compress(w http.ResponseWriter, data any) {
response, err := json.Marshal(data)
if err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
return

Check warning on line 1249 in pkg/webservice/handlers.go

View check run for this annotation

Codecov / codecov/patch

pkg/webservice/handlers.go#L1248-L1249

Added lines #L1248 - L1249 were not covered by tests
}

// don't compress the data if it is smaller than MTU size
if len(response) < 1500 {
targetoee marked this conversation as resolved.
Show resolved Hide resolved
if err = json.NewEncoder(w).Encode(data); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)

Check warning on line 1255 in pkg/webservice/handlers.go

View check run for this annotation

Codecov / codecov/patch

pkg/webservice/handlers.go#L1255

Added line #L1255 was not covered by tests
}
return
}

var compressedData bytes.Buffer
writer := gzip.NewWriter(&compressedData)
_, err = writer.Write(response)
if err != nil {
_ = writer.Close()
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
targetoee marked this conversation as resolved.
Show resolved Hide resolved
return

Check warning on line 1266 in pkg/webservice/handlers.go

View check run for this annotation

Codecov / codecov/patch

pkg/webservice/handlers.go#L1264-L1266

Added lines #L1264 - L1266 were not covered by tests
}

err = writer.Close()
if err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
return

Check warning on line 1272 in pkg/webservice/handlers.go

View check run for this annotation

Codecov / codecov/patch

pkg/webservice/handlers.go#L1271-L1272

Added lines #L1271 - L1272 were not covered by tests
}

writeHeader(w, "Content-Encoding", "gzip")
if _, err = w.Write(compressedData.Bytes()); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)

Check warning on line 1277 in pkg/webservice/handlers.go

View check run for this annotation

Codecov / codecov/patch

pkg/webservice/handlers.go#L1277

Added line #L1277 was not covered by tests
}
}
60 changes: 60 additions & 0 deletions pkg/webservice/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@
package webservice

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"reflect"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -2600,3 +2604,59 @@ func NewResponseRecorderWithDeadline() *ResponseRecorderWithDeadline {
ResponseRecorder: httptest.NewRecorder(),
}
}

// TestCompressQueueApplications checks both paths of compress: a payload
// below the size threshold must come back as plain JSON, and a payload above
// it must come back as gzip-compressed JSON. In both cases the decoded
// applications are compared with the ones that were sent.
func TestCompressQueueApplications(t *testing.T) {
	// assertSameApps fails the test when the decoded list does not match the
	// expected one. The length is checked first so that an empty decode cannot
	// pass unnoticed.
	assertSameApps := func(expected, actual []*dao.ApplicationDAOInfo) {
		t.Helper()
		assert.Equal(t, len(expected), len(actual), "Unexpected number of decoded applications.")
		for i := range actual {
			assert.Equal(t, expected[i].ApplicationID, actual[i].ApplicationID)
			assert.Equal(t, expected[i].Partition, actual[i].Partition)
			assert.Equal(t, expected[i].QueueName, actual[i].QueueName)
			assert.Equal(t, expected[i].SubmissionTime, actual[i].SubmissionTime)
			assert.Equal(t, expected[i].User, actual[i].User)
			assert.Equal(t, expected[i].Groups[0], actual[i].Groups[0])
		}
	}

	var appsDao []*dao.ApplicationDAOInfo

	// case 1: data size is smaller than MTU size, so compression step is skipped
	for i := 0; i < 2; i++ {
		appName := "app-" + strconv.Itoa(i)
		app := newApplication(appName, "part-01", "queue-1", rmID, security.UserGroup{})
		appsDao = append(appsDao, getApplicationDAO(app))
	}

	resp := &MockResponseWriter{}
	compress(resp, appsDao)

	var plainData []*dao.ApplicationDAOInfo
	err := json.Unmarshal(resp.outputBytes, &plainData)
	assert.NilError(t, err, "Error when unmarshal data.")
	assertSameApps(appsDao, plainData)

	// case 2: data size is greater than MTU size, so do the compression
	for i := 2; i < 10; i++ {
		appName := "app-" + strconv.Itoa(i)
		app := newApplication(appName, "part-01", "queue-1", rmID, security.UserGroup{})
		appsDao = append(appsDao, getApplicationDAO(app))
	}
	// use a fresh writer so nothing from case 1 can leak into this case
	resp = &MockResponseWriter{}
	compress(resp, appsDao)

	gzipReader, err := gzip.NewReader(bytes.NewBuffer(resp.outputBytes))
	assert.NilError(t, err, "Error while decompressing data.")

	// read the whole stream before closing the reader
	uncompressedData, err := io.ReadAll(gzipReader)
	assert.NilError(t, err, "Error when read decoded data.")
	err = gzipReader.Close()
	assert.NilError(t, err, "Error when close gzip reader.")

	// decode into a fresh slice so no state from case 1 is reused
	var decodedData []*dao.ApplicationDAOInfo
	err = json.Unmarshal(uncompressedData, &decodedData)
	assert.NilError(t, err, "Error when unmarshal decoded data.")
	assertSameApps(appsDao, decodedData)
}
Loading