Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Harvest should support recording and replaying HTTP requests #3235

Merged
merged 3 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions cmd/collectors/keyperf/keyperf.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ import (
)

const (
latencyIoReqd = 10
latencyIoReqd = 10
numRecordsToSave = 60 // Number of records to save when using the recorder
)

type KeyPerf struct {
*rest.Rest // provides: AbstractCollector, Client, Object, Query, TemplateFn, TemplateType
perfProp *perfProp
*rest.Rest // provides: AbstractCollector, Client, Object, Query, TemplateFn, TemplateType
perfProp *perfProp
pollDataCalls uint8
}

type counter struct {
Expand Down Expand Up @@ -232,7 +234,16 @@ func (kp *KeyPerf) PollData() (map[string]*matrix.Matrix, error) {
return nil, errs.New(errs.ErrConfig, "empty url")
}

perfRecords, err = kp.GetRestData(href)
kp.pollDataCalls++
if kp.pollDataCalls > numRecordsToSave {
kp.pollDataCalls = 0
}

headers := map[string]string{
"From": strconv.Itoa(int(kp.pollDataCalls)),
}

perfRecords, err = kp.GetRestData(href, headers)
if err != nil {
return nil, fmt.Errorf("failed to fetch href=%s %w", href, err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/collectors/rest/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,13 +677,13 @@ func (r *Rest) HandleResults(mat *matrix.Matrix, result []gjson.Result, prop *pr
return count, numPartials
}

func (r *Rest) GetRestData(href string) ([]gjson.Result, error) {
func (r *Rest) GetRestData(href string, headers ...map[string]string) ([]gjson.Result, error) {
r.Logger.Debug("Fetching data", slog.String("href", href))
if href == "" {
return nil, errs.New(errs.ErrConfig, "empty url")
}

result, err := rest.FetchAll(r.Client, href)
result, err := rest.FetchAll(r.Client, href, headers...)
if err != nil {
return r.handleError(err)
}
Expand Down
40 changes: 29 additions & 11 deletions cmd/collectors/restperf/restperf.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,19 @@ const (
objWorkloadClass = "user_defined|system_defined"
objWorkloadVolumeClass = "autovolume"
timestampMetricName = "timestamp"
numRecordsToSave = 60 // Number of records to save when using the recorder
)

var (
constituentRegex = regexp.MustCompile(`^(.*)__(\d{4})$`)
constituentRegex = regexp.MustCompile(`^(.*)__(\d{4})$`)
qosQuery = "api/cluster/counter/tables/qos"
qosVolumeQuery = "api/cluster/counter/tables/qos_volume"
qosDetailQuery = "api/cluster/counter/tables/qos_detail"
qosDetailVolumeQuery = "api/cluster/counter/tables/qos_detail_volume"
qosWorkloadQuery = "api/storage/qos/workloads"
workloadDetailMetrics = []string{"resource_latency"}
)

var qosQuery = "api/cluster/counter/tables/qos"
var qosVolumeQuery = "api/cluster/counter/tables/qos_volume"
var qosDetailQuery = "api/cluster/counter/tables/qos_detail"
var qosDetailVolumeQuery = "api/cluster/counter/tables/qos_detail_volume"
var qosWorkloadQuery = "api/storage/qos/workloads"

var workloadDetailMetrics = []string{"resource_latency"}

var qosQueries = map[string]string{
qosQuery: qosQuery,
qosVolumeQuery: qosVolumeQuery,
Expand All @@ -70,6 +69,8 @@ type RestPerf struct {
perfProp *perfProp
archivedMetrics map[string]*rest2.Metric // Keeps metric definitions that are not found in the counter schema. These metrics may be available in future ONTAP versions.
hasInstanceSchedule bool
pollInstanceCalls uint8
pollDataCalls uint8
}

type counter struct {
Expand Down Expand Up @@ -720,7 +721,15 @@ func (r *RestPerf) PollData() (map[string]*matrix.Matrix, error) {
return nil, errs.New(errs.ErrConfig, "empty url")
}

err = rest.FetchRestPerfData(r.Client, href, &perfRecords)
r.pollDataCalls++
if r.pollDataCalls > numRecordsToSave {
r.pollDataCalls = 0
}

headers := map[string]string{
"From": strconv.Itoa(int(r.pollDataCalls)),
}
err = rest.FetchRestPerfData(r.Client, href, &perfRecords, headers)
if err != nil {
return nil, fmt.Errorf("failed to fetch href=%s %w", href, err)
}
Expand Down Expand Up @@ -1475,6 +1484,15 @@ func (r *RestPerf) PollInstance() (map[string]*matrix.Matrix, error) {
}
}

r.pollInstanceCalls++
if r.pollInstanceCalls > numRecordsToSave/3 {
r.pollInstanceCalls = 0
}

headers := map[string]string{
"From": strconv.Itoa(int(r.pollInstanceCalls)),
}

href := rest.NewHrefBuilder().
APIPath(dataQuery).
Fields([]string{fields}).
Expand All @@ -1490,7 +1508,7 @@ func (r *RestPerf) PollInstance() (map[string]*matrix.Matrix, error) {

apiT := time.Now()
r.Client.Metadata.Reset()
records, err = rest.FetchAll(r.Client, href)
records, err = rest.FetchAll(r.Client, href, headers)
if err != nil {
return r.handleError(err, href)
}
Expand Down
55 changes: 39 additions & 16 deletions cmd/collectors/zapiperf/zapiperf.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,25 +70,28 @@ const (
objWorkloadVolumeClass = "autovolume"
BILLION = 1_000_000_000
timestampMetricName = "timestamp"
numRecordsToSave = 60 // Number of records to save when using the recorder
)

var workloadDetailMetrics = []string{"resource_latency"}

type ZapiPerf struct {
*zapi.Zapi // provides: AbstractCollector, Client, Object, Query, TemplateFn, TemplateType
object string
filter string
batchSize int
latencyIoReqd int
instanceKeys []string
instanceLabels map[string]string
histogramLabels map[string][]string
scalarCounters []string
qosLabels map[string]string
isCacheEmpty bool
keyName string
keyNameIndex int
testFilePath string // Used only from unit test
*zapi.Zapi // provides: AbstractCollector, Client, Object, Query, TemplateFn, TemplateType
object string
filter string
batchSize int
latencyIoReqd int
instanceKeys []string
instanceLabels map[string]string
histogramLabels map[string][]string
scalarCounters []string
qosLabels map[string]string
isCacheEmpty bool
keyName string
keyNameIndex int
testFilePath string // Used only from unit test
pollDataCalls uint8
pollInstanceCalls uint8
}

func init() {
Expand Down Expand Up @@ -483,7 +486,17 @@ func (z *ZapiPerf) PollData() (map[string]*matrix.Matrix, error) {
return nil, err
}

response, rd, pd, err := z.Client.InvokeWithTimers(z.testFilePath)
z.pollDataCalls++
if z.pollDataCalls > numRecordsToSave {
z.pollDataCalls = 0
}

headers := map[string]string{
"From": strconv.Itoa(int(z.pollDataCalls)),
}

response, rd, pd, err := z.Client.InvokeWithTimers(z.testFilePath, headers)

if err != nil {
errMsg := strings.ToLower(err.Error())
// if ONTAP complains about batch size, use a smaller batch size
Expand Down Expand Up @@ -1621,7 +1634,17 @@ func (z *ZapiPerf) PollInstance() (map[string]*matrix.Matrix, error) {

for {
apiT = time.Now()
responseData, err := z.Client.InvokeBatchRequest(request, batchTag, z.testFilePath)

z.pollInstanceCalls++
if z.pollInstanceCalls > numRecordsToSave/3 {
z.pollInstanceCalls = 0
}

headers := map[string]string{
"From": strconv.Itoa(int(z.pollInstanceCalls)),
}

responseData, err := z.Client.InvokeBatchRequest(request, batchTag, z.testFilePath, headers)

if err != nil {
if errors.Is(err, errs.ErrAPIRequestRejected) {
Expand Down
13 changes: 10 additions & 3 deletions cmd/tools/rest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (c *Client) printRequestAndResponse(req string, response []byte) {
}

// GetPlainRest makes a REST request to the cluster and returns a json response as a []byte
func (c *Client) GetPlainRest(request string, encodeURL bool) ([]byte, error) {
func (c *Client) GetPlainRest(request string, encodeURL bool, headers ...map[string]string) ([]byte, error) {
var err error
if strings.Index(request, "/") == 0 {
request = request[1:]
Expand All @@ -131,6 +131,13 @@ func (c *Client) GetPlainRest(request string, encodeURL bool) ([]byte, error) {
return nil, err
}
c.request.Header.Set("Accept", "application/json")

for _, hs := range headers {
for k, v := range hs {
c.request.Header.Set(k, v)
}
}

pollerAuth, err := c.auth.GetPollerAuth()
if err != nil {
return nil, err
Expand All @@ -156,8 +163,8 @@ func (c *Client) GetPlainRest(request string, encodeURL bool) ([]byte, error) {
}

// GetRest makes a REST request to the cluster and returns a json response as a []byte
func (c *Client) GetRest(request string) ([]byte, error) {
return c.GetPlainRest(request, true)
func (c *Client) GetRest(request string, headers ...map[string]string) ([]byte, error) {
return c.GetPlainRest(request, true, headers...)
}

func (c *Client) invokeWithAuthRetry() ([]byte, error) {
Expand Down
12 changes: 6 additions & 6 deletions cmd/tools/rest/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,14 +408,14 @@ func FetchForCli(client *Client, href string, records *[]any, downloadAll bool,

// FetchAll collects all records.
// If you want to limit the number of records returned, use FetchSome.
func FetchAll(client *Client, href string) ([]gjson.Result, error) {
func FetchAll(client *Client, href string, headers ...map[string]string) ([]gjson.Result, error) {
var (
records []gjson.Result
result []gjson.Result
err error
)

err = fetchAll(client, href, &records)
err = fetchAll(client, href, &records, headers...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -463,8 +463,8 @@ func FetchAnalytics(client *Client, href string) ([]gjson.Result, gjson.Result,
return result, *analytics, nil
}

func fetchAll(client *Client, href string, records *[]gjson.Result) error {
getRest, err := client.GetRest(href)
func fetchAll(client *Client, href string, records *[]gjson.Result, headers ...map[string]string) error {
getRest, err := client.GetRest(href, headers...)
if err != nil {
return fmt.Errorf("error making request %w", err)
}
Expand Down Expand Up @@ -634,8 +634,8 @@ func fetchAnalytics(client *Client, href string, records *[]gjson.Result, analyt
}

// FetchRestPerfData This method is used in PerfRest collector. This method returns timestamp per batch
func FetchRestPerfData(client *Client, href string, perfRecords *[]PerfRecord) error {
getRest, err := client.GetRest(href)
func FetchRestPerfData(client *Client, href string, perfRecords *[]PerfRecord, headers ...map[string]string) error {
getRest, err := client.GetRest(href, headers...)
if err != nil {
return fmt.Errorf("error making request %w", err)
}
Expand Down
24 changes: 15 additions & 9 deletions pkg/api/ontapi/zapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (c *Client) Invoke(testFilePath string) (*node.Node, error) {
// Else -> will issue API requests in series, once there
// are no more instances returned by the server, returned results will be nil
// Use the returned tag for subsequent calls to this method
func (c *Client) InvokeBatchRequest(request *node.Node, tag string, testFilePath string) (Response, error) {
func (c *Client) InvokeBatchRequest(request *node.Node, tag string, testFilePath string, headers ...map[string]string) (Response, error) {
if testFilePath != "" && tag != "" {
testData, err := tree.ImportXML(testFilePath)
if err != nil {
Expand All @@ -333,13 +333,13 @@ func (c *Client) InvokeBatchRequest(request *node.Node, tag string, testFilePath
return Response{Result: testData, Tag: "", Rd: time.Second, Pd: time.Second}, nil
}
// wasteful of course, need to rewrite later @TODO
results, tag, rd, pd, err := c.InvokeBatchWithTimers(request, tag)
results, tag, rd, pd, err := c.InvokeBatchWithTimers(request, tag, headers...)
return Response{Result: results, Tag: tag, Rd: rd, Pd: pd}, err
}

// InvokeBatchWithTimers does the same as InvokeBatchRequest, but it also
// returns API time and XML parse time
func (c *Client) InvokeBatchWithTimers(request *node.Node, tag string) (*node.Node, string, time.Duration, time.Duration, error) {
func (c *Client) InvokeBatchWithTimers(request *node.Node, tag string, headers ...map[string]string) (*node.Node, string, time.Duration, time.Duration, error) {

var (
results *node.Node
Expand All @@ -360,7 +360,7 @@ func (c *Client) InvokeBatchWithTimers(request *node.Node, tag string) (*node.No
return nil, "", rd, pd, err
}

if results, rd, pd, err = c.invokeWithAuthRetry(true); err != nil {
if results, rd, pd, err = c.invokeWithAuthRetry(true, headers...); err != nil {
return nil, "", rd, pd, err
}

Expand Down Expand Up @@ -397,18 +397,18 @@ func (c *Client) InvokeRequest(request *node.Node) (*node.Node, error) {
// Else -> invokes the request and returns parsed XML response and timers:
// API wait time and XML parse time.
// This method should only be called after building the request
func (c *Client) InvokeWithTimers(testFilePath string) (*node.Node, time.Duration, time.Duration, error) {
func (c *Client) InvokeWithTimers(testFilePath string, headers ...map[string]string) (*node.Node, time.Duration, time.Duration, error) {
if testFilePath != "" {
testData, err := tree.ImportXML(testFilePath)
if err != nil {
return nil, 0, 0, err
}
return testData, 0, 0, nil
}
return c.invokeWithAuthRetry(true)
return c.invokeWithAuthRetry(true, headers...)
}

func (c *Client) invokeWithAuthRetry(withTimers bool) (*node.Node, time.Duration, time.Duration, error) {
func (c *Client) invokeWithAuthRetry(withTimers bool, headers ...map[string]string) (*node.Node, time.Duration, time.Duration, error) {
var buffer bytes.Buffer
pollerAuth, err := c.auth.GetPollerAuth()
if err != nil {
Expand All @@ -420,7 +420,7 @@ func (c *Client) invokeWithAuthRetry(withTimers bool) (*node.Node, time.Duration
buffer = *c.buffer
}

resp, t1, t2, err := c.invoke(withTimers)
resp, t1, t2, err := c.invoke(withTimers, headers...)

if err != nil {
var he errs.HarvestError
Expand Down Expand Up @@ -448,7 +448,7 @@ func (c *Client) invokeWithAuthRetry(withTimers bool) (*node.Node, time.Duration
}

// invokes the request that has been built with one of the BuildRequest* methods
func (c *Client) invoke(withTimers bool) (*node.Node, time.Duration, time.Duration, error) {
func (c *Client) invoke(withTimers bool, headers ...map[string]string) (*node.Node, time.Duration, time.Duration, error) {

var (
root, result *node.Node
Expand Down Expand Up @@ -478,6 +478,12 @@ func (c *Client) invoke(withTimers bool) (*node.Node, time.Duration, time.Durati
zapiReq = c.buffer.String()
}

for _, hs := range headers {
cgrinds marked this conversation as resolved.
Show resolved Hide resolved
for k, v := range hs {
c.request.Header.Set(k, v)
}
}

if response, err = c.client.Do(c.request); err != nil {
return result, responseT, parseT, errs.New(errs.ErrConnection, err.Error())
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/auth/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ func recording(poller *conf.Poller, transport *http.Transport) http.RoundTripper
response *http.Response
)

_ = os.MkdirAll(basePath, 0750)
err = os.MkdirAll(basePath, 0750)
if err != nil {
return nil, fmt.Errorf("problem while creating directories=%s transport: %w", basePath, err)
}
b, err := DumpRequest(req, true)
if err != nil {
return nil, err
Expand Down