Skip to content

Commit

Permalink
Add commit instant tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
Tobias Hafner committed Jan 1, 2025
1 parent 1c59b8c commit a846229
Showing 1 changed file with 27 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
import lombok.NonNull;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.jetbrains.annotations.Nullable;
import org.polypheny.db.PolyImplementation;
import org.polypheny.db.adapter.Adapter;
Expand All @@ -47,6 +45,7 @@
import org.polypheny.db.catalog.Catalog;
import org.polypheny.db.catalog.entity.LogicalConstraint;
import org.polypheny.db.catalog.entity.LogicalUser;
import org.polypheny.db.catalog.entity.logical.LogicalEntity;
import org.polypheny.db.catalog.entity.logical.LogicalKey.EnforcementTime;
import org.polypheny.db.catalog.entity.logical.LogicalNamespace;
import org.polypheny.db.catalog.entity.logical.LogicalTable;
Expand Down Expand Up @@ -75,6 +74,8 @@ public class TransactionImpl implements Transaction, Comparable<Object> {

private static final AtomicLong TRANSACTION_COUNTER = new AtomicLong();

private long transactionTimestamp;

@Getter
private final long id;

Expand Down Expand Up @@ -146,6 +147,8 @@ public class TransactionImpl implements Transaction, Comparable<Object> {
this.analyze = analyze;
this.origin = origin;
this.flavor = flavor;
// ToDo TH: temporary dummy to get some values to work with
this.transactionTimestamp = System.nanoTime();
}


Expand Down Expand Up @@ -173,9 +176,10 @@ public void commit() throws TransactionException {
log.trace( "This transaction has already been finished!" );
return;
}
boolean okToCommit = true;

if (!readSet.isEmpty()) {
readSet.forEach( System.out::println );
if ( !readSet.isEmpty() ) {
okToCommit &= validateReadSet();
}

Pair<Boolean, String> isValid = catalog.checkIntegrity();
Expand All @@ -187,7 +191,6 @@ public void commit() throws TransactionException {
catalog.executeCommitActions();

// Prepare to commit changes on all involved adapters and the catalog
boolean okToCommit = true;
if ( RuntimeConfig.TWO_PC_MODE.getBoolean() ) {
for ( Adapter<?> adapter : involvedAdapters ) {
okToCommit &= adapter.prepare( xid );
Expand Down Expand Up @@ -239,6 +242,8 @@ public void commit() throws TransactionException {
// Free resources hold by statements
statements.forEach( Statement::close );

updateCommitInstantLog();

// Release locks
releaseAllLocks();
// Remove transaction
Expand All @@ -249,6 +254,23 @@ public void commit() throws TransactionException {

}

private boolean validateReadSet() {
for ( VersionedEntryIdentifier identifier : readSet ) {
LogicalEntity entity = Catalog.getInstance().getSnapshot().getLogicalEntity( identifier.getEntityId() ).orElseThrow();
if (entity.getEntryCommitInstantsLog().getLastCommit( identifier ) > transactionTimestamp) {
return false;
}
}
return true;
}

private void updateCommitInstantLog() {
for (VersionedEntryIdentifier identifier : readSet ) {
LogicalEntity entity = Catalog.getInstance().getSnapshot().getLogicalEntity( identifier.getEntityId() ).orElseThrow();
entity.getEntryCommitInstantsLog().setOrUpdateLastCommit( identifier, transactionTimestamp );
}
}


@Override
public void rollback( @Nullable String reason ) throws TransactionException {
Expand Down

0 comments on commit a846229

Please sign in to comment.