Introduce dual layer (disk / heap) S3 LRU cache for cache nodes (#1123)
Co-authored-by: Bryan Burkholder <[email protected]>
bryanlb authored Oct 16, 2024
1 parent 0c61808 commit a5c6ab0
Showing 9 changed files with 738 additions and 119 deletions.
6 changes: 6 additions & 0 deletions astra/pom.xml
@@ -261,6 +261,12 @@
<version>33.3.1-jre</version>
</dependency>

<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>3.1.8</version>
</dependency>

<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
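The Caffeine dependency added above backs both of the new cache layers. As a minimal, illustrative sketch (assumed values, not code from this commit), a weight-bounded Caffeine LoadingCache follows the same maximumWeight / weigher / build pattern used by the loaders below:

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.nio.charset.StandardCharsets;

public class CaffeineSketch {
  public static void main(String[] args) {
    // Bounded by total value size (weight) rather than entry count.
    LoadingCache<String, byte[]> cache =
        Caffeine.newBuilder()
            .maximumWeight(1_000_000) // weight budget, interpreted here as bytes
            .weigher((String key, byte[] value) -> value.length)
            .build(key -> key.getBytes(StandardCharsets.UTF_8)); // stand-in loader
    System.out.println(cache.get("example").length);
  }
}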
185 changes: 185 additions & 0 deletions astra/src/main/java/com/slack/astra/blobfs/DiskCachePagingLoader.java
@@ -0,0 +1,185 @@
package com.slack.astra.blobfs;

import static com.slack.astra.blobfs.S3CachePagingLoader.DISK_PAGE_SIZE;
import static com.slack.astra.util.SizeConstant.GB;

import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.github.benmanes.caffeine.cache.Weigher;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.FileTransformerConfiguration;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

/** Caches S3 data onto the disk, helping to reduce the penalty of S3 read latency. */
public class DiskCachePagingLoader {

// Total size of the disk cache - may temporarily exceed this so some buffer should be available
// on the host
public static final String ASTRA_S3_STREAMING_DISK_CACHE_SIZE = "astra.s3Streaming.diskCacheSize";
protected static final long DISK_CACHE_SIZE =
Long.parseLong(
System.getProperty(ASTRA_S3_STREAMING_DISK_CACHE_SIZE, String.valueOf(200 * GB)));
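// For reference: because this value is read via System.getProperty, the default can be
// overridden at JVM startup, e.g. -Dastra.s3Streaming.diskCacheSize=<bytes>.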

private static final Logger LOG = LoggerFactory.getLogger(DiskCachePagingLoader.class);

private final LoadingCache<LoadingCacheKey, Path> diskCache =
Caffeine.newBuilder()
.maximumWeight(DISK_CACHE_SIZE)
.scheduler(Scheduler.systemScheduler())
.evictionListener(evictionListener())
.weigher(weigher())
.build(this.bytesCacheLoader());

private final BlobStore blobStore;
private final S3AsyncClient s3AsyncClient;
private final long pageSize;

public DiskCachePagingLoader(BlobStore blobStore, S3AsyncClient s3AsyncClient, long pageSize) {
this.blobStore = blobStore;
this.s3AsyncClient = s3AsyncClient;
this.pageSize = pageSize;
}

private static Weigher<LoadingCacheKey, Path> weigher() {
return (_, value) -> {
try {
// todo - consider reworking weights to use kb instead of bytes? This will fail if files
// exceed 2GB (int overflow)
long fileSize = Files.size(value);
LOG.debug("Calculated size of path {} is {} bytes", value, fileSize);
return Math.toIntExact(fileSize);
} catch (IOException e) {
LOG.error("Error calculating size", e);
// if we can't calculate just use the max page size
return Math.toIntExact(DISK_PAGE_SIZE);
}
};
}

private static RemovalListener<LoadingCacheKey, Path> evictionListener() {
return (cacheKey, path, removalCause) -> {
if (cacheKey != null) {
LOG.debug(
"Evicting from disk cache - chunkID: {} / filename: {}, fromOffset: {}, toOffset: {} / cause: {}",
cacheKey.getChunkId(),
cacheKey.getFilename(),
cacheKey.getFromOffset(),
cacheKey.getToOffset(),
removalCause);
}
if (path != null) {
try {
Files.deleteIfExists(path);
} catch (IOException e) {
LOG.error("Failed to delete file {}, {}, {}", cacheKey, path, removalCause, e);
}
} else {
LOG.error("Path was unexpectedly null, {}, {}, {}", cacheKey, path, removalCause);
}
};
}

private CacheLoader<LoadingCacheKey, Path> bytesCacheLoader() {
return key -> {
LOG.debug(
"Using S3 to load disk cache - chunkID: {} / filename: {}, fromOffset: {}, toOffset: {}",
key.getChunkId(),
key.getFilename(),
key.getFromOffset(),
key.getToOffset());

// todo - consider making this configurable to a different directory (or using the data dir
// value)
Path filePath =
Path.of(
System.getProperty("java.io.tmpdir"),
String.format(
"astra-cache-%s-%s-%s-%s.tmp",
key.getChunkId(), key.getFilename(), key.getFromOffset(), key.getToOffset()));
s3AsyncClient
.getObject(
GetObjectRequest.builder()
.bucket(blobStore.bucketName)
.key(key.getPath())
.range(String.format("bytes=%s-%s", key.getFromOffset(), key.getToOffset()))
.build(),
AsyncResponseTransformer.toFile(
filePath,
FileTransformerConfiguration.builder()
.failureBehavior(FileTransformerConfiguration.FailureBehavior.DELETE)
.fileWriteOption(
FileTransformerConfiguration.FileWriteOption.CREATE_OR_REPLACE_EXISTING)
.build()))
.get();
return filePath;
};
}

public void readBytes(
String chunkId,
String filename,
byte[] b,
int startingOffset,
long originalPointer,
int totalLength)
throws ExecutionException, IOException {
// pointer here is the "global" file pointer
int currentOffset = startingOffset;
long currentPointer = originalPointer;
int remainingLengthToRead = totalLength;

for (LoadingCacheKey cacheKey : getCacheKeys(chunkId, filename, originalPointer, totalLength)) {
// the relative pointer for the file on disk
long relativePointer = currentPointer % pageSize;

try (FileChannel fileChannel =
FileChannel.open(diskCache.get(cacheKey), StandardOpenOption.READ)) {
fileChannel.position(relativePointer);

// if the read extends past the end of this cached page
if (currentPointer + remainingLengthToRead > cacheKey.getToOffset()) {
// read from the relative pointer to the end
int lengthToRead = Math.toIntExact(pageSize - relativePointer);
ByteBuffer byteBuffer = ByteBuffer.wrap(b, currentOffset, lengthToRead);
fileChannel.read(byteBuffer);

currentOffset += lengthToRead;
currentPointer += lengthToRead;
remainingLengthToRead -= lengthToRead;
} else {
ByteBuffer byteBuffer = ByteBuffer.wrap(b, currentOffset, remainingLengthToRead);
fileChannel.read(byteBuffer);
break;
}
}
}
}

public List<LoadingCacheKey> getCacheKeys(
String chunkId, String filename, long originalPointer, int len) {
long startingPage = Math.floorDiv(originalPointer, pageSize);
long endingPage = Math.ceilDiv(originalPointer + len, pageSize);

List<LoadingCacheKey> cacheKeys = new ArrayList<>(Math.toIntExact(endingPage - startingPage));
for (long i = startingPage; i < endingPage; i++) {
cacheKeys.add(
new LoadingCacheKey(chunkId, filename, i * pageSize, i * pageSize + pageSize - 1));
}
return cacheKeys;
}
}
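To make the paging arithmetic concrete, here is a standalone sketch (assumed page size and offsets, not part of the commit) that reproduces how getCacheKeys splits a read into page-aligned byte ranges and how readBytes walks them:

public class PagingSketch {
  public static void main(String[] args) {
    long pageSize = 100; // assumed; the real value comes from S3CachePagingLoader
    long pointer = 250;  // global file offset where the read starts
    int len = 120;       // number of bytes requested

    long startingPage = Math.floorDiv(pointer, pageSize);    // 2
    long endingPage = Math.ceilDiv(pointer + len, pageSize); // ceil(370 / 100) = 4

    // Pages 2 and 3 are needed: byte ranges [200, 299] and [300, 399]. readBytes then copies
    // bytes 250-299 from the first page file and bytes 300-369 from the second.
    for (long i = startingPage; i < endingPage; i++) {
      System.out.printf("page %d -> bytes %d-%d%n", i, i * pageSize, i * pageSize + pageSize - 1);
    }
  }
}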
155 changes: 155 additions & 0 deletions astra/src/main/java/com/slack/astra/blobfs/HeapCachePagingLoader.java
@@ -0,0 +1,155 @@
package com.slack.astra.blobfs;

import static com.slack.astra.util.SizeConstant.GB;

import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.github.benmanes.caffeine.cache.Weigher;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;

/** Caches S3 data into the heap, passing through to the disk cache if the data is not available. */
public class HeapCachePagingLoader {
private static final Logger LOG = LoggerFactory.getLogger(HeapCachePagingLoader.class);

public static final String ASTRA_S3_STREAMING_HEAP_CACHE_SIZE = "astra.s3Streaming.heapCacheSize";
protected static final long HEAP_CACHE_SIZE =
Long.parseLong(
System.getProperty(ASTRA_S3_STREAMING_HEAP_CACHE_SIZE, String.valueOf(1 * GB)));

private final LoadingCache<LoadingCacheKey, byte[]> heapCache =
Caffeine.newBuilder()
.maximumWeight(HEAP_CACHE_SIZE)
.scheduler(Scheduler.systemScheduler())
.removalListener(heapRemovalListener())
.weigher(weigher())
.build(this.bytesCacheLoader());

private final LoadingCache<LoadingCacheKey, Long> fileLengthCache =
Caffeine.newBuilder().maximumSize(25000).build(this.fileLengthLoader());

private final BlobStore blobStore;
private final S3AsyncClient s3AsyncClient;
private final DiskCachePagingLoader diskCachePagingLoader;
private final long pageSize;

public HeapCachePagingLoader(
BlobStore blobStore,
S3AsyncClient s3AsyncClient,
DiskCachePagingLoader diskCachePagingLoader,
long pageSize) {
this.blobStore = blobStore;
this.s3AsyncClient = s3AsyncClient;
this.diskCachePagingLoader = diskCachePagingLoader;
this.pageSize = pageSize;
}

private static Weigher<LoadingCacheKey, byte[]> weigher() {
return (_, value) -> value.length;
}

private static RemovalListener<LoadingCacheKey, byte[]> heapRemovalListener() {
return (key, _, cause) -> {
if (key != null) {
LOG.debug(
"Evicting from heap cache - chunkID: {} / filename: {}, fromOffset: {}, toOffset: {} / cause: {}",
key.getChunkId(),
key.getFilename(),
key.getFromOffset(),
key.getToOffset(),
cause);
}
};
}

private CacheLoader<LoadingCacheKey, byte[]> bytesCacheLoader() {
return key -> {
LOG.debug(
"Using disk cache to load heap cache - chunkID: {} / filename: {}, fromOffset: {}, toOffset: {}",
key.getChunkId(),
key.getFilename(),
key.getFromOffset(),
key.getToOffset());
int length = Math.toIntExact(key.getToOffset() - key.getFromOffset() + 1);
byte[] bytes = new byte[length];
diskCachePagingLoader.readBytes(
key.getChunkId(), key.getFilename(), bytes, 0, key.getFromOffset(), length);
return bytes;
};
}

private CacheLoader<LoadingCacheKey, Long> fileLengthLoader() {
return key ->
s3AsyncClient
.headObject(
HeadObjectRequest.builder().bucket(blobStore.bucketName).key(key.getPath()).build())
.get()
.contentLength();
}

public void readBytes(
String chunkId,
String filename,
byte[] b,
int startingOffset,
long originalPointer,
int totalLength)
throws ExecutionException, IOException {
int currentOffset = startingOffset;
long currentPointer = originalPointer;
int remainingLengthToRead = totalLength;

for (LoadingCacheKey cacheKey : getCacheKeys(chunkId, filename, originalPointer, totalLength)) {
long relativePointer = currentPointer % pageSize;
// if the read extends past the end of this cached page
if (currentPointer + remainingLengthToRead > cacheKey.getToOffset()) {
// read from the relative pointer to the end
int lengthToRead = Math.toIntExact(pageSize - relativePointer);
System.arraycopy(
heapCache.get(cacheKey),
Math.toIntExact(relativePointer),
b,
currentOffset,
lengthToRead);

currentOffset += lengthToRead;
currentPointer += lengthToRead;
remainingLengthToRead -= lengthToRead;
} else {
System.arraycopy(
heapCache.get(cacheKey),
Math.toIntExact(relativePointer),
b,
currentOffset,
remainingLengthToRead);
break;
}
}
}

public long length(String chunkId, String filename) throws ExecutionException {
return fileLengthCache.get(new LoadingCacheKey(chunkId, filename));
}

public List<LoadingCacheKey> getCacheKeys(
String chunkId, String filename, long originalPointer, int len) {
long startingPage = Math.floorDiv(originalPointer, pageSize);
long endingPage = Math.ceilDiv(originalPointer + len, pageSize);

List<LoadingCacheKey> cacheKeys = new ArrayList<>(Math.toIntExact(endingPage - startingPage));
for (long i = startingPage; i < endingPage; i++) {
cacheKeys.add(
new LoadingCacheKey(chunkId, filename, i * pageSize, i * pageSize + pageSize - 1));
}
return cacheKeys;
}
}
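For orientation, a rough wiring sketch of how the two layers compose, based only on the constructors shown above; the page sizes and the BlobStore / S3AsyncClient arguments are assumptions, and the actual composition lives in S3CachePagingLoader (part of this commit but not shown in this section):

import com.slack.astra.blobfs.BlobStore;
import com.slack.astra.blobfs.DiskCachePagingLoader;
import com.slack.astra.blobfs.HeapCachePagingLoader;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import software.amazon.awssdk.services.s3.S3AsyncClient;

public class DualLayerCacheSketch {
  static HeapCachePagingLoader buildLoaders(BlobStore blobStore, S3AsyncClient s3AsyncClient) {
    long diskPageSize = 100L * 1024 * 1024; // assumed example value
    long heapPageSize = 2L * 1024 * 1024;   // assumed example value
    DiskCachePagingLoader diskLoader =
        new DiskCachePagingLoader(blobStore, s3AsyncClient, diskPageSize);
    return new HeapCachePagingLoader(blobStore, s3AsyncClient, diskLoader, heapPageSize);
  }

  static void readExample(HeapCachePagingLoader heapLoader)
      throws ExecutionException, IOException {
    // Served from the heap cache when possible; a miss falls through to the disk cache,
    // which in turn issues a ranged S3 GetObject for the containing page.
    byte[] buffer = new byte[4096];
    heapLoader.readBytes("chunkId", "example.file", buffer, 0, 0L, buffer.length);
  }
}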