Skip to content

Commit

Permalink
Merge pull request wildfly#17067 from pferraro/WFLY-18233
Browse files Browse the repository at this point in the history
WFLY-18233 Optimize payload of ATTRIBUTE granularity mapping in distributed session manager using Infinispan's Cache.compute(...)
  • Loading branch information
pferraro authored Oct 5, 2023
2 parents 2fe0ed4 + 58929bb commit 5e96ef5
Show file tree
Hide file tree
Showing 65 changed files with 575 additions and 996 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package org.wildfly.clustering.ee.cache.function;

import java.util.Collections;
import java.util.Map;

/**
Expand All @@ -17,14 +16,10 @@
public class MapPutFunction<K, V> extends MapComputeFunction<K, V> {

public MapPutFunction(K key, V value) {
super(Collections.singletonMap(key, value));
super(Map.of(key, value));
}

MapPutFunction(Map.Entry<K, V> entry) {
this(entry.getKey(), entry.getValue());
}

public Map.Entry<K, V> getEntry() {
return this.getOperand().entrySet().iterator().next();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,4 @@ public class MapRemoveFunction<K, V> extends MapComputeFunction<K, V> {
public MapRemoveFunction(K key) {
super(Collections.singletonMap(key, null));
}

public K getKey() {
return this.getOperand().keySet().iterator().next();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package org.wildfly.clustering.ee.cache.function;

import java.util.Collection;
import java.util.Collections;
import java.util.Set;

/**
Expand All @@ -17,7 +16,7 @@
public class SetAddFunction<V> extends CollectionAddFunction<V, Set<V>> {

public SetAddFunction(V value) {
this(Collections.singleton(value));
this(Set.of(value));
}

public SetAddFunction(Collection<V> values) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package org.wildfly.clustering.ee.cache.function;

import java.util.Collection;
import java.util.Collections;
import java.util.Set;

/**
Expand All @@ -17,7 +16,7 @@
public class SetRemoveFunction<V> extends CollectionRemoveFunction<V, Set<V>> {

public SetRemoveFunction(V value) {
this(Collections.singleton(value));
this(Set.of(value));
}

public SetRemoveFunction(Collection<V> values) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright The WildFly Authors
* SPDX-License-Identifier: Apache-2.0
*/

package org.wildfly.clustering.ee.hotrod;

import java.util.function.BiFunction;
import java.util.function.Function;

import org.infinispan.client.hotrod.RemoteCache;
import org.wildfly.clustering.ee.Mutator;
import org.wildfly.clustering.ee.MutatorFactory;

/**
* Factory that creates compute-based Mutator instances.
* @author Paul Ferraro
* @param <K> the cache key type
* @param <V> the cache value type
* @param <O> the function operand type
*/
public class RemoteCacheComputeMutatorFactory<K, V, O> implements MutatorFactory<K, O> {

private final RemoteCache<K, V> cache;
private final Function<O, BiFunction<Object, V, V>> functionFactory;

public RemoteCacheComputeMutatorFactory(RemoteCache<K, V> cache, Function<O, BiFunction<Object, V, V>> functionFactory) {
this.cache = cache;
this.functionFactory = functionFactory;
}

@Override
public Mutator createMutator(K key, O operand) {
return new RemoteCacheEntryComputeMutator<>(this.cache, key, this.functionFactory.apply(operand));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright The WildFly Authors
* SPDX-License-Identifier: Apache-2.0
*/

package org.wildfly.clustering.ee.hotrod;

import java.util.function.BiFunction;

import org.infinispan.client.hotrod.RemoteCache;
import org.wildfly.clustering.ee.Mutator;

/**
* Mutator for a cache entry using a compute function.
* @author Paul Ferraro
* @param <K> the cache key type
* @param <V> the cache value type
*/
public class RemoteCacheEntryComputeMutator<K, V> implements Mutator {

private final RemoteCache<K, V> cache;
private final K key;
private final BiFunction<Object, V, V> function;

public RemoteCacheEntryComputeMutator(RemoteCache<K, V> cache, K key, BiFunction<Object, V, V> function) {
this.cache = cache;
this.key = key;
this.function = function;
}

@Override
public void mutate() {
this.cache.compute(this.key, this.function);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright The WildFly Authors
* SPDX-License-Identifier: Apache-2.0
*/

package org.wildfly.clustering.ee.infinispan;

import java.util.function.BiFunction;
import java.util.function.Function;

import org.infinispan.Cache;
import org.wildfly.clustering.ee.Mutator;
import org.wildfly.clustering.ee.MutatorFactory;

/**
* Factory that creates compute-based Mutator instances.
* @author Paul Ferraro
* @param <K> the cache key type
* @param <V> the cache value type
* @param <O> the function operand type
*/
public class CacheComputeMutatorFactory<K, V, O> implements MutatorFactory<K, O> {

private final Cache<K, V> cache;
private final Function<O, BiFunction<Object, V, V>> functionFactory;

public CacheComputeMutatorFactory(Cache<K, V> cache, Function<O, BiFunction<Object, V, V>> functionFactory) {
this.cache = cache;
this.functionFactory = functionFactory;
}

@Override
public Mutator createMutator(K key, O operand) {
return new CacheEntryComputeMutator<>(this.cache, key, this.functionFactory.apply(operand));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright The WildFly Authors
* SPDX-License-Identifier: Apache-2.0
*/

package org.wildfly.clustering.ee.infinispan;

import java.util.function.BiFunction;

import org.infinispan.Cache;
import org.infinispan.context.Flag;
import org.wildfly.clustering.ee.Mutator;

/**
* Mutator for a cache entry using a compute function.
* @author Paul Ferraro
* @param <K> the cache key type
* @param <V> the cache value type
*/
public class CacheEntryComputeMutator<K, V> implements Mutator {

private final Cache<K, V> cache;
private final K key;
private final BiFunction<Object, V, V> function;

public CacheEntryComputeMutator(Cache<K, V> cache, K key, BiFunction<Object, V, V> function) {
this.cache = cache;
this.key = key;
this.function = function;
}

@Override
public void mutate() {
// Use FAIL_SILENTLY to prevent mutation from failing locally due to remote exceptions
this.cache.getAdvancedCache().withFlags(Flag.IGNORE_RETURN_VALUES, Flag.FAIL_SILENTLY).compute(this.key, this.function);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@
public class CacheEntryMutator<K, V> implements Mutator {

private final Cache<K, V> cache;
private final K id;
private final K key;
private final V value;
private final AtomicBoolean mutated;

public CacheEntryMutator(Cache<K, V> cache, Map.Entry<K, V> entry) {
this(cache, entry.getKey(), entry.getValue());
}

public CacheEntryMutator(Cache<K, V> cache, K id, V value) {
public CacheEntryMutator(Cache<K, V> cache, K key, V value) {
this.cache = cache;
this.id = id;
this.key = key;
this.value = value;
this.mutated = cache.getCacheConfiguration().transaction().transactionMode().isTransactional() ? new AtomicBoolean(false) : null;
}
Expand All @@ -39,7 +39,7 @@ public void mutate() {
// We only ever have to perform a replace once within a batch
if ((this.mutated == null) || this.mutated.compareAndSet(false, true)) {
// Use FAIL_SILENTLY to prevent mutation from failing locally due to remote exceptions
this.cache.getAdvancedCache().withFlags(Flag.IGNORE_RETURN_VALUES, Flag.FAIL_SILENTLY).put(this.id, this.value);
this.cache.getAdvancedCache().withFlags(Flag.IGNORE_RETURN_VALUES, Flag.FAIL_SILENTLY).put(this.key, this.value);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@
* Factory for creating {@link Mutator} objects for an Infinispan cache.
* @author Paul Ferraro
*/
public class InfinispanMutatorFactory<K, V> implements MutatorFactory<K, V> {
public class CacheMutatorFactory<K, V> implements MutatorFactory<K, V> {

private final Cache<K, V> cache;
private final CacheProperties properties;

public InfinispanMutatorFactory(Cache<K, V> cache) {
public CacheMutatorFactory(Cache<K, V> cache) {
this(cache, new InfinispanCacheProperties(cache.getCacheConfiguration()));
}

public InfinispanMutatorFactory(Cache<K, V> cache, CacheProperties properties) {
public CacheMutatorFactory(Cache<K, V> cache, CacheProperties properties) {
this.cache = cache;
this.properties = properties;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.wildfly.clustering.ee.MutatorFactory;
import org.wildfly.clustering.ee.Remover;
import org.wildfly.clustering.ee.infinispan.InfinispanConfiguration;
import org.wildfly.clustering.ee.infinispan.InfinispanMutatorFactory;
import org.wildfly.clustering.ee.infinispan.CacheMutatorFactory;
import org.wildfly.clustering.ejb.bean.BeanInstance;
import org.wildfly.clustering.ejb.cache.bean.BeanGroupKey;
import org.wildfly.clustering.marshalling.spi.MarshalledValue;
Expand All @@ -34,7 +34,7 @@ public class InfinispanBeanGroupManager<K, V extends BeanInstance<K>, C> impleme
public InfinispanBeanGroupManager(InfinispanConfiguration configuration) {
this.cache = configuration.getCache();
this.removeCache = configuration.getWriteOnlyCache();
this.mutatorFactory = new InfinispanMutatorFactory<>(configuration.getCache());
this.mutatorFactory = new CacheMutatorFactory<>(configuration.getCache());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.wildfly.clustering.ee.Key;
import org.wildfly.clustering.ee.Mutator;
import org.wildfly.clustering.ee.MutatorFactory;
import org.wildfly.clustering.ee.infinispan.InfinispanMutatorFactory;
import org.wildfly.clustering.ee.infinispan.CacheMutatorFactory;
import org.wildfly.clustering.ejb.bean.BeanExpiration;
import org.wildfly.clustering.ejb.bean.BeanInstance;
import org.wildfly.clustering.ejb.bean.BeanMetaData;
Expand Down Expand Up @@ -50,7 +50,7 @@ public InfinispanBeanMetaDataFactory(InfinispanBeanMetaDataFactoryConfiguration
this.expiration = configuration.getExpiration();
boolean scheduledExpiration = (this.expiration != null) && !this.expiration.getTimeout().isZero();
this.accessMetaDataCache = scheduledExpiration ? configuration.getCache() : null;
this.mutatorFactory = (this.accessMetaDataCache != null) ? new InfinispanMutatorFactory<>(this.accessMetaDataCache) : null;
this.mutatorFactory = (this.accessMetaDataCache != null) ? new CacheMutatorFactory<>(this.accessMetaDataCache) : null;
this.beanName = configuration.getBeanName();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,25 @@
*/
public interface Marshaller<V, S> extends Marshallability {

static <T> Marshaller<T, T> identity() {
return new Marshaller<>() {
@Override
public boolean isMarshallable(Object object) {
return true;
}

@Override
public T read(T value) throws IOException {
return value;
}

@Override
public T write(T value) throws IOException {
return value;
}
};
}

/**
* Reads a value from its marshalled form.
* @param value the marshalled form
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,9 @@ public CompletionStage<Void> topologyChanged(TopologyChangedEvent<T, Set<Address
this.executor.execute(() -> {
if (!leftMembers.isEmpty()) {
try (Batch batch = batcher.createBatch()) {
try (CloseableIterator<Map.Entry<T, Set<Address>>> entries = cache.getAdvancedCache().withFlags(Flag.FORCE_WRITE_LOCK).entrySet().iterator()) {
while (entries.hasNext()) {
Map.Entry<T, Set<Address>> entry = entries.next();
Set<Address> addresses = entry.getValue();
if (addresses.removeAll(leftMembers)) {
entry.setValue(addresses);
}
try (CloseableIterator<T> keys = cache.getAdvancedCache().withFlags(Flag.FORCE_WRITE_LOCK).keySet().iterator()) {
while (keys.hasNext()) {
cache.getAdvancedCache().withFlags(Flag.FORCE_SYNCHRONOUS, Flag.IGNORE_RETURN_VALUES).compute(keys.next(), new AddressSetRemoveFunction(leftMembers));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package org.wildfly.clustering.server.infinispan.provider;

import "org.jgroups.stack.proto";
import "org.jgroups.util.proto";
import "org.infinispan.remoting.transport.jgroups.proto";

// IDs: 140 -144
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
package org.wildfly.clustering.web.cache.session;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.wildfly.clustering.web.session.ImmutableSessionAttributes;

Expand All @@ -20,11 +21,11 @@ public class SimpleImmutableSessionAttributes implements ImmutableSessionAttribu
private final Map<String, Object> attributes;

public SimpleImmutableSessionAttributes(ImmutableSessionAttributes attributes) {
Map<String, Object> map = new HashMap<>();
for (String name: attributes.getAttributeNames()) {
map.put(name, attributes.getAttribute(name));
}
this.attributes = Collections.unmodifiableMap(map);
this(attributes.getAttributeNames().stream().collect(Collectors.toMap(Function.identity(), attributes::getAttribute)));
}

public SimpleImmutableSessionAttributes(Map<String, Object> attributes) {
this.attributes = Collections.unmodifiableMap(attributes);
}

@Override
Expand Down
Loading

0 comments on commit 5e96ef5

Please sign in to comment.