Skip to content

Commit

Permalink
Fix depreactions
Browse files Browse the repository at this point in the history
  • Loading branch information
olegz committed Sep 27, 2024
1 parent 9aabfbe commit 6f9809e
Show file tree
Hide file tree
Showing 17 changed files with 5 additions and 353 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,27 +86,13 @@ public enum StandardHeaders {

}

/**
* When true the offset is committed after each record, otherwise the offsets for the complete set of records
* received from the poll() are committed after all records have been processed.
*/
@Deprecated
private boolean ackEachRecord;

/**
* When true, topic partitions is automatically rebalanced between the members of a consumer group.
* When false, each consumer is assigned a fixed set of partitions based on spring.cloud.stream.instanceCount and spring.cloud.stream.instanceIndex.
*/
private boolean autoRebalanceEnabled = true;

/**
* Whether to autocommit offsets when a message has been processed.
* If set to false, a header with the key kafka_acknowledgment of the type org.springframework.kafka.support.Acknowledgment header
* is present in the inbound message. Applications may use this header for acknowledging messages.
*/
@Deprecated
private boolean autoCommitOffset = true;

/**
* Controlling the container acknowledgement mode. This is the preferred way to control the ack mode on the
* container instead of the deprecated autoCommitOffset property.
Expand Down Expand Up @@ -152,12 +138,6 @@ public enum StandardHeaders {
*/
private KafkaProducerProperties dlqProducerProperties = new KafkaProducerProperties();

/**
* @deprecated No longer used by the binder.
*/
@Deprecated
private int recoveryInterval = 5000;

/**
* List of trusted packages to provide the header mapper.
*/
Expand Down Expand Up @@ -230,53 +210,6 @@ public enum StandardHeaders {
*/
private boolean reactiveAtMostOnce;

/**
* @return if each record needs to be acknowledged.
*
* When true the offset is committed after each record, otherwise the offsets for the complete set of records
* received from the poll() are committed after all records have been processed.
*
* @deprecated since 3.1 in favor of using {@link #ackMode}
*/
@Deprecated
public boolean isAckEachRecord() {
return this.ackEachRecord;
}

/**
* @param ackEachRecord
*
* @deprecated in favor of using {@link #ackMode}
*/
@Deprecated
public void setAckEachRecord(boolean ackEachRecord) {
this.ackEachRecord = ackEachRecord;
}

/**
* @return is autocommit offset enabled
*
* Whether to autocommit offsets when a message has been processed.
* If set to false, a header with the key kafka_acknowledgment of the type org.springframework.kafka.support.Acknowledgment header
* is present in the inbound message. Applications may use this header for acknowledging messages.
*
* @deprecated since 3.1 in favor of using {@link #ackMode}
*/
@Deprecated
public boolean isAutoCommitOffset() {
return this.autoCommitOffset;
}

/**
* @param autoCommitOffset
*
* @deprecated in favor of using {@link #ackMode}
*/
@Deprecated
public void setAutoCommitOffset(boolean autoCommitOffset) {
this.autoCommitOffset = autoCommitOffset;
}

/**
* @return Container's ack mode.
*/
Expand Down Expand Up @@ -348,26 +281,6 @@ public void setAutoCommitOnError(Boolean autoCommitOnError) {
this.autoCommitOnError = autoCommitOnError;
}

/**
* No longer used.
* @return the interval.
* @deprecated No longer used by the binder
*/
@Deprecated
public int getRecoveryInterval() {
return this.recoveryInterval;
}

/**
* No longer used.
* @param recoveryInterval the interval.
* @deprecated No longer needed by the binder
*/
@Deprecated
public void setRecoveryInterval(int recoveryInterval) {
this.recoveryInterval = recoveryInterval;
}

/**
* @return is auto rebalance enabled
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,18 +321,6 @@ public void setProducerListener(ProducerListener<byte[], byte[]> producerListene
this.producerListener = producerListener;
}

/**
* Set a {@link ClientFactoryCustomizer} for the {@link ProducerFactory} and {@link ConsumerFactory} created inside
* the binder.
*
* @param customizer the client factory customizer
* @deprecated in favor of {@link #addClientFactoryCustomizer(ClientFactoryCustomizer)}.
*/
@Deprecated
public void setClientFactoryCustomizer(ClientFactoryCustomizer customizer) {
addClientFactoryCustomizer(customizer);
}

public void addClientFactoryCustomizer(ClientFactoryCustomizer customizer) {
if (customizer != null) {
this.clientFactoryCustomizers.add(customizer);
Expand Down Expand Up @@ -686,17 +674,7 @@ else if (applicationContext != null) {
messageListenerContainer.setBeanName(destination + ".container");
// end of these won't be needed...
ContainerProperties.AckMode ackMode = extendedConsumerProperties.getExtension().getAckMode();
if (ackMode == null) {
if (extendedConsumerProperties.getExtension().isAckEachRecord()) {
ackMode = ContainerProperties.AckMode.RECORD;
}
else {
if (!extendedConsumerProperties.getExtension().isAutoCommitOffset()) {
messageListenerContainer.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL);
}
}
}

if (ackMode != null) {
if ((extendedConsumerProperties.isBatchMode() && ackMode != ContainerProperties.AckMode.RECORD) ||
!extendedConsumerProperties.isBatchMode()) {
Expand Down Expand Up @@ -1425,32 +1403,12 @@ private String getStackTraceAsString(Throwable cause) {
return stringWriter.getBuffer().toString();
}

/**
* Set a {@link ConsumerConfigCustomizer} for the {@link ConsumerFactory} created inside the binder.
* @param consumerConfigCustomizer the consumer config customizer
* @deprecated in favor of {@link #addConsumerConfigCustomizer(ConsumerConfigCustomizer)}.
*/
@Deprecated
public void setConsumerConfigCustomizer(ConsumerConfigCustomizer consumerConfigCustomizer) {
addConsumerConfigCustomizer(consumerConfigCustomizer);
}

public void addConsumerConfigCustomizer(ConsumerConfigCustomizer consumerConfigCustomizer) {
if (consumerConfigCustomizer != null) {
this.consumerConfigCustomizers.add(consumerConfigCustomizer);
}
}

/**
* Set a {@link ProducerConfigCustomizer} for the {@link ProducerFactory} created inside the binder.
* @param producerConfigCustomizer the producer config customizer
* @deprecated in favor of {@link #addProducerConfigCustomizer(ProducerConfigCustomizer)}.
*/
@Deprecated
public void setProducerConfigCustomizer(ProducerConfigCustomizer producerConfigCustomizer) {
addProducerConfigCustomizer(producerConfigCustomizer);
}

public void addProducerConfigCustomizer(ProducerConfigCustomizer producerConfigCustomizer) {
if (producerConfigCustomizer != null) {
this.producerConfigCustomizers.add(producerConfigCustomizer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1515,7 +1515,6 @@ void autoCommitOnErrorWhenManualAcknowledgement() throws Exception {
consumerProperties.setBackOffMaxInterval(150);
//When auto commit is disabled, then the record is committed after publishing to DLQ using the manual acknowledgement.
// (if DLQ is enabled, which is, in this case).
consumerProperties.getExtension().setAutoCommitOffset(false);
consumerProperties.getExtension().setEnableDlq(true);

DirectChannel moduleInputChannel = createBindableChannel("input",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,6 @@ void kafkaBinderExtendedProperties() throws Exception {
customKafkaConsumerProperties.getConfiguration().get("value.serializer"))
.isEqualTo("BarSerializer.class");

assertThat(kafkaConsumerProperties.isAckEachRecord()).isEqualTo(true);
assertThat(customKafkaConsumerProperties.isAckEachRecord()).isEqualTo(false);

RebalanceListener rebalanceListener = context.getBean(RebalanceListener.class);
assertThat(rebalanceListener.latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(rebalanceListener.bindings.keySet()).contains("standard-in",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,6 @@ public void setAdminAddresses(String[] adminAddresses) {
this.adminAddresses = adminAddresses;
}

/**
* @param adminAddresses A comma-separated list of RabbitMQ management plugin URLs.
* @deprecated in favor of {@link #setAdminAddresses(String[])}. Will be removed in a
* future release.
*/
@Deprecated
public void setAdminAdresses(String[] adminAddresses) {
setAdminAddresses(adminAddresses);
}

@Deprecated
public String[] getAdminAdresses() {
return this.adminAddresses;
}

public String[] getNodes() {
return nodes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,42 +180,6 @@ public void setPrefetch(int prefetch) {
this.prefetch = prefetch;
}

/**
* @return the header patterns.
* @deprecated - use {@link #getHeaderPatterns()}.
*/
@Deprecated
public String[] getRequestHeaderPatterns() {
return this.headerPatterns;
}

/**
* @param requestHeaderPatterns request header patterns
* @deprecated - use {@link #setHeaderPatterns(String[])}.
*/
@Deprecated
public void setRequestHeaderPatterns(String[] requestHeaderPatterns) {
this.headerPatterns = requestHeaderPatterns;
}

/**
* @return the tx size.
* @deprecated in favor of {@link #getBatchSize()}
*/
@Deprecated
@Min(value = 1, message = "Tx Size should be greater than zero.")
public int getTxSize() {
return getBatchSize();
}

/**
* @param txSize the tx size
* deprecated in favor of {@link #setBatchSize(int)}.
*/
public void setTxSize(int txSize) {
setBatchSize(txSize);
}

@Min(value = 1, message = "Batch Size should be greater than zero.")
public int getBatchSize() {
return batchSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,24 +163,6 @@ public enum ProducerType {
*/
private boolean superStream;

/**
* @param requestHeaderPatterns the patterns.
* @deprecated - use {@link #setHeaderPatterns(String[])}.
*/
@Deprecated
public void setRequestHeaderPatterns(String[] requestHeaderPatterns) {
this.headerPatterns = requestHeaderPatterns;
}

/**
* @return the header patterns.
* @deprecated - use {@link #getHeaderPatterns()}.
*/
@Deprecated
public String[] getRequestHeaderPatterns() {
return this.headerPatterns;
}

public void setCompress(boolean compress) {
this.compress = compress;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ void consumerProperties() throws Exception {
properties.getExtension().setPrefix("foo.");
properties.getExtension().setPrefetch(20);
properties.getExtension().setHeaderPatterns(new String[] { "foo" });
properties.getExtension().setTxSize(10);
properties.getExtension().setBatchSize(10);
QuorumConfig quorum = properties.getExtension().getQuorum();
quorum.setEnabled(true);
quorum.setDeliveryLimit(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,6 @@ public void send(Message<?> message) {
this.getChannel(0).send(message);
}

/**
* @param message message to send
* @param inputIndex input index
* @deprecated since 3.0.2 in favor of {@link #receive(long, String)} where you should use the actual binding name (e.g., "foo-in-0")
*/
@Deprecated
public void send(Message<?> message, int inputIndex) {
this.getChannel(inputIndex).send(message);
}

/**
* Allows the {@link Message} to be sent to a Binder's destination.<br>
* This needs a bit of clarification. Just like with any binder, 'destination'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,24 +56,6 @@ public class PartitionHandler {

private volatile int partitionCount;

/**
* Construct a {@code PartitionHandler}.
* @param evaluationContext evaluation context for binder
* @param properties binder properties
* @param partitionKeyExtractorStrategy PartitionKeyExtractor strategy
* @param partitionSelectorStrategy PartitionSelector strategy
*
* @deprecated since 3.0.2. Please use another constructor which allows you to pass an instance of beanFactory
*/
@Deprecated
public PartitionHandler(EvaluationContext evaluationContext,
ProducerProperties properties,
PartitionKeyExtractorStrategy partitionKeyExtractorStrategy,
PartitionSelectorStrategy partitionSelectorStrategy) {

this(evaluationContext, properties, (ConfigurableListableBeanFactory) extractBeanFactoryFromEvaluationContext(evaluationContext));
}

/**
* Construct a {@code PartitionHandler}.
* @param evaluationContext evaluation context for binder
Expand Down
Loading

0 comments on commit 6f9809e

Please sign in to comment.