From ca5f91e88713e792f599ad5bc6f152d9664c135e Mon Sep 17 00:00:00 2001 From: ziggie Date: Sun, 10 Dec 2023 14:06:15 +0100 Subject: [PATCH] query: increase logs to find deadlock issue. --- query/worker.go | 13 ++++++++++--- query/workmanager.go | 17 ++++++++++++++++- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/query/worker.go b/query/worker.go index dc15a18c..d81b46bd 100644 --- a/query/worker.go +++ b/query/worker.go @@ -2,6 +2,7 @@ package query import ( "errors" + "fmt" "time" "github.com/btcsuite/btcd/wire" @@ -43,6 +44,12 @@ func (q *queryJob) Index() uint64 { return q.index } +// String returns the string representation of the queryJob. +func (q *queryJob) String() string { + return fmt.Sprintf("QueryJob(index=%v): tries=%v, timeout=%v, "+ + "request_msg: %v", q.index, q.tries, q.timeout, q.Req) +} + // jobResult is the final result of the worker's handling of the queryJob. type jobResult struct { job *queryJob @@ -96,7 +103,7 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) { select { // Poll a new job from the nextJob channel. case job = <-w.nextJob: - log.Tracef("Worker %v picked up job with index %v", + log.Infof("Worker %v picked up job with index %v", peer.Addr(), job.Index()) // Ignore any message received while not working on anything. @@ -154,7 +161,7 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) { job.Req, resp, peer.Addr(), ) - log.Tracef("Worker %v handled msg %T while "+ + log.Infof("Worker %v handled msg %T while "+ "waiting for response to %T (job=%v). "+ "Finished=%v, progressed=%v", peer.Addr(), resp, job.Req, job.Index(), @@ -189,7 +196,7 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) { // The query did experience a timeout and will // be given to someone else. 
jobErr = ErrQueryTimeout - log.Tracef("Worker %v timeout for request %T "+ + log.Infof("Worker %v timeout for request %T "+ "with job index %v", peer.Addr(), job.Req, job.Index()) diff --git a/query/workmanager.go b/query/workmanager.go index e99f57ab..216a5e45 100644 --- a/query/workmanager.go +++ b/query/workmanager.go @@ -308,6 +308,15 @@ Loop: delete(currentQueries, result.job.index) batch := currentBatches[batchNum] + log.Infof("Job(%v) received a result=%v, "+ + "batchNum=%v, batch=%v, pendingQueries=%v, "+ + "pendingBatches=%v", result.job, result, + batchNum, batch, len(currentQueries), + len(currentBatches)) + + log.Infof("Batch info: %v,%v,%v", batch.maxRetries, + batch.timeout, batch.noRetryMax) + switch { // If the query ended because it was canceled, drop it. case result.err == ErrJobCanceled: @@ -431,9 +440,15 @@ Loop: // Add all new queries in the batch to our work queue, // with priority given by the order they were // scheduled. - log.Debugf("Adding new batch(%d) of %d queries to "+ + log.Infof("Adding new batch(%d) of %d queries to "+ "work queue", batchIndex, len(batch.requests)) + log.Infof("Current queries to manage %d: %v", + len(currentQueries), currentQueries) + + log.Infof("Current batches to manage %d: %v", + len(currentBatches), currentBatches) + for _, q := range batch.requests { heap.Push(work, &queryJob{ index: queryIndex,