Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyang377 committed Oct 19, 2024
2 parents 525a2af + f2ebcab commit 370ef05
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.channel;

import org.apache.shardingsphere.data.pipeline.core.channel.memory.MemoryPipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.infra.algorithm.core.config.AlgorithmConfiguration;
import org.junit.jupiter.api.Test;

import java.util.Properties;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;

class IncrementalChannelCreatorTest {

@Test
void assertCreate() {
assertThat(IncrementalChannelCreator.create(new AlgorithmConfiguration("MEMORY", new Properties()), mock(IncrementalTaskProgress.class)), instanceOf(MemoryPipelineChannel.class));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.channel;

import org.apache.shardingsphere.data.pipeline.core.channel.memory.MemoryPipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.infra.algorithm.core.config.AlgorithmConfiguration;
import org.junit.jupiter.api.Test;

import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;

class InventoryChannelCreatorTest {

@Test
void assertCreate() {
assertThat(InventoryChannelCreator.create(new AlgorithmConfiguration("MEMORY", new Properties()), 1, new AtomicReference<>(mock(IngestPosition.class))),
instanceOf(MemoryPipelineChannel.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

class MemoryPipelineChannelTest {

Expand All @@ -58,4 +60,38 @@ void assertFetchWithZeroTimeout() {
channel.push(records);
assertThat(channel.fetch(10, 0L), is(records));
}

@Test
void assertPeekWithRecords() {
MemoryPipelineChannel channel = new MemoryPipelineChannel(100, new InventoryTaskAckCallback(new AtomicReference<>()));
List<Record> records = Collections.singletonList(new PlaceholderRecord(new IngestFinishedPosition()));
channel.push(records);
assertThat(channel.peek(), is(records));
}

@Test
void assertPeekWithoutRecords() {
assertThat(new MemoryPipelineChannel(100, new InventoryTaskAckCallback(new AtomicReference<>())).peek(), is(Collections.emptyList()));
}

@Test
void assertPollWithRecords() {
MemoryPipelineChannel channel = new MemoryPipelineChannel(100, new InventoryTaskAckCallback(new AtomicReference<>()));
List<Record> records = Collections.singletonList(new PlaceholderRecord(new IngestFinishedPosition()));
channel.push(records);
assertThat(channel.poll(), is(records));
}

@Test
void assertPollWithoutRecords() {
assertThat(new MemoryPipelineChannel(100, new InventoryTaskAckCallback(new AtomicReference<>())).poll(), is(Collections.emptyList()));
}

@Test
void assertAck() {
InventoryTaskAckCallback callback = mock(InventoryTaskAckCallback.class);
List<Record> records = Collections.singletonList(new PlaceholderRecord(new IngestFinishedPosition()));
new MemoryPipelineChannel(100, callback).ack(records);
verify(callback).onAck(records);
}
}

0 comments on commit 370ef05

Please sign in to comment.