Skip to content

Commit

Permalink
Activated store transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
spoto committed May 1, 2024
1 parent d6e3190 commit d2d7552
Show file tree
Hide file tree
Showing 38 changed files with 556 additions and 926 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public static class DiskNodeConfigBuilderImpl extends AbstractLocalNodeConfigBui
/**
* The number of transactions that fit inside a block.
*/
private long transactionsPerBlock = 5;
private long transactionsPerBlock = 10;

/**
* Creates a builder with default values for the properties.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public DiskNodeConfig getLocalConfig() {

@Override
protected DiskStore mkStore() {
return new DiskStore(getLocalConfig().getDir(), getLocalConfig().getTransactionsPerBlock());
return new DiskStore(getLocalConfig().getDir());
}

@Override
Expand All @@ -93,7 +93,7 @@ protected long getNow() {
}

@Override
protected StoreTransaction<?> getTransaction() {
public StoreTransaction<?> getTransaction() {
return transaction;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,43 +68,26 @@ class DiskStore extends AbstractStore<DiskStore> {
*/
private final AtomicReference<StorageReference> manifest;

/**
* The number of transactions added to the store. This is used to associate
* each transaction to its progressive number.
*/
private final AtomicInteger transactionsCount;

/**
* A map from the transactions added to the store to their progressive number.
* This is needed in order to give a nice presentation of transactions, inside a
* directory for its block.
*/
private final ConcurrentMap<TransactionReference, Integer> progressive;
private final ConcurrentMap<TransactionReference, Path> paths = new ConcurrentHashMap<>();

/**
* The path where the database of the store gets created.
*/
private final Path dir;

/**
* The number of transactions that fit inside a block.
*/
private final long transactionsPerBlock;
private final AtomicInteger blockNumber;

/**
* Creates a state for a node.
*
* @param dir the path where the database of the store gets created
* @param transactionsPerBlock the number of transactions that fit inside a block
*/
DiskStore(Path dir, long transactionsPerBlock) {
DiskStore(Path dir) {
this.dir = dir;
this.transactionsPerBlock = transactionsPerBlock;
this.histories = new ConcurrentHashMap<>();
this.errors = new ConcurrentHashMap<>();
this.progressive = new ConcurrentHashMap<>();
this.manifest = new AtomicReference<>();
this.transactionsCount = new AtomicInteger();
this.blockNumber = new AtomicInteger(0);
}

/**
Expand All @@ -116,18 +99,20 @@ private DiskStore(DiskStore toClone) {
super(toClone);

this.dir = toClone.dir;
this.transactionsPerBlock = toClone.transactionsPerBlock;
this.histories = toClone.histories;
this.errors = toClone.errors;
this.progressive = toClone.progressive;
this.manifest = toClone.manifest;
this.transactionsCount = toClone.transactionsCount;
this.blockNumber = toClone.blockNumber;
}

@Override
public Optional<TransactionResponse> getResponse(TransactionReference reference) {
try {
Path response = getPathFor(reference, "response");
Path path = paths.get(reference);
if (path == null)
return Optional.empty();

Path response = path.resolve("response");
try (var context = NodeUnmarshallingContexts.of(Files.newInputStream(response))) {
return Optional.of(TransactionResponses.from(context));
}
Expand All @@ -137,11 +122,6 @@ public Optional<TransactionResponse> getResponse(TransactionReference reference)
}
}

@Override
public Optional<TransactionResponse> getResponseUncommitted(TransactionReference reference) {
return getResponse(reference);
}

@Override
public Optional<String> getError(TransactionReference reference) {
return Optional.ofNullable(errors.get(reference));
Expand All @@ -153,25 +133,19 @@ public Stream<TransactionReference> getHistory(StorageReference object) {
return history == null ? Stream.empty() : Stream.of(history);
}

@Override
public Stream<TransactionReference> getHistoryUncommitted(StorageReference object) {
return getHistory(object);
}

@Override
public Optional<StorageReference> getManifest() {
return Optional.ofNullable(manifest.get());
}

@Override
public Optional<StorageReference> getManifestUncommitted() {
return getManifest();
}

@Override
public Optional<TransactionRequest<?>> getRequest(TransactionReference reference) {
try {
Path response = getPathFor(reference, "request");
Path path = paths.get(reference);
if (path == null)
return Optional.empty();

Path response = path.resolve("request");
try (var context = NodeUnmarshallingContexts.of(Files.newInputStream(response))) {
return Optional.of(TransactionRequests.from(context));
}
Expand All @@ -196,65 +170,52 @@ protected DiskStore mkClone() {
return new DiskStore(this);
}

@Override
protected DiskStore setRequest(TransactionReference reference, TransactionRequest<?> request) throws StoreException {
protected void setRequest(int progressive, TransactionReference reference, TransactionRequest<?> request) throws StoreException {
try {
progressive.computeIfAbsent(reference, _reference -> transactionsCount.getAndIncrement());
Path requestPath = getPathFor(reference, "request");
Path requestPath = getPathFor(progressive, reference, "request");
Path parent = requestPath.getParent();
paths.put(reference, parent);
ensureDeleted(parent);
Files.createDirectories(parent);

Files.writeString(getPathFor(reference, "request.txt"), request.toString(), StandardCharsets.UTF_8);
Files.writeString(getPathFor(progressive, reference, "request.txt"), request.toString(), StandardCharsets.UTF_8);

try (var context = NodeMarshallingContexts.of(Files.newOutputStream(requestPath))) {
request.into(context);
}

return this;
}
catch (IOException e) {
throw new StoreException(e);
}
}

@Override
protected DiskStore setResponse(TransactionReference reference, TransactionResponse response) throws StoreException {
protected void setResponse(int progressive, TransactionReference reference, TransactionResponse response) throws StoreException {
try {
progressive.computeIfAbsent(reference, _reference -> transactionsCount.getAndIncrement());
Path responsePath = getPathFor(reference, "response");
Path responsePath = getPathFor(progressive, reference, "response");
Path parent = responsePath.getParent();
Files.createDirectories(parent);

Files.writeString(getPathFor(reference, "response.txt"), response.toString(), StandardCharsets.UTF_8);
Files.writeString(getPathFor(progressive, reference, "response.txt"), response.toString(), StandardCharsets.UTF_8);

try (var context = NodeMarshallingContexts.of(Files.newOutputStream(responsePath))) {
response.into(context);
}

return this;
}
catch (IOException e) {
throw new StoreException(e);
}
}

@Override
protected DiskStore setError(TransactionReference reference, String error) {
protected void setError(TransactionReference reference, String error) {
errors.put(reference, error);
return this;
}

@Override
protected DiskStore setHistory(StorageReference object, Stream<TransactionReference> history) {
protected void setHistory(StorageReference object, Stream<TransactionReference> history) {
histories.put(object, history.toArray(TransactionReference[]::new));
return this;
}

@Override
protected DiskStore setManifest(StorageReference manifest) {
protected void setManifest(StorageReference manifest) {
this.manifest.set(manifest);
return this;
}

/**
Expand All @@ -265,12 +226,8 @@ protected DiskStore setManifest(StorageReference manifest) {
* @return the resulting path
* @throws FileNotFoundException if the reference is unknown
*/
private Path getPathFor(TransactionReference reference, String name) throws FileNotFoundException {
Integer progressive = this.progressive.get(reference);
if (progressive == null)
throw new FileNotFoundException("Unknown transaction reference " + reference);

return dir.resolve("b" + progressive / transactionsPerBlock).resolve(progressive % transactionsPerBlock + "-" + reference).resolve(name);
private Path getPathFor(int progressive, TransactionReference reference, String name) throws FileNotFoundException {
return dir.resolve("b" + blockNumber.get()).resolve(progressive + "-" + reference).resolve(name);
}

/**
Expand All @@ -286,4 +243,8 @@ private static void ensureDeleted(Path dir) throws IOException {
.map(Path::toFile)
.forEach(File::delete);
}

void increaseBlockNumber() {
blockNumber.getAndIncrement();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package io.hotmoka.node.disk.internal;

import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

import io.hotmoka.node.api.requests.TransactionRequest;
Expand All @@ -12,52 +15,110 @@

public class DiskStoreTransaction extends AbstractStoreTransaction<DiskStore> {

private final DiskStore store;

private final ConcurrentMap<TransactionReference, TransactionRequest<?>> requests = new ConcurrentHashMap<>();
private final ConcurrentMap<TransactionReference, TransactionResponse> responses = new ConcurrentHashMap<>();

/**
* The histories of the objects created in blockchain. In a real implementation, this must
* be stored in a persistent state.
*/
private final ConcurrentMap<StorageReference, TransactionReference[]> histories = new ConcurrentHashMap<>();

/**
* The errors generated by each transaction (if any). In a real implementation, this must
* be stored in a persistent memory such as a blockchain.
*/
private final ConcurrentMap<TransactionReference, String> errors = new ConcurrentHashMap<>();

/**
* The storage reference of the manifest stored inside the node, if any.
*/
private final AtomicReference<StorageReference> manifest = new AtomicReference<>();

public DiskStoreTransaction(DiskStore store, Object lock) {
super(store, lock);
super(lock);

this.store = store;
}

@Override
public Optional<TransactionResponse> getResponse(TransactionReference reference) {
return store.getResponseUncommitted(reference);
public Optional<TransactionResponse> getResponseUncommitted(TransactionReference reference) {
var uncommittedResponse = responses.get(reference);
if (uncommittedResponse != null)
return Optional.of(uncommittedResponse);
else
return store.getResponse(reference);
}

@Override
public Stream<TransactionReference> getHistory(StorageReference object) throws StoreException {
return store.getHistoryUncommitted(object);
public Stream<TransactionReference> getHistoryUncommitted(StorageReference object) throws StoreException {
var uncommittedHistory = histories.get(object);
if (uncommittedHistory != null)
return Stream.of(uncommittedHistory);
else
return store.getHistory(object);
}

@Override
public Optional<StorageReference> getManifest() throws StoreException {
return store.getManifestUncommitted();
public Optional<StorageReference> getManifestUncommitted() {
var uncommittedManifest = manifest.get();
if (uncommittedManifest != null)
return Optional.of(uncommittedManifest);
else
return store.getManifest();
}

@Override
public DiskStore commit() {
public DiskStore commit() throws StoreException {
// we report all the updates occurred during this transaction into the store
var manifest = this.manifest.get();
if (manifest != null)
store.setManifest(manifest);

for (var entry: errors.entrySet())
store.setError(entry.getKey(), entry.getValue());

for (var entry: histories.entrySet())
store.setHistory(entry.getKey(), Stream.of(entry.getValue()));

int progressive = 0;
for (var entry: requests.entrySet())
store.setRequest(progressive++, entry.getKey(), entry.getValue());

progressive = 0;
for (var entry: responses.entrySet())
store.setResponse(progressive++, entry.getKey(), entry.getValue());

if (progressive > 0)
store.increaseBlockNumber();

return store;
}

@Override
protected void setRequest(TransactionReference reference, TransactionRequest<?> request) throws StoreException {
store = store.setRequest(reference, request);
protected void setRequest(TransactionReference reference, TransactionRequest<?> request) {
requests.put(reference, request);
}

@Override
protected void setResponse(TransactionReference reference, TransactionResponse response) throws StoreException {
store = store.setResponse(reference, response);
protected void setResponse(TransactionReference reference, TransactionResponse response) {
responses.put(reference, response);
}

@Override
protected void setError(TransactionReference reference, String error) throws StoreException {
store = store.setError(reference, error);
protected void setError(TransactionReference reference, String error) {
errors.put(reference, error);
}

@Override
protected void setHistory(StorageReference object, Stream<TransactionReference> history) {
store = store.setHistory(object, history);
histories.put(object, history.toArray(TransactionReference[]::new));
}

@Override
protected void setManifest(StorageReference manifest) throws StoreException {
store = store.setManifest(manifest);
protected void setManifest(StorageReference manifest) {
this.manifest.set(manifest);
}
}
Loading

0 comments on commit d2d7552

Please sign in to comment.