Skip to content

Commit

Permalink
query: increase logs to find deadlock issue.
Browse files Browse the repository at this point in the history
  • Loading branch information
ziggie1984 committed Dec 13, 2023
1 parent 42a196f commit ca5f91e
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
13 changes: 10 additions & 3 deletions query/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package query

import (
"errors"
"fmt"
"time"

"github.com/btcsuite/btcd/wire"
Expand Down Expand Up @@ -43,6 +44,12 @@ func (q *queryJob) Index() uint64 {
return q.index
}

// String returns the string representation of the queryJob code.
func (q *queryJob) String() string {
return fmt.Sprintf("QueryJob(index=%v): tries=%v, timeout=%v, +"+
"request_msg: %v", q.index, q.tries, q.timeout, q.Req)
}

// jobResult is the final result of the worker's handling of the queryJob.
type jobResult struct {
job *queryJob
Expand Down Expand Up @@ -96,7 +103,7 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
select {
// Poll a new job from the nextJob channel.
case job = <-w.nextJob:
log.Tracef("Worker %v picked up job with index %v",
log.Infof("Worker %v picked up job with index %v",
peer.Addr(), job.Index())

// Ignore any message received while not working on anything.
Expand Down Expand Up @@ -154,7 +161,7 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
job.Req, resp, peer.Addr(),
)

log.Tracef("Worker %v handled msg %T while "+
log.Infof("Worker %v handled msg %T while "+
"waiting for response to %T (job=%v). "+
"Finished=%v, progressed=%v",
peer.Addr(), resp, job.Req, job.Index(),
Expand Down Expand Up @@ -189,7 +196,7 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
// The query did experience a timeout and will
// be given to someone else.
jobErr = ErrQueryTimeout
log.Tracef("Worker %v timeout for request %T "+
log.Infof("Worker %v timeout for request %T "+
"with job index %v", peer.Addr(),
job.Req, job.Index())

Expand Down
17 changes: 16 additions & 1 deletion query/workmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,15 @@ Loop:
delete(currentQueries, result.job.index)
batch := currentBatches[batchNum]

log.Infof("Job(%v) received a result=%v, "+
"batchNum=%v, batch=%v, pendingQueries=%v, "+
"pendingBatches=%v", result.job, result,
batchNum, batch, len(currentQueries),
len(currentBatches))

log.Infof("Batch info: %v,%v,%v", batch.maxRetries,
batch.timeout, batch.noRetryMax)

switch {
// If the query ended because it was canceled, drop it.
case result.err == ErrJobCanceled:
Expand Down Expand Up @@ -431,9 +440,15 @@ Loop:
// Add all new queries in the batch to our work queue,
// with priority given by the order they were
// scheduled.
log.Debugf("Adding new batch(%d) of %d queries to "+
log.Infof("Adding new batch(%d) of %d queries to "+
"work queue", batchIndex, len(batch.requests))

log.Infof("Current queries to manage %d: %v",
len(currentQueries), currentQueries)

log.Infof("Current batches to manage %d: %v",
len(currentBatches), currentBatches)

for _, q := range batch.requests {
heap.Push(work, &queryJob{
index: queryIndex,
Expand Down

0 comments on commit ca5f91e

Please sign in to comment.