From e8306577fbdfec99e1566e0359381f33048878dd Mon Sep 17 00:00:00 2001 From: Aleksandr Polovtsev Date: Tue, 28 Jan 2025 16:02:53 +0200 Subject: [PATCH 1/2] IGNITE-22459 Implement zone Raft group listener --- .../replicator/ItReplicaLifecycleTest.java | 81 ---- .../replicator/ItZoneDataReplicationTest.java | 426 ++++++++++++++++++ .../partition/replicator/fixtures/Node.java | 16 +- .../fixtures/TestPlacementDriver.java | 51 ++- .../PartitionReplicaLifecycleManager.java | 74 ++- .../replicator/ZonePartitionRaftListener.java | 129 +++++- .../table/distributed/TableManager.java | 94 ++-- .../distributed/raft/PartitionListener.java | 7 +- .../replicator/PartitionReplicaListener.java | 4 +- 9 files changed, 670 insertions(+), 212 deletions(-) create mode 100644 modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java index c2bb0c6cd05..1e433463f80 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java @@ -35,8 +35,6 @@ import static org.apache.ignite.sql.ColumnType.INT32; import static org.apache.ignite.sql.ColumnType.INT64; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.never; @@ -79,7 +77,6 @@ import org.apache.ignite.internal.partitiondistribution.Assignments; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.replicator.Replica; -import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.ZonePartitionId; import org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration; import org.apache.ignite.internal.storage.configurations.StorageConfiguration; @@ -92,9 +89,7 @@ import org.apache.ignite.internal.testframework.SystemPropertiesExtension; import org.apache.ignite.internal.testframework.WithSystemProperty; import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; -import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest; import org.apache.ignite.network.NetworkAddress; -import org.apache.ignite.table.KeyValueView; import org.apache.ignite.table.Table; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; @@ -262,70 +257,6 @@ private void stopNode(int idx) { nodes.remove(idx); } - @Test - public void testZoneReplicaListener(TestInfo testInfo) throws Exception { - startNodes(testInfo, 3); - - Assignment replicaAssignment = (Assignment) calculateAssignmentForPartition( - nodes.values().stream().map(n -> n.name).collect(toList()), 0, 1, 1).toArray()[0]; - - Node node = getNode(replicaAssignment.consistentId()); - - placementDriver.setPrimary(node.clusterService.topologyService().localMember()); - - createZone(node, "test_zone", 1, 1); - int zoneId = DistributionZonesTestUtil.getZoneId(node.catalogManager, "test_zone", node.hybridClock.nowLong()); - - long key = 1; - - { - createTable(node, "test_zone", "test_table"); - int tableId = TableTestUtils.getTableId(node.catalogManager, "test_table", node.hybridClock.nowLong()); - - prepareTableIdToZoneIdConverter( - node, - new TablePartitionId(tableId, 0), - new ZonePartitionId(zoneId, 0) - ); - - KeyValueView keyValueView = node.tableManager.table(tableId).keyValueView(Long.class, Integer.class); - - int val = 100; - - node.transactions().runInTransaction(tx -> { - assertDoesNotThrow(() -> keyValueView.put(tx, key, val)); - - assertEquals(val, keyValueView.get(tx, key)); - }); - - node.transactions().runInTransaction(tx -> { - // Check the replica read inside the another transaction - assertEquals(val, keyValueView.get(tx, key)); - }); - } - - { - createTable(node, "test_zone", "test_table1"); - int tableId = TableTestUtils.getTableId(node.catalogManager, "test_table1", node.hybridClock.nowLong()); - - prepareTableIdToZoneIdConverter( - node, - new TablePartitionId(tableId, 0), - new ZonePartitionId(zoneId, 0) - ); - - KeyValueView keyValueView = node.tableManager.table(tableId).keyValueView(Long.class, Integer.class); - - int val = 200; - - node.transactions().runInTransaction(tx -> { - assertDoesNotThrow(() -> keyValueView.put(tx, key, val)); - - assertEquals(val, keyValueView.get(tx, key)); - }); - } - } - @Test @Disabled("https://issues.apache.org/jira/browse/IGNITE-22944") void testAlterReplicaTrigger(TestInfo testInfo) throws Exception { @@ -716,18 +647,6 @@ void testStableAreWrittenAfterRestartAndConcurrentStableUpdate(TestInfo testInfo assertTrue(waitForCondition(() -> getNode(0).replicaManager.isReplicaStarted(partId), 10_000L)); } - private void prepareTableIdToZoneIdConverter(Node node, TablePartitionId tablePartitionId, ZonePartitionId zonePartitionId) { - node.converter.set(request -> { - if (request.groupId().asReplicationGroupId().equals(tablePartitionId) - && !(request instanceof WriteIntentSwitchReplicaRequest)) { - return zonePartitionId; - } else { - return request.groupId().asReplicationGroupId(); - } - }); - - } - private Node getNode(int nodeIndex) { return nodes.get(nodeIndex); } diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java new file mode 100644 index 00000000000..86170f6c7d3 --- /dev/null +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; +import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE; +import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.createZoneWithStorageProfile; +import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.getZoneId; +import static org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager.FEATURE_FLAG_NAME; +import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME; +import static org.apache.ignite.internal.table.TableTestUtils.getTableId; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.apache.ignite.internal.util.IgniteUtils.closeAll; +import static org.apache.ignite.sql.ColumnType.INT32; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.anEmptyMap; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Function; +import java.util.stream.IntStream; +import org.apache.ignite.internal.catalog.commands.ColumnParams; +import org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration; +import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot; +import org.apache.ignite.internal.configuration.SystemLocalConfiguration; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration; +import org.apache.ignite.internal.network.NodeFinder; +import org.apache.ignite.internal.network.StaticNodeFinder; +import org.apache.ignite.internal.partition.replicator.fixtures.Node; +import org.apache.ignite.internal.partition.replicator.fixtures.TestPlacementDriver; +import org.apache.ignite.internal.raft.configuration.RaftConfiguration; +import org.apache.ignite.internal.replicator.Member; +import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration; +import org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand; +import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; +import org.apache.ignite.internal.replicator.message.ReplicaRequest; +import org.apache.ignite.internal.storage.configurations.StorageConfiguration; +import org.apache.ignite.internal.table.TableTestUtils; +import org.apache.ignite.internal.testframework.ExecutorServiceExtension; +import org.apache.ignite.internal.testframework.IgniteAbstractTest; +import org.apache.ignite.internal.testframework.InjectExecutorService; +import org.apache.ignite.internal.testframework.SystemPropertiesExtension; +import org.apache.ignite.internal.testframework.WithSystemProperty; +import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; +import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.table.KeyValueView; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * Class containing tests related to Raft-based replication for the Colocation feature. + */ +@ExtendWith(ConfigurationExtension.class) +@ExtendWith(ExecutorServiceExtension.class) +@ExtendWith(SystemPropertiesExtension.class) +// TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this test after the switching to zone-based replication +@Disabled("https://issues.apache.org/jira/browse/IGNITE-23252") +@WithSystemProperty(key = FEATURE_FLAG_NAME, value = "true") +public class ItZoneDataReplicationTest extends IgniteAbstractTest { + private static final int BASE_PORT = 20_000; + + private static final String TEST_ZONE_NAME = "TEST_ZONE"; + + private static final String TEST_TABLE_NAME1 = "TEST_TABLE_1"; + + private static final String TEST_TABLE_NAME2 = "TEST_TABLE_2"; + + private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory(); + + @InjectConfiguration + private static TransactionConfiguration txConfiguration; + + @InjectConfiguration + private static RaftConfiguration raftConfiguration; + + @InjectConfiguration + private static SystemLocalConfiguration systemConfiguration; + + @InjectConfiguration + private static NodeAttributesConfiguration nodeAttributesConfiguration; + + @InjectConfiguration + private static ReplicationConfiguration replicationConfiguration; + + @InjectConfiguration + private static MetaStorageConfiguration metaStorageConfiguration; + + @InjectConfiguration("mock.profiles = {" + DEFAULT_STORAGE_PROFILE + ".engine = aipersist, test.engine=test}") + private static StorageConfiguration storageConfiguration; + + @InjectExecutorService + private static ScheduledExecutorService scheduledExecutorService; + + private final List cluster = new ArrayList<>(); + + private final TestPlacementDriver placementDriver = new TestPlacementDriver(); + + private NodeFinder nodeFinder; + + @AfterEach + void tearDown() throws Exception { + closeAll(cluster.parallelStream().map(node -> node::stop)); + } + + private void startCluster(TestInfo testInfo, int size) throws Exception { + List addresses = IntStream.range(0, size) + .mapToObj(i -> new NetworkAddress("localhost", BASE_PORT + i)) + .collect(toList()); + + nodeFinder = new StaticNodeFinder(addresses); + + IntStream.range(0, size) + .mapToObj(i -> newNode(testInfo, addresses.get(i), nodeFinder)) + .forEach(cluster::add); + + cluster.parallelStream().forEach(Node::start); + + Node node0 = cluster.get(0); + + node0.cmgManager.initCluster(List.of(node0.name), List.of(node0.name), "cluster"); + + setPrimaryReplica(node0, null); + + cluster.forEach(Node::waitWatches); + + assertThat( + allOf(cluster.stream().map(n -> n.cmgManager.onJoinReady()).toArray(CompletableFuture[]::new)), + willCompleteSuccessfully() + ); + + assertTrue(waitForCondition( + () -> { + CompletableFuture logicalTopologyFuture = node0.cmgManager.logicalTopology(); + + assertThat(logicalTopologyFuture, willCompleteSuccessfully()); + + return logicalTopologyFuture.join().nodes().size() == cluster.size(); + }, + 30_000 + )); + } + + private Node addNodeToCluster(TestInfo testInfo, Function requestConverter) { + Node node = newNode(testInfo, new NetworkAddress("localhost", BASE_PORT + cluster.size()), nodeFinder); + + node.setRequestConverter(requestConverter); + + cluster.add(node); + + node.start(); + + node.waitWatches(); + + assertThat(node.cmgManager.onJoinReady(), willCompleteSuccessfully()); + + return node; + } + + private Node newNode(TestInfo testInfo, NetworkAddress address, NodeFinder nodeFinder) { + return new Node( + testInfo, + address, + nodeFinder, + workDir, + placementDriver, + systemConfiguration, + raftConfiguration, + nodeAttributesConfiguration, + storageConfiguration, + metaStorageConfiguration, + replicationConfiguration, + txConfiguration, + scheduledExecutorService, + null + ); + } + + private int createZone(String zoneName, int partitions, int replicas) { + Node node = cluster.get(0); + + createZoneWithStorageProfile( + node.catalogManager, + zoneName, + partitions, + replicas, + DEFAULT_STORAGE_PROFILE + ); + + return getZoneId(node.catalogManager, zoneName, node.hybridClock.nowLong()); + } + + private int createTable(String zoneName, String tableName) { + Node node = cluster.get(0); + + TableTestUtils.createTable( + node.catalogManager, + DEFAULT_SCHEMA_NAME, + zoneName, + tableName, + List.of( + ColumnParams.builder().name("key").type(INT32).build(), + ColumnParams.builder().name("val").type(INT32).nullable(true).build() + ), + List.of("key") + ); + + return getTableId(node.catalogManager, tableName, node.hybridClock.nowLong()); + } + + /** + * Tests that inserted data is replicated to all replica nodes. + */ + @Test + void testReplicationOnAllNodes(TestInfo testInfo) throws Exception { + startCluster(testInfo, 3); + + // Create a zone with a single partition on every node. + int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size()); + + int tableId1 = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1); + int tableId2 = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME2); + + var zonePartitionId = new ZonePartitionId(zoneId, 0); + + setupTableIdToZoneIdConverter(zonePartitionId, new TablePartitionId(tableId1, 0), new TablePartitionId(tableId2, 0)); + + Node node = cluster.get(0); + + setPrimaryReplica(node, zonePartitionId); + + KeyValueView kvView1 = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class); + KeyValueView kvView2 = node.tableManager.table(TEST_TABLE_NAME2).keyValueView(Integer.class, Integer.class); + + // Test single insert. + kvView1.put(null, 42, 69); + kvView2.put(null, 142, 169); + + for (Node n : cluster) { + setPrimaryReplica(n, zonePartitionId); + + assertThat(kvView1.get(null, 42), is(69)); + assertThat(kvView1.get(null, 142), is(nullValue())); + + assertThat(kvView2.get(null, 42), is(nullValue())); + assertThat(kvView2.get(null, 142), is(169)); + } + + // Test batch insert. + Map data1 = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity())); + Map data2 = IntStream.range(10, 20).boxed().collect(toMap(Function.identity(), Function.identity())); + + kvView1.putAll(null, data1); + kvView2.putAll(null, data2); + + for (Node n : cluster) { + setPrimaryReplica(n, zonePartitionId); + + assertThat(kvView1.getAll(null, data1.keySet()), is(data1)); + assertThat(kvView1.getAll(null, data2.keySet()), is(anEmptyMap())); + + assertThat(kvView2.getAll(null, data1.keySet()), is(anEmptyMap())); + assertThat(kvView2.getAll(null, data2.keySet()), is(data2)); + } + } + + /** + * Tests that inserted data is replicated to a newly joined replica node. + */ + @Test + void testDataRebalance(TestInfo testInfo) throws Exception { + testDataRebalanceImpl(testInfo, false); + } + + /** + * Same as {@link #testDataRebalance} but replication is performed using a Raft snapshot instead of Raft log replay. + */ + @Disabled("https://issues.apache.org/jira/browse/IGNITE-22416") + @Test + void testDataRebalanceUsingRaftSnapshot(TestInfo testInfo) throws Exception { + testDataRebalanceImpl(testInfo, true); + } + + private void testDataRebalanceImpl(TestInfo testInfo, boolean truncateRaftLog) throws Exception { + startCluster(testInfo, 2); + + // Create a zone with a single partition on every node + one extra replica for the upcoming node. + int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size() + 1); + + int tableId1 = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1); + int tableId2 = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME2); + + var zonePartitionId = new ZonePartitionId(zoneId, 0); + + Function requestConverter = + requestConverter(zonePartitionId, new TablePartitionId(tableId1, 0), new TablePartitionId(tableId2, 0)); + + cluster.forEach(node -> node.setRequestConverter(requestConverter)); + + Node node = cluster.get(0); + + setPrimaryReplica(node, zonePartitionId); + + Map data1 = IntStream.range(0, 10).boxed().collect(toMap(Function.identity(), Function.identity())); + Map data2 = IntStream.range(10, 20).boxed().collect(toMap(Function.identity(), Function.identity())); + + KeyValueView kvView1 = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class); + KeyValueView kvView2 = node.tableManager.table(TEST_TABLE_NAME2).keyValueView(Integer.class, Integer.class); + + kvView1.putAll(null, data1); + kvView2.putAll(null, data2); + + if (truncateRaftLog) { + truncateLogOnEveryNode(zonePartitionId); + } + + Node newNode = addNodeToCluster(testInfo, requestConverter); + + // Wait for the rebalance to kick in. + assertTrue(waitForCondition(() -> newNode.replicaManager.isReplicaStarted(zonePartitionId), 10_000L)); + + setPrimaryReplica(newNode, zonePartitionId); + + assertThat(kvView1.getAll(null, data1.keySet()), is(data1)); + assertThat(kvView1.getAll(null, data2.keySet()), is(anEmptyMap())); + + assertThat(kvView2.getAll(null, data1.keySet()), is(anEmptyMap())); + assertThat(kvView2.getAll(null, data2.keySet()), is(data2)); + } + + private void setupTableIdToZoneIdConverter(ZonePartitionId zonePartitionId, TablePartitionId... tablePartitionIds) { + Function requestConverter = requestConverter(zonePartitionId, tablePartitionIds); + + cluster.forEach(node -> node.setRequestConverter(requestConverter)); + } + + private static Function requestConverter( + ZonePartitionId zonePartitionId, TablePartitionId... tablePartitionIds + ) { + Set tablePartitionIdsSet = Set.of(tablePartitionIds); + + return request -> { + ReplicationGroupId replicationGroupId = request.groupId().asReplicationGroupId(); + + if (tablePartitionIdsSet.contains(replicationGroupId) && !(request instanceof WriteIntentSwitchReplicaRequest)) { + return zonePartitionId; + } else { + return replicationGroupId; + } + }; + } + + private void setPrimaryReplica(Node node, @Nullable ZonePartitionId zonePartitionId) { + ClusterNode newPrimaryReplicaNode = node.clusterService.topologyService().localMember(); + + HybridTimestamp leaseStartTime = node.hybridClock.now(); + + placementDriver.setPrimary(newPrimaryReplicaNode, leaseStartTime); + + if (zonePartitionId != null) { + PrimaryReplicaChangeCommand cmd = REPLICA_MESSAGES_FACTORY.primaryReplicaChangeCommand() + .primaryReplicaNodeId(newPrimaryReplicaNode.id()) + .primaryReplicaNodeName(newPrimaryReplicaNode.name()) + .leaseStartTime(leaseStartTime.longValue()) + .build(); + + CompletableFuture primaryReplicaChangeFuture = node.replicaManager + .replica(zonePartitionId) + .thenCompose(replica -> replica.raftClient().run(cmd)); + + assertThat(primaryReplicaChangeFuture, willCompleteSuccessfully()); + } + } + + private void truncateLogOnEveryNode(ReplicationGroupId groupId) { + CompletableFuture[] truncateFutures = cluster.stream() + .map(node -> truncateLog(node, groupId)) + .toArray(CompletableFuture[]::new); + + assertThat(allOf(truncateFutures), willCompleteSuccessfully()); + } + + private static CompletableFuture truncateLog(Node node, ReplicationGroupId groupId) { + Member member = Member.votingMember(node.name); + + // Using the infamous trick of triggering snapshot twice to cause Raft log truncation. + return node.replicaManager.replica(groupId) + .thenCompose(replica -> replica.createSnapshotOn(member).thenCompose(v -> replica.createSnapshotOn(member))); + } +} diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java index 4a764f4c425..2242dc5578f 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java @@ -38,16 +38,13 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executors; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.LongSupplier; import org.apache.ignite.internal.app.ThreadPoolsManager; @@ -124,7 +121,6 @@ import org.apache.ignite.internal.replicator.ReplicaManager; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.ReplicationGroupId; -import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.ZonePartitionId; import org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration; import org.apache.ignite.internal.replicator.message.ReplicaRequest; @@ -227,9 +223,6 @@ public class Node { private final ConfigurationTreeGenerator clusterCfgGenerator; - private final Map> finishHandleChangeStableAssignmentEventFutures - = new ConcurrentHashMap<>(); - private final LowWatermarkImpl lowWatermark; /** The future have to be complete after the node start and all Meta storage watches are deployd. */ @@ -244,8 +237,7 @@ public class Node { /** Failure processor. */ private final FailureManager failureManager; - public final AtomicReference> converter = - new AtomicReference<>(request -> request.groupId().asReplicationGroupId()); + private volatile Function converter = request -> request.groupId().asReplicationGroupId(); private final LogStorageFactory partitionsLogStorageFactory; @@ -553,7 +545,7 @@ public CompletableFuture invoke( partitionRaftConfigurer, view -> new LocalLogStorageFactory(), ForkJoinPool.commonPool(), - t -> converter.get().apply(t), + t -> converter.apply(t), replicaGrpId -> metaStorageManager.get(pendingPartAssignmentsKey((ZonePartitionId) replicaGrpId)) .thenApply(Entry::value) ); @@ -787,6 +779,10 @@ public void setInvokeInterceptor(@Nullable InvokeInterceptor invokeInterceptor) this.invokeInterceptor = invokeInterceptor; } + public void setRequestConverter(Function converter) { + this.converter = converter; + } + private static Path resolveDir(Path workDir, String dirName) { Path newDirPath = workDir.resolve(dirName); diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java index 88bb13f9825..3ca208cd67d 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.ZonePartitionId; import org.apache.ignite.network.ClusterNode; +import org.jetbrains.annotations.Nullable; /** * Trivial placement driver for tests. @@ -43,15 +44,39 @@ public class TestPlacementDriver extends AbstractEventProducer getPrimaryReplicaMeta(ReplicationGroupId throw new IllegalStateException("Primary replica is not defined in test PlacementDriver"); } - return CompletableFuture.completedFuture(new ReplicaMeta() { - @Override - public String getLeaseholder() { - return primary.name(); - } - - @Override - public UUID getLeaseholderId() { - return primary.id(); - } - - @Override - public HybridTimestamp getStartTime() { - return HybridTimestamp.MIN_VALUE; - } - - @Override - public HybridTimestamp getExpirationTime() { - return HybridTimestamp.MAX_VALUE; - } - }); + return CompletableFuture.completedFuture(primary); } @Override diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java index 98ef59118f6..f6e2cad563c 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java @@ -65,6 +65,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -210,6 +211,8 @@ public class PartitionReplicaLifecycleManager extends /** Configuration of rebalance retries delay. */ private final SystemDistributedConfigurationPropertyHolder rebalanceRetryDelayConfiguration; + private final ConcurrentMap zonePartitionRaftListeners = new ConcurrentHashMap<>(); + /** * The constructor. * @@ -427,7 +430,9 @@ private CompletableFuture createZoneReplicationNodes(int zoneId, List createZoneReplicationNodes(int zoneId, List createZonePartitionReplicationNode( - int zoneId, - int partId, + private CompletableFuture createZonePartitionReplicationNode( + ZonePartitionId zonePartitionId, @Nullable Assignment localMemberAssignment, Assignments stableAssignments ) { @@ -461,15 +464,15 @@ private CompletableFuture createZonePartitionReplicationNode( PeersAndLearners stablePeersAndLearners = fromAssignments(stableAssignments.nodes()); - ZonePartitionId replicaGrpId = new ZonePartitionId(zoneId, partId); + var raftGroupListener = new ZonePartitionRaftListener(); - RaftGroupListener raftGroupListener = new ZonePartitionRaftListener(); + zonePartitionRaftListeners.put(zonePartitionId, raftGroupListener); ZoneRebalanceRaftGroupEventsListener raftGroupEventsListener = new ZoneRebalanceRaftGroupEventsListener( metaStorageMgr, - replicaGrpId, + zonePartitionId, busyLock, - createPartitionMover(replicaGrpId), + createPartitionMover(zonePartitionId), rebalanceScheduler, this::calculateZoneAssignments, rebalanceRetryDelayConfiguration @@ -478,20 +481,18 @@ private CompletableFuture createZonePartitionReplicationNode( Supplier> startReplicaSupplier = () -> { try { return replicaMgr.startReplica( - replicaGrpId, - (raftClient) -> new ZonePartitionReplicaListener( + zonePartitionId, + raftClient -> new ZonePartitionReplicaListener( new ExecutorInclinedRaftCommandRunner(raftClient, partitionOperationsExecutor)), new FailFastSnapshotStorageFactory(), stablePeersAndLearners, raftGroupListener, raftGroupEventsListener, busyLock - ).thenCompose(replica -> executeUnderZoneWriteLock(zoneId, () -> { - replicationGroupIds.add(replicaGrpId); + ).thenCompose(replica -> executeUnderZoneWriteLock(zonePartitionId.zoneId(), () -> { + replicationGroupIds.add(zonePartitionId); - var eventParams = new LocalPartitionReplicaEventParameters( - new ZonePartitionId(replicaGrpId.zoneId(), replicaGrpId.partitionId()) - ); + var eventParams = new LocalPartitionReplicaEventParameters(zonePartitionId); return fireEvent(LocalPartitionReplicaEvent.AFTER_REPLICA_STARTED, eventParams); })) @@ -501,17 +502,12 @@ private CompletableFuture createZonePartitionReplicationNode( } }; - return replicaMgr.weakStartReplica( - replicaGrpId, - startReplicaSupplier, - forcedAssignments - ).handle((res, ex) -> { - if (ex != null) { - LOG.warn("Unable to update raft groups on the node [zoneId={}, partitionId={}]", ex, zoneId, partId); - } - - return null; - }); + return replicaMgr.weakStartReplica(zonePartitionId, startReplicaSupplier, forcedAssignments) + .whenComplete((res, ex) -> { + if (ex != null) { + LOG.warn("Unable to update raft groups on the node [zonePartitionId={}]", ex, zonePartitionId); + } + }); } private CompletableFuture> calculateZoneAssignments( @@ -986,15 +982,11 @@ private CompletableFuture handleChangePendingAssignmentEvent( computedStableAssignments = stableAssignments; } - int partitionId = replicaGrpId.partitionId(); - int zoneId = replicaGrpId.zoneId(); - - CompletableFuture localServicesStartFuture; + CompletableFuture localServicesStartFuture; if (shouldStartLocalGroupNode) { localServicesStartFuture = createZonePartitionReplicationNode( - zoneId, - partitionId, + replicaGrpId, localMemberAssignment, computedStableAssignments ); @@ -1271,22 +1263,28 @@ public void unlockZoneForRead(int zoneId, long stamp) { /** * Load a new table partition listener to the zone replica. * - *

Important: This method must be called only with the guarantee, that the replica is exist at the current moment. + *

Important: This method must be called only with the guarantee, that the replica exists at the current moment. * * @param zonePartitionId Zone partition id. * @param tablePartitionId Table partition id. - * @param createListener Lazy replica listener from RAFT command runner builder. + * @param tablePartitionReplicaListenerFactory Factory for creating table-specific partition replicas. + * @param tablePartitionRaftListener Raft group listener for the table-specific partition. */ public void loadTableListenerToZoneReplica( ZonePartitionId zonePartitionId, TablePartitionId tablePartitionId, - Function createListener + Function tablePartitionReplicaListenerFactory, + RaftGroupListener tablePartitionRaftListener ) { CompletableFuture replicaFut = replicaMgr.replica(zonePartitionId); assert replicaFut != null && replicaFut.isDone(); - ((ZonePartitionReplicaListener) replicaFut.join().listener()).addTableReplicaListener(tablePartitionId, createListener); + var zonePartitionReplicaListener = (ZonePartitionReplicaListener) replicaFut.join().listener(); + + zonePartitionReplicaListener.addTableReplicaListener(tablePartitionId, tablePartitionReplicaListenerFactory); + + zonePartitionRaftListeners.get(zonePartitionId).addTablePartitionRaftListener(tablePartitionId, tablePartitionRaftListener); } private CompletableFuture executeUnderZoneWriteLock(int zoneId, Supplier> action) { diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionRaftListener.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionRaftListener.java index 0b536b38e59..15f6fc451c9 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionRaftListener.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionRaftListener.java @@ -22,27 +22,46 @@ import java.io.Serializable; import java.nio.file.Path; +import java.util.Collections; import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand; +import org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand; +import org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand; import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.ReadCommand; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.internal.raft.service.CommandClosure; +import org.apache.ignite.internal.raft.service.CommittedConfiguration; import org.apache.ignite.internal.raft.service.RaftGroupListener; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand; import org.apache.ignite.internal.tx.TransactionResult; /** * RAFT listener for the zone partition. */ -public class ZonePartitionRaftListener implements RaftGroupListener { +class ZonePartitionRaftListener implements RaftGroupListener { private static final IgniteLogger LOG = Loggers.forClass(ZonePartitionRaftListener.class); + private final Map tablePartitionRaftListeners = new ConcurrentHashMap<>(); + + /** + * Latest committed configuration of the zone-wide Raft group. + * + *

Multi-threaded access is guarded by {@link #commitedConfigurationLock}. + */ + private CommittedConfiguration currentCommitedConfiguration; + + private final Object commitedConfigurationLock = new Object(); + @Override public void onRead(Iterator> iterator) { - iterator.forEachRemaining((CommandClosure clo) -> { + iterator.forEachRemaining(clo -> { Command command = clo.command(); assert false : "No read commands expected, [cmd=" + command + ']'; @@ -51,47 +70,121 @@ public void onRead(Iterator> iterator) { @Override public void onWrite(Iterator> iterator) { - iterator.forEachRemaining((CommandClosure clo) -> { - Command command = clo.command(); - - Serializable result = null; - + iterator.forEachRemaining(clo -> { try { - if (command instanceof FinishTxCommand) { - FinishTxCommand cmd = (FinishTxCommand) command; - - result = new TransactionResult(cmd.commit() ? COMMITTED : ABORTED, cmd.commitTimestamp()); - } else { - LOG.debug("Message type " + command.getClass() + " is not supported by the zone partition RAFT listener yet"); - } + processWriteCommand(clo); } catch (Throwable t) { LOG.error( "Unknown error while processing command [commandIndex={}, commandTerm={}, command={}]", t, - clo.index(), clo.index(), command + clo.index(), clo.index(), clo.command() ); clo.result(t); throw t; } - - clo.result(result); }); } + private void processWriteCommand(CommandClosure clo) { + Command command = clo.command(); + + if (command instanceof FinishTxCommand) { + FinishTxCommand cmd = (FinishTxCommand) command; + + clo.result(new TransactionResult(cmd.commit() ? COMMITTED : ABORTED, cmd.commitTimestamp())); + } else if (command instanceof PrimaryReplicaChangeCommand) { + // This is a hack for tests, this command is not issued in production because no zone-wide placement driver exists yet. + CommandClosure idempotentCommandClosure = idempotentCommandClosure(clo); + + tablePartitionRaftListeners.values().forEach(listener -> listener.onWrite(singletonIterator(idempotentCommandClosure))); + + clo.result(null); + } else if (command instanceof UpdateCommand) { + TablePartitionId tablePartitionId = ((UpdateCommand) command).tablePartitionId().asTablePartitionId(); + + processTableSpecificCommand(tablePartitionId, clo); + } else if (command instanceof UpdateAllCommand) { + TablePartitionId tablePartitionId = ((UpdateAllCommand) command).tablePartitionId().asTablePartitionId(); + + processTableSpecificCommand(tablePartitionId, clo); + } else { + LOG.info("Message type " + command.getClass() + " is not supported by the zone partition RAFT listener yet"); + + clo.result(null); + } + } + + private void processTableSpecificCommand(TablePartitionId tablePartitionId, CommandClosure clo) { + tablePartitionRaftListeners.get(tablePartitionId).onWrite(singletonIterator(clo)); + } + + private static Iterator singletonIterator(T value) { + return Collections.singleton(value).iterator(); + } + + private static CommandClosure idempotentCommandClosure(CommandClosure clo) { + return new CommandClosure<>() { + @Override + public WriteCommand command() { + return clo.command(); + } + + @Override + public long index() { + return clo.index(); + } + + @Override + public long term() { + return clo.term(); + } + + @Override + public void result(Serializable res) { + } + }; + } + + @Override + public void onConfigurationCommitted(CommittedConfiguration config) { + synchronized (commitedConfigurationLock) { + currentCommitedConfiguration = config; + + tablePartitionRaftListeners.values().forEach(listener -> listener.onConfigurationCommitted(config)); + } + } + @Override public void onSnapshotSave(Path path, Consumer doneClo) { + // TODO: implement, see https://issues.apache.org/jira/browse/IGNITE-22416 throw new UnsupportedOperationException("Snapshotting is not implemented"); } @Override public boolean onSnapshotLoad(Path path) { + // TODO: implement, see https://issues.apache.org/jira/browse/IGNITE-22416 throw new UnsupportedOperationException("Snapshotting is not implemented"); } @Override public void onShutdown() { - // No-op. + tablePartitionRaftListeners.values().forEach(RaftGroupListener::onShutdown); + } + + /** + * Adds a given Table Partition-level Raft listener to the set of managed listeners. + */ + void addTablePartitionRaftListener(TablePartitionId tablePartitionId, RaftGroupListener listener) { + synchronized (commitedConfigurationLock) { + if (currentCommitedConfiguration != null) { + listener.onConfigurationCommitted(currentCommitedConfiguration); + } + + RaftGroupListener prev = tablePartitionRaftListeners.put(tablePartitionId, listener); + + assert prev == null : "Listener for table partition " + tablePartitionId + " already exists"; + } } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 05aefd14be3..0145c936cae 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -162,6 +162,7 @@ import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner; import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.RaftGroupEventsListener; +import org.apache.ignite.internal.raft.service.CommittedConfiguration; import org.apache.ignite.internal.raft.service.RaftCommandRunner; import org.apache.ignite.internal.raft.service.RaftGroupListener; import org.apache.ignite.internal.raft.service.RaftGroupService; @@ -701,43 +702,39 @@ private CompletableFuture waitForMetadataCompleteness(long ts) { private CompletableFuture onZoneReplicaCreated(LocalPartitionReplicaEventParameters parameters) { if (!PartitionReplicaLifecycleManager.ENABLED) { - return completedFuture(false); + return falseCompletedFuture(); } return inBusyLockAsync(busyLock, () -> readyToProcessTableStarts .thenCompose(v -> { - Set zoneTables = zoneTables(parameters.zonePartitionId().zoneId()); + ZonePartitionId zonePartitionId = parameters.zonePartitionId(); - PartitionSet singlePartitionIdSet = PartitionSet.of(parameters.zonePartitionId().partitionId()); + Set zoneTables = zoneTables(zonePartitionId.zoneId()); - CompletableFuture[] futures = zoneTables.stream() - .map(tbl -> { - CompletableFuture createStoragesFuture = runAsync( - () -> inBusyLock(busyLock, () -> getOrCreatePartitionStorages(tbl, singlePartitionIdSet)), - ioExecutor - ); + PartitionSet singlePartitionIdSet = PartitionSet.of(zonePartitionId.partitionId()); - return createStoragesFuture + CompletableFuture[] futures = zoneTables.stream() + .map(tbl -> inBusyLockAsync(busyLock, () -> { + return getOrCreatePartitionStorages(tbl, singlePartitionIdSet) .thenRunAsync(() -> inBusyLock(busyLock, () -> { lowWatermark.getLowWatermarkSafe(lwm -> registerIndexesToTable(tbl, catalogService, singlePartitionIdSet, tbl.schemaView(), lwm) ); - preparePartitionResourcesAndLoadToZoneReplica( - tbl, parameters.zonePartitionId().partitionId(), parameters.zonePartitionId().zoneId()); + preparePartitionResourcesAndLoadToZoneReplica(tbl, zonePartitionId); }), ioExecutor); - }) + })) .toArray(CompletableFuture[]::new); return allOf(futures); }) - .thenApply((unused) -> false) + .thenApply(unused -> false) ); } private CompletableFuture onZoneReplicaStopped(LocalPartitionReplicaEventParameters parameters) { if (!PartitionReplicaLifecycleManager.ENABLED) { - return completedFuture(false); + return falseCompletedFuture(); } return inBusyLockAsync(busyLock, () -> { @@ -766,7 +763,7 @@ private CompletableFuture onZoneReplicaStopped(LocalPartitionReplicaEve private CompletableFuture prepareTableResourcesAndLoadToZoneReplica(CreateTableEventParameters parameters) { if (!PartitionReplicaLifecycleManager.ENABLED) { - return completedFuture(false); + return falseCompletedFuture(); } long causalityToken = parameters.causalityToken(); @@ -830,8 +827,10 @@ private CompletableFuture prepareTableResourcesAndLoadToZoneReplica( } for (int i = 0; i < zoneDescriptor.partitions(); i++) { - if (partitionReplicaLifecycleManager.hasLocalPartition(new ZonePartitionId(zoneDescriptor.id(), i))) { - preparePartitionResourcesAndLoadToZoneReplica(table, i, zoneDescriptor.id()); + var zonePartitionId = new ZonePartitionId(zoneDescriptor.id(), i); + + if (partitionReplicaLifecycleManager.hasLocalPartition(zonePartitionId)) { + preparePartitionResourcesAndLoadToZoneReplica(table, zonePartitionId); } } } @@ -857,29 +856,29 @@ private CompletableFuture prepareTableResourcesAndLoadToZoneReplica( * Prepare the table partition resources and load it to the zone-based replica. * * @param table Table. - * @param partId Partition id. - * @param zoneId Zone id. + * @param zonePartitionId Zone Partition ID. */ - private void preparePartitionResourcesAndLoadToZoneReplica( - TableImpl table, - int partId, - int zoneId - ) { + private void preparePartitionResourcesAndLoadToZoneReplica(TableImpl table, ZonePartitionId zonePartitionId) { + int partId = zonePartitionId.partitionId(); + int tableId = table.tableId(); var internalTbl = (InternalTableImpl) table.internalTable(); - TablePartitionId replicaGrpId = new TablePartitionId(tableId, partId); + var tablePartitionId = new TablePartitionId(tableId, partId); inBusyLock(busyLock, () -> { - var safeTimeTracker = new PendingComparableValuesTracker(HybridTimestamp.MIN_VALUE); + var safeTimeTracker = new SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE); var storageIndexTracker = new PendingComparableValuesTracker(0L); PartitionStorages partitionStorages = getPartitionStorages(table, partId); - PartitionDataStorage partitionDataStorage = partitionDataStorage(partitionStorages.getMvPartitionStorage(), - internalTbl, partId); + PartitionDataStorage partitionDataStorage = partitionDataStorage( + partitionStorages.getMvPartitionStorage(), + internalTbl, + partId + ); storageIndexTracker.update(partitionDataStorage.lastAppliedIndex(), null); @@ -893,10 +892,10 @@ private void preparePartitionResourcesAndLoadToZoneReplica( internalTbl.updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker); - mvGc.addStorage(replicaGrpId, partitionUpdateHandlers.gcUpdateHandler); + mvGc.addStorage(tablePartitionId, partitionUpdateHandlers.gcUpdateHandler); - Function createListener = (raftClient) -> createReplicaListener( - replicaGrpId, + Function createListener = raftClient -> createReplicaListener( + tablePartitionId, table, safeTimeTracker, partitionStorages.getMvPartitionStorage(), @@ -905,9 +904,25 @@ private void preparePartitionResourcesAndLoadToZoneReplica( raftClient ); + var tablePartitionRaftListener = new PartitionListener( + txManager, + partitionDataStorage, + partitionUpdateHandlers.storageUpdateHandler, + partitionStorages.getTxStateStorage(), + safeTimeTracker, + storageIndexTracker, + catalogService, + table.schemaView(), + indexMetaStorage, + topologyService.localMember().id(), + minTimeCollectorService + ); + partitionReplicaLifecycleManager.loadTableListenerToZoneReplica( - new ZonePartitionId(zoneId, partId), - new TablePartitionId(tableId, partId), createListener + zonePartitionId, + tablePartitionId, + createListener, + tablePartitionRaftListener ); }); } @@ -1290,7 +1305,16 @@ private CompletableFuture startPartitionAndStartClient( indexMetaStorage, topologyService.localMember().id(), minTimeCollectorService - ); + ) { + @Override + public void onConfigurationCommitted(CommittedConfiguration config) { + // Disable this method if the Colocation feature is enabled, actual configuration will be propagated + // manually by the Zone Raft Listener. + if (!PartitionReplicaLifecycleManager.ENABLED) { + super.onConfigurationCommitted(config); + } + } + }; minTimeCollectorService.addPartition(new TablePartitionId(tableId, partId)); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java index 12ba362c399..fc3c5f5fe5b 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java @@ -123,7 +123,7 @@ public class PartitionListener implements RaftGroupListener { private final UUID localNodeId; - private Set currentGroupTopology; + private volatile Set currentGroupTopology; private final MinimumRequiredTimeCollectorService minTimeCollectorService; @@ -550,11 +550,6 @@ public void onShutdown() { storage.close(); } - @Override - public void onLeaderStop() { - // No-op. - } - /** * Returns underlying storage. */ diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index a40e47be53b..815d4227da9 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -107,6 +107,7 @@ import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.lowwatermark.LowWatermark; import org.apache.ignite.internal.network.ClusterNodeResolver; +import org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager; import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory; import org.apache.ignite.internal.partition.replicator.network.TimedBinaryRow; import org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand; @@ -1191,7 +1192,8 @@ private CompletableFuture> processReadOnlyMultiEntryAction( private CompletableFuture processReplicaSafeTimeSyncRequest(Boolean isPrimary) { requireNonNull(isPrimary); - if (!isPrimary) { + // Disable safe-time sync if the Colocation feature is enabled, safe-time is managed on a different level there. + if (!isPrimary || PartitionReplicaLifecycleManager.ENABLED) { return nullCompletedFuture(); } From e61fd6fc729a796d902f64acbe73e4e42262a635 Mon Sep 17 00:00:00 2001 From: Aleksandr Polovtsev Date: Fri, 31 Jan 2025 12:37:33 +0200 Subject: [PATCH 2/2] minor --- .../replicator/PartitionReplicaLifecycleManager.java | 8 +++++--- .../FailFastSnapshotStorageFactory.java | 2 +- .../replicator/{ => raft}/ZonePartitionRaftListener.java | 6 +++--- 3 files changed, 9 insertions(+), 7 deletions(-) rename modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/{snapshot => raft}/FailFastSnapshotStorageFactory.java (97%) rename modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/{ => raft}/ZonePartitionRaftListener.java (96%) diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java index f6e2cad563c..8744f0d8816 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java @@ -110,7 +110,8 @@ import org.apache.ignite.internal.metastorage.dsl.Condition; import org.apache.ignite.internal.metastorage.dsl.Operation; import org.apache.ignite.internal.network.TopologyService; -import org.apache.ignite.internal.partition.replicator.snapshot.FailFastSnapshotStorageFactory; +import org.apache.ignite.internal.partition.replicator.raft.FailFastSnapshotStorageFactory; +import org.apache.ignite.internal.partition.replicator.raft.ZonePartitionRaftListener; import org.apache.ignite.internal.partitiondistribution.Assignment; import org.apache.ignite.internal.partitiondistribution.Assignments; import org.apache.ignite.internal.placementdriver.PlacementDriver; @@ -1190,7 +1191,7 @@ private CompletableFuture weakStopPartition(ZonePartitionId zonePartitionI return replicaMgr.weakStopReplica( zonePartitionId, WeakReplicaStopReason.EXCLUDED_FROM_ASSIGNMENTS, - () -> stopPartition(zonePartitionId).thenAccept(v -> {}) + () -> stopPartition(zonePartitionId) ); } @@ -1200,12 +1201,13 @@ private CompletableFuture weakStopPartition(ZonePartitionId zonePartitionI * @param zonePartitionId Partition ID. * @return Future that will be completed after all resources have been closed. */ - private CompletableFuture stopPartition(ZonePartitionId zonePartitionId) { + private CompletableFuture stopPartition(ZonePartitionId zonePartitionId) { return executeUnderZoneWriteLock(zonePartitionId.zoneId(), () -> { try { return replicaMgr.stopReplica(zonePartitionId) .thenCompose((replicaWasStopped) -> { if (replicaWasStopped) { + zonePartitionRaftListeners.remove(zonePartitionId); replicationGroupIds.remove(zonePartitionId); return fireEvent( diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/snapshot/FailFastSnapshotStorageFactory.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/FailFastSnapshotStorageFactory.java similarity index 97% rename from modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/snapshot/FailFastSnapshotStorageFactory.java rename to modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/FailFastSnapshotStorageFactory.java index 740eec50fb3..1c2fb3f9a8f 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/snapshot/FailFastSnapshotStorageFactory.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/FailFastSnapshotStorageFactory.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.ignite.internal.partition.replicator.snapshot; +package org.apache.ignite.internal.partition.replicator.raft; import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory; import org.apache.ignite.raft.jraft.option.RaftOptions; diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionRaftListener.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java similarity index 96% rename from modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionRaftListener.java rename to modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java index 15f6fc451c9..139181bfb59 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionRaftListener.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.ignite.internal.partition.replicator; +package org.apache.ignite.internal.partition.replicator.raft; import static org.apache.ignite.internal.tx.TxState.ABORTED; import static org.apache.ignite.internal.tx.TxState.COMMITTED; @@ -45,7 +45,7 @@ /** * RAFT listener for the zone partition. */ -class ZonePartitionRaftListener implements RaftGroupListener { +public class ZonePartitionRaftListener implements RaftGroupListener { private static final IgniteLogger LOG = Loggers.forClass(ZonePartitionRaftListener.class); private final Map tablePartitionRaftListeners = new ConcurrentHashMap<>(); @@ -176,7 +176,7 @@ public void onShutdown() { /** * Adds a given Table Partition-level Raft listener to the set of managed listeners. */ - void addTablePartitionRaftListener(TablePartitionId tablePartitionId, RaftGroupListener listener) { + public void addTablePartitionRaftListener(TablePartitionId tablePartitionId, RaftGroupListener listener) { synchronized (commitedConfigurationLock) { if (currentCommitedConfiguration != null) { listener.onConfigurationCommitted(currentCommitedConfiguration);