Skip to content

Commit

Permalink
Use serializable isolation and handle retries effectively
Browse files Browse the repository at this point in the history
Allows punishing a victim concurrently with a much greater degree
  of reliability.
  • Loading branch information
A248 committed Jan 27, 2024
1 parent 063de6c commit b10c847
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@

import org.jooq.DSLContext;
import org.jooq.exception.DataAccessException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import space.arim.libertybans.core.database.jooq.JooqContext;
import space.arim.omnibus.util.ArraysUtil;
import space.arim.omnibus.util.ThisClass;
import space.arim.omnibus.util.concurrent.CentralisedFuture;
import space.arim.omnibus.util.concurrent.FactoryOfTheFuture;

Expand All @@ -40,6 +43,8 @@ public final class JooqQueryExecutor implements QueryExecutor {
private final FactoryOfTheFuture futuresFactory;
private final Executor threadPool;

private static final Logger logger = LoggerFactory.getLogger(ThisClass.get());

public JooqQueryExecutor(JooqContext jooqContext, DataSource dataSource,
FactoryOfTheFuture futuresFactory, Executor threadPool) {
this.jooqContext = Objects.requireNonNull(jooqContext, "jooqContext");
Expand All @@ -57,6 +62,11 @@ private static <E extends Throwable> E rollbackBeforeThrow(Connection connection
throw reason;
}

private static DataAccessException unableToCommit(Connection connection, SQLException cause) {
throw rollbackBeforeThrow(connection,
new DataAccessException("Unable to commit (" + cause.getSQLState() + ')', cause));
}

private <R> R obtainUnfailing(SQLFunction<R> command) {
try (Connection connection = dataSource.getConnection()) {
if (command.isReadOnly()) {
Expand All @@ -70,11 +80,15 @@ private <R> R obtainUnfailing(SQLFunction<R> command) {
} catch (RuntimeException ex) {
throw rollbackBeforeThrow(connection, ex);
}
connection.commit();
try {
connection.commit();
} catch (SQLException ex) {
throw unableToCommit(connection, ex);
}
return value;

} catch (SQLException ex) {
throw new DataAccessException("Miscellaneous failure", ex);
throw new DataAccessException("Miscellaneous failure (" + ex.getSQLState() + ')', ex);
}
}

Expand All @@ -90,7 +104,7 @@ private static boolean isSerializationFailure(SQLException ex) {
MariaDB and MySQL - ER_LOCK_DEADLOCK
PostgreSQL and CockroachDB - serialization_failure
*/
return ex.getErrorCode() == 40001;
return "40001".equals(ex.getSQLState());
}

private static void exponentialBackoff(int retry) {
Expand Down Expand Up @@ -140,7 +154,13 @@ private <R> R obtainWithRetry(int retryCount, SQLTransactionalFunction<R> comman
serializationFailures = ArraysUtil.expandAndInsert(serializationFailures, ex, 0);
continue;
}
throw rollbackBeforeThrow(connection, new DataAccessException("Unable to commit", ex));
throw unableToCommit(connection, ex);
}
}
if (retry != 0) {
logger.trace("Database operation succeeded after {} tries", retry);
if (retry > retryCount / 2) {
logger.info("Heavy contention detected on the database. Consider upping the retry count.");
}
}
return value;
Expand Down Expand Up @@ -172,7 +192,7 @@ public void executeWithExistingConnection(Connection connection,
try {
connection.commit();
} catch (SQLException ex) {
throw rollbackBeforeThrow(connection, new DataAccessException("Unable to commit", ex));
throw unableToCommit(connection, ex);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ boolean wasNotRolledBack() {
return !rolledBack;
}

@Override
public void setIsolation(int level) {
try {
connection.setTransactionIsolation(level);
} catch (SQLException ex) {
throw new DataAccessException("Failed to set isolation", ex);
}
}

@Override
public void rollback() {
try {
Expand Down Expand Up @@ -88,6 +97,11 @@ private NestedTransaction(Savepoint savepoint) {
this.savepoint = Objects.requireNonNull(savepoint, "savepoint");
}

@Override
public void setIsolation(int level) {
throw new UnsupportedOperationException("Cannot set isolation levels in nested transactions");
}

@Override
public void rollback() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@
*/
public interface Transaction {

/**
* Sets the isolation level on this transaction
*
* @param isolationLevel the isolation level
*/
void setIsolation(int isolationLevel);

/**
* Rolls back the enclosing scope
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ CentralisedFuture<Punishment> enactPunishment(DraftPunishment draftPunishment) {
creator);

return database.queryWithRetry((context, transaction) -> {
// Make sure concurrent executions do not conflict
transaction.setIsolation(Connection.TRANSACTION_SERIALIZABLE);

if (type != PunishmentType.KICK) {
database.clearExpiredPunishments(context, type, start);
}
Expand All @@ -124,6 +127,8 @@ CentralisedFuture<Punishment> calculatePunishment(CalculablePunishment calculabl

InternalDatabase database = dbProvider.get();
return database.queryWithRetry((context, transaction) -> {
// Make sure concurrent executions do not conflict
transaction.setIsolation(Connection.TRANSACTION_SERIALIZABLE);

var calculationResult = calculablePunishment.getCalculator().compute(
escalationTrack, victim,
Expand Down
1 change: 1 addition & 0 deletions bans-core/src/main/resources/contributors
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Folas1337
Franciscoyt94
hawkfalcon
Healthy
Kaludi
LaurenceBarnes
MattVid
mcmdev
Expand Down
3 changes: 3 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@
</goals>
</execution>
</executions>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down

0 comments on commit b10c847

Please sign in to comment.