Skip to content

Commit

Permalink
GEODE-8947: Use waiting thread pool only in limited senario. (#6038)
Browse files Browse the repository at this point in the history
* GEODE-8947: Use waiting thread pool only in limited senario.

  * Only when processing transactional message and when conserve-sockets set to true case, a
    separate thread in waiting pool will be used to process the message.
  * This is to addresses performance issue if there won't be deadlock.

(cherry picked from commit b98cf4a)
  • Loading branch information
pivotal-eshu committed Feb 17, 2021
1 parent 9006226 commit 3a91d92
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package org.apache.geode.internal.cache.tx;


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
Expand Down Expand Up @@ -176,7 +177,23 @@ public void process(final ClusterDistributionManager dm) {
sendReply(getSender(), this.processorId, dm, replyException, null, 0);
return;
}
dm.getExecutors().getWaitingThreadPool().execute(() -> doRemoteOperation(dm, cache));

if (dm.getSystem().threadOwnsResources()) {
// reply inline if thread owns socket.
doRemoteOperation(dm, cache);
return;
}

if (isTransactional()) {
dm.getExecutors().getWaitingThreadPool().execute(() -> doRemoteOperation(dm, cache));
} else {
// reply inline for non-transactional case.
doRemoteOperation(dm, cache);
}
}

boolean isTransactional() {
return getTXUniqId() != TXManagerImpl.NOTX && canParticipateInTransaction();
}

void doRemoteOperation(ClusterDistributionManager dm, InternalCache cache) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -302,6 +303,52 @@ public void processWithNullPointerExceptionFromOperationOnRegionWithSystemFailur
.hasMessageContaining("system failure");
}


@Test
public void processInvokesDoRemoteOperationIfThreadOwnsResources() {
when(system.threadOwnsResources()).thenReturn(true);
doNothing().when(msg).doRemoteOperation(dm, cache);

msg.process(dm);

verify(msg).doRemoteOperation(dm, cache);
verify(msg, never()).isTransactional();
}

@Test
public void processInvokesDoRemoteOperationIfThreadDoesNotOwnResourcesAndNotTransactional() {
when(system.threadOwnsResources()).thenReturn(false);
doReturn(false).when(msg).isTransactional();
doNothing().when(msg).doRemoteOperation(dm, cache);

msg.process(dm);

verify(msg).doRemoteOperation(dm, cache);
verify(msg).isTransactional();
}

@Test
public void isTransactionalReturnsFalseIfTXUniqueIdIsNOTX() {
assertThat(msg.getTXUniqId()).isEqualTo(TXManagerImpl.NOTX);
assertThat(msg.isTransactional()).isFalse();
}

@Test
public void isTransactionalReturnsFalseIfCannotParticipateInTransaction() {
doReturn(1).when(msg).getTXUniqId();
doReturn(false).when(msg).canParticipateInTransaction();

assertThat(msg.isTransactional()).isFalse();
}

@Test
public void isTransactionalReturnsTrueIfHasTXUniqueIdAndCanParticipateInTransaction() {
doReturn(1).when(msg).getTXUniqId();

assertThat(msg.canParticipateInTransaction()).isTrue();
assertThat(msg.isTransactional()).isTrue();
}

private static class TestableRemoteOperationMessage extends RemoteOperationMessage {

private boolean operationOnRegionResult = true;
Expand Down

0 comments on commit 3a91d92

Please sign in to comment.