From 503cb9c9c32f9d08154293877e27af465cea26a1 Mon Sep 17 00:00:00 2001 From: Nemanja Milicevic Date: Sat, 30 Nov 2024 22:43:42 +0100 Subject: [PATCH] Add FLUSHDB command --- .../java/kiwi/core/storage/KeyValueStore.java | 2 ++ .../kiwi/core/storage/bitcask/BitcaskStore.java | 9 +++++++-- .../core/storage/bitcask/BitcaskStoreTest.java | 15 +++++++++++++++ .../src/main/java/kiwi/embedded/Kiwi.java | 5 +++++ .../kiwi/server/resp/command/CommandType.java | 1 + .../server/resp/handler/RESPCommandHandler.java | 6 ++++++ 6 files changed, 36 insertions(+), 2 deletions(-) diff --git a/kiwi-core/src/main/java/kiwi/core/storage/KeyValueStore.java b/kiwi-core/src/main/java/kiwi/core/storage/KeyValueStore.java index 1ef8dd7..48d3f97 100644 --- a/kiwi-core/src/main/java/kiwi/core/storage/KeyValueStore.java +++ b/kiwi-core/src/main/java/kiwi/core/storage/KeyValueStore.java @@ -14,4 +14,6 @@ public interface KeyValueStore extends AutoCloseable { boolean contains(K key); int size(); + + void purge(); } diff --git a/kiwi-core/src/main/java/kiwi/core/storage/bitcask/BitcaskStore.java b/kiwi-core/src/main/java/kiwi/core/storage/bitcask/BitcaskStore.java index 8f392ec..94b0009 100644 --- a/kiwi-core/src/main/java/kiwi/core/storage/bitcask/BitcaskStore.java +++ b/kiwi-core/src/main/java/kiwi/core/storage/bitcask/BitcaskStore.java @@ -154,8 +154,9 @@ public int size() { return keyDir.size(); } - private Supplier activeSegmentSupplier() { - return () -> activeSegment; + @Override + public void purge() { + keyDir.keys().asIterator().forEachRemaining(this::delete); } @Override @@ -164,6 +165,10 @@ public void close() { writer.close(); } + private Supplier activeSegmentSupplier() { + return () -> activeSegment; + } + public static Builder Builder() { return new Builder(); } diff --git a/kiwi-core/src/test/java/kiwi/core/storage/bitcask/BitcaskStoreTest.java b/kiwi-core/src/test/java/kiwi/core/storage/bitcask/BitcaskStoreTest.java index a869cd8..63808fc 100644 --- a/kiwi-core/src/test/java/kiwi/core/storage/bitcask/BitcaskStoreTest.java +++ b/kiwi-core/src/test/java/kiwi/core/storage/bitcask/BitcaskStoreTest.java @@ -155,6 +155,21 @@ void testRollActiveSegment() throws IOException { assertEquals("3", store.get(Bytes.wrap("c")).orElseThrow().toString()); } + @Test + void testPurge() { + BitcaskStore store = BitcaskStore.open(root); + store.put(Bytes.wrap("k1"), Bytes.wrap("v1")); + store.put(Bytes.wrap("k2"), Bytes.wrap("v2")); + store.put(Bytes.wrap("k3"), Bytes.wrap("v3")); + store.delete(Bytes.wrap("k1")); + + assertEquals(2, store.size()); + + store.purge(); + + assertEquals(0, store.size()); + } + @Test void testFileClose() throws IOException { FileChannel channel = FileChannel.open(root.resolve("test.log"), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE); diff --git a/kiwi-embedded/src/main/java/kiwi/embedded/Kiwi.java b/kiwi-embedded/src/main/java/kiwi/embedded/Kiwi.java index 6a1e21c..1a53d0e 100644 --- a/kiwi-embedded/src/main/java/kiwi/embedded/Kiwi.java +++ b/kiwi-embedded/src/main/java/kiwi/embedded/Kiwi.java @@ -58,6 +58,11 @@ public int size() { return store.size(); } + @Override + public void purge() { + store.purge(); + } + @Override public void close() throws Exception { store.close(); diff --git a/kiwi-server/src/main/java/kiwi/server/resp/command/CommandType.java b/kiwi-server/src/main/java/kiwi/server/resp/command/CommandType.java index 2e7c4fc..df00ea1 100644 --- a/kiwi-server/src/main/java/kiwi/server/resp/command/CommandType.java +++ b/kiwi-server/src/main/java/kiwi/server/resp/command/CommandType.java @@ -11,5 +11,6 @@ public enum CommandType { DEL, EXISTS, DBSIZE, + FLUSHDB, UNKNOWN, } diff --git a/kiwi-server/src/main/java/kiwi/server/resp/handler/RESPCommandHandler.java b/kiwi-server/src/main/java/kiwi/server/resp/handler/RESPCommandHandler.java index 0d8b779..d76448a 100644 --- a/kiwi-server/src/main/java/kiwi/server/resp/handler/RESPCommandHandler.java +++ b/kiwi-server/src/main/java/kiwi/server/resp/handler/RESPCommandHandler.java @@ -32,6 +32,7 @@ protected void channelRead0(ChannelHandlerContext ctx, RESPCommand command) { case DEL -> handleDelete(ctx, command); case EXISTS -> handleExists(ctx, command); case DBSIZE -> handleSize(ctx, command); + case FLUSHDB -> handleFlush(ctx, command); case UNKNOWN -> handleUnknown(ctx, command); } } @@ -118,6 +119,11 @@ private void handleSize(ChannelHandlerContext ctx, RESPCommand ignoredCommand) { ctx.writeAndFlush(size); } + private void handleFlush(ChannelHandlerContext ctx, RESPCommand ignoredCommand) { + db.purge(); + ctx.writeAndFlush("OK"); + } + private void handleUnknown(ChannelHandlerContext ctx, RESPCommand command) { ctx.writeAndFlush(new Throwable("unknown command: " + command.commandType())); }