Merge branch 'apache:trunk' into missing-mm2-jmx
alozano3 authored Jan 24, 2025
2 parents 64b8513 + 356f0d8 commit b7b5f4e
Showing 62 changed files with 2,925 additions and 1,836 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -110,8 +110,8 @@ fail due to code changes. You can just run:
Using compiled files:

KAFKA_CLUSTER_ID="$(./bin/kafka-storage.sh random-uuid)"
-./bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties
-./bin/kafka-server-start.sh config/kraft/reconfig-server.properties
+./bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
+./bin/kafka-server-start.sh config/server.properties

Using docker image:

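The docker path, collapsed in this view, typically amounts to a single command along these lines (image name and port mapping are assumptions, not taken from this diff):

docker run -p 9092:9092 apache/kafka:latest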
@@ -1154,7 +1154,8 @@ private void ensureValidRecordSize(int size) {
/**
* Invoking this method makes all buffered records immediately available to send (even if <code>linger.ms</code> is
* greater than 0) and blocks on the completion of the requests associated with these records. The post-condition
- * of <code>flush()</code> is that any previously sent record will have completed (e.g. <code>Future.isDone() == true</code>).
+ * of <code>flush()</code> is that any previously sent record will have completed (e.g. <code>Future.isDone() == true</code>
+ * and callbacks passed to {@link #send(ProducerRecord,Callback)} have been called).
* A request is considered completed when it is successfully acknowledged
* according to the <code>acks</code> configuration you have specified or else it results in an error.
* <p>
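A minimal sketch of the post-condition described above, assuming an already-configured KafkaProducer<String, String> named producer and an existing topic (not part of this diff):

// Send with a callback, then flush.
Future<RecordMetadata> future = producer.send(
    new ProducerRecord<>("topic", "key", "value"),
    (metadata, exception) -> System.out.println("callback has run"));
producer.flush();
// After flush() returns, the request has completed: the Future is done and
// the callback above has already been invoked.
assert future.isDone();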

Large diffs are not rendered by default.

File renamed without changes.
File renamed without changes.
129 changes: 0 additions & 129 deletions config/kraft/server.properties

This file was deleted.

File renamed without changes.
@@ -460,7 +460,7 @@ public int size() {
* A simple container class to hold all the attributes
* related to a pending batch.
*/
-private static class CoordinatorBatch {
+private class CoordinatorBatch {
/**
* The base (or first) offset of the batch. If the batch fails
* for any reason, the state machine is rolled back to it.
@@ -500,9 +500,9 @@ private static class CoordinatorBatch {
final Optional<TimerTask> lingerTimeoutTask;

/**
- * The list of deferred events associated with the batch.
+ * The deferred events associated with the batch.
*/
-final List<DeferredEvent> deferredEvents;
+final DeferredEventCollection deferredEvents;

/**
* The next offset. This is updated when records
@@ -527,7 +527,7 @@ private static class CoordinatorBatch {
this.buffer = buffer;
this.builder = builder;
this.lingerTimeoutTask = lingerTimeoutTask;
-this.deferredEvents = new ArrayList<>();
+this.deferredEvents = new DeferredEventCollection();
}
}

@@ -742,7 +742,11 @@ private void unload() {
deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
failCurrentBatch(Errors.NOT_COORDINATOR.exception());
if (coordinator != null) {
-coordinator.onUnloaded();
+try {
+    coordinator.onUnloaded();
+} catch (Throwable ex) {
+    log.error("Failed to unload coordinator for {} due to {}.", tp, ex.getMessage(), ex);
+}
}
coordinator = null;
}
@@ -806,9 +810,7 @@ private void flushCurrentBatch() {
}

// Add all the pending deferred events to the deferred event queue.
-for (DeferredEvent event : currentBatch.deferredEvents) {
-    deferredEventQueue.add(offset, event);
-}
+deferredEventQueue.add(offset, currentBatch.deferredEvents);

// Free up the current batch.
freeCurrentBatch();
@@ -839,9 +841,7 @@ private void maybeFlushCurrentBatch(long currentTimeMs) {
private void failCurrentBatch(Throwable t) {
if (currentBatch != null) {
coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
-for (DeferredEvent event : currentBatch.deferredEvents) {
-    event.complete(t);
-}
+currentBatch.deferredEvents.complete(t);
freeCurrentBatch();
}
}
@@ -1157,6 +1157,38 @@ public void run() {
}
}

/**
* A collection of {@link DeferredEvent}. When completed, completes all the events in the collection
* and logs any exceptions thrown.
*/
class DeferredEventCollection implements DeferredEvent {
    private final List<DeferredEvent> events = new ArrayList<>();

    @Override
    public void complete(Throwable t) {
        for (DeferredEvent event : events) {
            try {
                event.complete(t);
            } catch (Throwable e) {
                log.error("Completion of event {} failed due to {}.", event, e.getMessage(), e);
            }
        }
    }

    public boolean add(DeferredEvent event) {
        return events.add(event);
    }

    public int size() {
        return events.size();
    }

    @Override
    public String toString() {
        return "DeferredEventCollection(events=" + events + ")";
    }
}
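A small usage sketch for the new collection, with two hypothetical DeferredEvent instances (not part of this diff): a single complete() call fans out to every added event, and an exception thrown by one event's completion is logged instead of aborting the loop.

DeferredEventCollection batchEvents = new DeferredEventCollection();
batchEvents.add(firstWriteEvent);   // hypothetical DeferredEvent instances
batchEvents.add(secondWriteEvent);
// Completes both events; if firstWriteEvent.complete(null) throws, the error
// is logged and secondWriteEvent is still completed.
batchEvents.complete(null);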

/**
* A coordinator write operation.
*
@@ -2387,9 +2419,19 @@ public void scheduleUnloadOperation(
try {
if (partitionEpoch.isEmpty() || context.epoch < partitionEpoch.getAsInt()) {
log.info("Started unloading metadata for {} with epoch {}.", tp, partitionEpoch);
-context.transitionTo(CoordinatorState.CLOSED);
-coordinators.remove(tp, context);
-log.info("Finished unloading metadata for {} with epoch {}.", tp, partitionEpoch);
+try {
+    context.transitionTo(CoordinatorState.CLOSED);
+    log.info("Finished unloading metadata for {} with epoch {}.", tp, partitionEpoch);
+} catch (Throwable ex) {
+    // It's very unlikely that we will ever see an exception here, since we
+    // already make an effort to catch exceptions in the unload method.
+    log.error("Failed to unload metadata for {} with epoch {} due to {}.",
+        tp, partitionEpoch, ex.toString());
+} finally {
+    // Always remove the coordinator context, otherwise the coordinator
+    // shard could be permanently stuck.
+    coordinators.remove(tp, context);
+}
} else {
log.info("Ignored unloading metadata for {} in epoch {} since current epoch is {}.",
tp, partitionEpoch, context.epoch);
@@ -2470,6 +2512,8 @@ public void close() throws Exception {
context.lock.lock();
try {
context.transitionTo(CoordinatorState.CLOSED);
+} catch (Throwable ex) {
+    log.warn("Failed to unload metadata for {} due to {}.", tp, ex.getMessage(), ex);
} finally {
context.lock.unlock();
}
