Skip to content

Commit

Permalink
Refactor DeliverEventSubscriber (#34239)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Jan 3, 2025
1 parent 2849b62 commit 3fd7097
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 18 deletions.
2 changes: 1 addition & 1 deletion features/readwrite-splitting/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-mode-api</artifactId>
<artifactId>shardingsphere-mode-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@
import com.google.common.eventbus.Subscribe;
import lombok.Setter;
import org.apache.shardingsphere.metadata.persist.node.QualifiedDataSourceNode;
import org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriber;
import org.apache.shardingsphere.mode.spi.PersistRepository;
import org.apache.shardingsphere.mode.deliver.DeliverEventSubscriber;
import org.apache.shardingsphere.mode.manager.ContextManager;

/**
* Readwrite-splitting qualified data source deleted subscriber.
*/
@Setter
public final class ReadwriteSplittingQualifiedDataSourceDeletedSubscriber implements DeliverEventSubscriber {

private PersistRepository repository;
private ContextManager contextManager;

/**
* Delete qualified data source.
Expand All @@ -38,6 +38,6 @@ public final class ReadwriteSplittingQualifiedDataSourceDeletedSubscriber implem
*/
@Subscribe
public void delete(final QualifiedDataSourceDeletedEvent event) {
repository.delete(QualifiedDataSourceNode.getQualifiedDataSourceNodePath(event.getQualifiedDataSource()));
contextManager.getPersistServiceFacade().getRepository().delete(QualifiedDataSourceNode.getQualifiedDataSourceNodePath(event.getQualifiedDataSource()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,34 @@
package org.apache.shardingsphere.readwritesplitting.cluster;

import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.spi.PersistRepository;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class ReadwriteSplittingQualifiedDataSourceDeletedSubscriberTest {

private ReadwriteSplittingQualifiedDataSourceDeletedSubscriber subscriber;

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ContextManager contextManager;

@Mock
private PersistRepository repository;

@BeforeEach
void setUp() {
subscriber = new ReadwriteSplittingQualifiedDataSourceDeletedSubscriber();
subscriber.setRepository(repository);
when(contextManager.getPersistServiceFacade().getRepository()).thenReturn(repository);
subscriber.setContextManager(contextManager);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
* limitations under the License.
*/

package org.apache.shardingsphere.mode.event.deliver;
package org.apache.shardingsphere.mode.deliver;

import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.spi.PersistRepository;
import org.apache.shardingsphere.mode.manager.ContextManager;

/**
* Deliver event subscriber factory.
Expand All @@ -28,9 +28,9 @@
public interface DeliverEventSubscriber extends EventSubscriber {

/**
* Set persist repository.
* Set context manager.
*
* @param repository persist repository
* @param contextManager context manager
*/
void setRepository(PersistRepository repository);
void setContextManager(ContextManager contextManager);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriber;
import org.apache.shardingsphere.mode.deliver.DeliverEventSubscriber;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
Expand Down Expand Up @@ -63,7 +63,7 @@ public ContextManager build(final ContextManagerBuilderParameter param, final Ev
MetaDataPersistService metaDataPersistService = new MetaDataPersistService(repository);
MetaDataContexts metaDataContexts = MetaDataContextsFactory.create(metaDataPersistService, param, computeNodeInstanceContext);
ContextManager result = new ContextManager(metaDataContexts, computeNodeInstanceContext, repository);
registerOnline(computeNodeInstanceContext, param, result, repository);
registerOnline(computeNodeInstanceContext, param, result);
return result;
}

Expand All @@ -74,14 +74,13 @@ private ClusterPersistRepository getClusterPersistRepository(final ClusterPersis
return result;
}

private void registerOnline(final ComputeNodeInstanceContext computeNodeInstanceContext, final ContextManagerBuilderParameter param, final ContextManager contextManager,
final ClusterPersistRepository repository) {
private void registerOnline(final ComputeNodeInstanceContext computeNodeInstanceContext, final ContextManagerBuilderParameter param, final ContextManager contextManager) {
contextManager.getPersistServiceFacade().getComputeNodePersistService().registerOnline(computeNodeInstanceContext.getInstance());
contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry().getAllClusterInstances()
.addAll(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadAllComputeNodeInstances());
new DataChangedEventListenerRegistry(contextManager, getDatabaseNames(param, contextManager.getPersistServiceFacade().getMetaDataPersistService())).register();
ClusterEventSubscriberRegistry eventSubscriberRegistry = new ClusterEventSubscriberRegistry(contextManager.getComputeNodeInstanceContext().getEventBusContext());
eventSubscriberRegistry.register(createDeliverEventSubscribers(repository));
eventSubscriberRegistry.register(createDeliverEventSubscribers(contextManager));
}

private Collection<String> getDatabaseNames(final ContextManagerBuilderParameter param, final MetaDataPersistService metaDataPersistService) {
Expand All @@ -90,10 +89,10 @@ private Collection<String> getDatabaseNames(final ContextManagerBuilderParameter
: metaDataPersistService.getDatabaseMetaDataFacade().getDatabase().loadAllDatabaseNames();
}

private Collection<EventSubscriber> createDeliverEventSubscribers(final ClusterPersistRepository repository) {
private Collection<EventSubscriber> createDeliverEventSubscribers(final ContextManager contextManager) {
Collection<EventSubscriber> result = new LinkedList<>();
for (DeliverEventSubscriber each : ShardingSphereServiceLoader.getServiceInstances(DeliverEventSubscriber.class)) {
each.setRepository(repository);
each.setContextManager(contextManager);
result.add(each);
}
return result;
Expand Down

0 comments on commit 3fd7097

Please sign in to comment.