Skip to content

Commit

Permalink
PR switches remtran 2PC to run over cdb2api instead of legacy remtran…
Browse files Browse the repository at this point in the history
… plugin.

This extends the port of legacy remtran to cdb2api for  2PC transactional mode.

NOTE: this handles the replicant to replicant communication.  The master to master is already using cdb2api.

Signed-off-by: Dorin Hogea <[email protected]>
  • Loading branch information
dorinhogea committed Jan 21, 2025
1 parent 6796390 commit 4fa54b2
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 26 deletions.
146 changes: 132 additions & 14 deletions db/fdb_fend.c
Original file line number Diff line number Diff line change
Expand Up @@ -2163,6 +2163,22 @@ static char *fdb_generate_dist_txnid()
return r;
}

void fdb_init_disttxn(sqlclntstate *clnt)
{
assert(clnt->use_2pc);

/* Preserve timestamp for retries */
if (!clnt->dist_timestamp) {
assert(!clnt->dist_txnid);
bbhrtime_t ts;
clock_gettime(CLOCK_REALTIME, &ts);
clnt->dist_timestamp = bbhrtimens(&ts);
}
if (!clnt->dist_txnid) {
clnt->dist_txnid = fdb_generate_dist_txnid();
}
}

/**
* Used to either open a remote transaction or cursor (fdbc==NULL-> transaction
*begin)
Expand Down Expand Up @@ -2270,21 +2286,17 @@ static int _fdb_send_open_retries(sqlclntstate *clnt, fdb_t *fdb,
tran_flags = 0;

if (clnt->use_2pc) {
/* Preserve timestamp for retries */
if (!clnt->dist_timestamp) {
assert(!clnt->dist_txnid);
bbhrtime_t ts;
clock_gettime(CLOCK_REALTIME, &ts);
clnt->dist_timestamp = bbhrtimens(&ts);
}
if (!clnt->dist_txnid) {
clnt->dist_txnid = fdb_generate_dist_txnid();
}
fdb_init_disttxn(clnt);

char *coordinator_dbname = strdup(gbl_dbname);
char *coordinator_tier = gbl_machine_class ? strdup(gbl_machine_class) : strdup(gbl_myhostname);
char *coordinator_tier = gbl_machine_class ?
strdup(gbl_machine_class) : strdup(gbl_myhostname);
char *dist_txnid = strdup(clnt->dist_txnid);
rc = fdb_send_2pc_begin(clnt, msg, trans, clnt->dbtran.mode, tran_flags, dist_txnid,
coordinator_dbname, coordinator_tier, clnt->dist_timestamp, trans->fcon.sb);

rc = fdb_send_2pc_begin(clnt, msg, trans, clnt->dbtran.mode,
tran_flags, dist_txnid, coordinator_dbname,
coordinator_tier, clnt->dist_timestamp,
trans->fcon.sb);
} else {
rc = fdb_send_begin(clnt, msg, trans, clnt->dbtran.mode, tran_flags, trans->fcon.sb);
}
Expand Down Expand Up @@ -2494,7 +2506,7 @@ static fdb_cursor_if_t *_cursor_open_remote_cdb2api(sqlclntstate *clnt,
if (rootpage == 1) {
rc = cdb2_run_statement(fdbc->fcon.api.hndl, "SET REMSQL_SCHEMA 1");
if (rc) {
logmsg(LOGMSG_ERROR, "%s: %s:%s failed to set remsql_scheam rc %d\n",
logmsg(LOGMSG_ERROR, "%s: %s:%s failed to set remsql_schema rc %d\n",
__func__, fdb->dbname, class, rc);
goto error;
}
Expand All @@ -2509,6 +2521,51 @@ static fdb_cursor_if_t *_cursor_open_remote_cdb2api(sqlclntstate *clnt,
goto done;
}

/**
* SET the options for a distributed 2pc transaction
*
*/
int fdb_2pc_set(sqlclntstate *clnt, fdb_t *fdb, cdb2_hndl_tp *hndl)
{
char str[256];
char *class = gbl_machine_class ? gbl_machine_class : gbl_myhostname;
int rc;

snprintf(str, sizeof(str), "SET REMTRAN_NAME %s", gbl_dbname);
rc = cdb2_run_statement(hndl, str);
if (rc) {
logmsg(LOGMSG_ERROR, "%s: %s:%s failed to set remtran_name rc %d\n",
__func__, fdb->dbname, class, rc);
return -1;
}

snprintf(str, sizeof(str), "SET REMTRAN_TIER %s", class);
rc = cdb2_run_statement(hndl, str);
if (rc) {
logmsg(LOGMSG_ERROR, "%s: %s:%s failed to set remtran_tier rc %d\n",
__func__, fdb->dbname, class, rc);
return -1;
}

snprintf(str, sizeof(str), "SET REMTRAN_TXNID %s", clnt->dist_txnid);
rc = cdb2_run_statement(hndl, str);
if (rc) {
logmsg(LOGMSG_ERROR, "%s: %s:%s failed to set remtran_txnid rc %d\n",
__func__, fdb->dbname, class, rc);
return -1;
}

snprintf(str, sizeof(str), "SET REMTRAN_TSTAMP %lu", clnt->dist_timestamp);
rc = cdb2_run_statement(hndl, str);
if (rc) {
logmsg(LOGMSG_ERROR, "%s: %s:%s failed to set remtran_tstamp rc %d\n",
__func__, fdb->dbname, class, rc);
return -1;
}

return 0;
}

static fdb_cursor_if_t *_cursor_open_remote(sqlclntstate *clnt,
fdb_t *fdb, int server_version,
fdb_tran_t *trans, int flags,
Expand Down Expand Up @@ -5605,6 +5662,7 @@ static int _fdb_client_set_options(sqlclntstate *clnt,
const char *err_precdb2api = "Invalid set command 'REMSQL";
const char *err_cdb2apiold = "need protocol ";
const char *err_tableschemaold = "need table schema ";
const char *err_pre2pc = "Invalid set command 'REMTRAN";

static int _fdb_run_sql(BtCursor *pCur, char *sql)
{
Expand Down Expand Up @@ -5962,6 +6020,66 @@ int process_fdb_set_cdb2api(sqlclntstate *clnt, char *sqlstr, char *err,
return 0;
}

int process_fdb_set_cdb2api_2pc(sqlclntstate *clnt, char *sqlstr, char *err,
int errlen)
{
if (sqlstr)
sqlstr = skipws(sqlstr);

if (!sqlstr) {
snprintf(err, errlen, "missing remsql setting");
return -1;
}

if (gbl_fdb_emulate_old) {
/* we want to emulate a pre-cdb2api failure to parse remsql SET
* options; just return error here, do not set err
*/
return -1;
}

if (strncasecmp(sqlstr, "name ", 5) == 0) {
sqlstr += 5;
if (!sqlstr[0]) {
snprintf(err, errlen, "missing coordinator dbname");
return -1;
}
clnt->coordinator_dbname = strdup(sqlstr);
} else if (strncasecmp(sqlstr, "tier ", 5) == 0) {
sqlstr += 5;
if (!sqlstr[0]) {
snprintf(err, errlen, "missing coordinator tier");
return -1;
}
clnt->coordinator_tier= strdup(sqlstr);
} else if (strncasecmp(sqlstr, "txnid ", 6) == 0) {
sqlstr += 6;
if (!sqlstr[0]) {
snprintf(err, errlen, "missing dist txn id");
return -1;
}
clnt->dist_txnid = strdup(sqlstr);
} else if (strncasecmp(sqlstr, "tstamp ", 7) == 0) {
sqlstr += 7;
if (!sqlstr[0]) {
snprintf(err, errlen, "missing dist timestamp");
return -1;
}
clnt->dist_timestamp = atoll(sqlstr);
} else {
snprintf(err, errlen, "failed to parse 2pc option %s", sqlstr);
return -1;
}

if (clnt->coordinator_dbname && clnt->coordinator_tier && clnt->dist_txnid &&
clnt->dist_timestamp) {
clnt->use_2pc = 1;
clnt->is_participant = 1;
}

return 0;
}

int fdb_default_ver_set(int val)
{
if (val != gbl_fdb_default_ver) {
Expand Down
19 changes: 17 additions & 2 deletions db/fdb_fend.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

#include "comdb2.h"
#include "sql.h"
//#include "sqliteInt.h"
#include "vdbeInt.h"
#include "comdb2uuid.h"
#include "net_int.h"
Expand Down Expand Up @@ -73,8 +72,9 @@
#define FDB_VER_AUTH 6
#define FDB_VER_CDB2API 7
#define FDB_VER_WR_CDB2API 8
#define FDB_VER_2PC_CDB2API 9

#define FDB_VER FDB_VER_WR_CDB2API
#define FDB_VER FDB_VER_2PC_CDB2API

extern int gbl_fdb_default_ver;

Expand Down Expand Up @@ -449,6 +449,9 @@ extern int gbl_fdb_io_error_retries;
int process_fdb_set_cdb2api(sqlclntstate *clnt, char *sqlstr,
char *err, int errlen);

int process_fdb_set_cdb2api_2pc(sqlclntstate *clnt, char *sqlstr,
char *err, int errlen);

/**
* Check that fdb class matches a specific class
*
Expand All @@ -470,5 +473,17 @@ cdb2_hndl_tp *fdb_push_connect(sqlclntstate *clnt, int *client_redir,
*/
void fdb_free_tran(sqlclntstate *clnt, fdb_tran_t *tran);

/**
* Initialize a 2pc coordinator
*
*/
void fdb_init_disttxn(sqlclntstate *clnt);

/**
* SET the options for a distributed 2pc transaction
*
*/
int fdb_2pc_set(sqlclntstate *clnt, fdb_t *fdb, cdb2_hndl_tp *hndl);

#endif

40 changes: 37 additions & 3 deletions db/fdb_push.c
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ int handle_fdb_push_write(sqlclntstate *clnt, struct errstat *err)
fdb_t *fdb;
int created;
int rc;
int set_intrans = 0;

if (!push)
return -2;
Expand Down Expand Up @@ -537,20 +538,33 @@ int handle_fdb_push_write(sqlclntstate *clnt, struct errstat *err)
if (created) {
/* get a connection */
tran->is_cdb2api = 1;
tran->fcon.hndl = _hndl_open(clnt, NULL, 0 /* no sqlite rows for writes */, err);
tran->fcon.hndl = hndl = _hndl_open(clnt, NULL, 0 /* no sqlite rows for writes */, err);
if (!tran->fcon.hndl) {
rc = -2;
goto free;
}
if (clnt->in_client_trans) {
/* if not standalone, and this is the first reachout to this fdb, send begin */
clnt->intrans = 1;
if (!clnt->intrans) {
clnt->intrans = 1;
set_intrans = 1;
}

/* if this is 2pc, we need to send additional info to the participant */
if (clnt->use_2pc) {
fdb_init_disttxn(clnt);

rc = fdb_2pc_set(clnt, fdb, tran->fcon.hndl);
if (rc) {
goto hndl_err;
}
}

rc = cdb2_run_statement(tran->fcon.hndl, "begin");
while (rc == CDB2_OK) {
rc = cdb2_next_record(tran->fcon.hndl);
}
if (rc != CDB2_OK_DONE) {
hndl = tran->fcon.hndl;
goto hndl_err;
}
}
Expand Down Expand Up @@ -628,6 +642,17 @@ int handle_fdb_push_write(sqlclntstate *clnt, struct errstat *err)

hndl_err:
errstr = cdb2_errstr(hndl);
extern const char *err_pre2pc;
if (errstr && !strncasecmp(errstr, err_pre2pc, strlen(err_pre2pc))) {
if (!created) {
/* instead of an assert */
logmsg(LOGMSG_ERROR, "%s remote db %s lost locks\n", __func__,
push->remotedb);
abort();
}
rc = -2; /* lets try a non-2pc version */
goto free;
}
errstat_set_rcstrf(err, rc, "%s", errstr);
rc = write_response(clnt, RESPONSE_ERROR, (void*)errstr, rc);
if (rc) {
Expand All @@ -645,6 +670,15 @@ int handle_fdb_push_write(sqlclntstate *clnt, struct errstat *err)
clnt->fdb_push = NULL;
free(push->remotedb);
free(push);
if (set_intrans) {
/* if this tried to short call to local sqlite3BtreeBeginTrans
* first time we try to push, and we set clnt->intrans,
* and we are gonna retry the legacy mode, need to reset
* that back so we do not skip calling sqlite3BtreeBeginTrans
*/
clnt->intrans = 0;
}

}
return rc;
}
Expand Down
4 changes: 4 additions & 0 deletions db/osqlsqlthr.c
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,10 @@ static int osql_wait(struct sqlclntstate *clnt)
osqlstate_t *osql = &clnt->osql;
errstat_t dummy = {0};

/* if this is a 2pc participant, we don't need to wait here */
if (clnt->is_participant)
return 0;

/* If an error is set (e.g., selectv error from range check), latch it. */
errstat_t *err = (osql->xerr.errval == 0) ? &osql->xerr : &dummy;

Expand Down
4 changes: 0 additions & 4 deletions db/sqlglue.c
Original file line number Diff line number Diff line change
Expand Up @@ -4992,10 +4992,6 @@ int sqlite3BtreeBeginTrans(Vdbe *vdbe, Btree *pBt, int wrflag, int *pSchemaVersi

rc = start_new_transaction(clnt);

/* 2pc on tunable, only here for now */
extern int gbl_2pc;
clnt->use_2pc = gbl_2pc;

done:
if (rc == SQLITE_OK && pSchemaVersion) {
sqlite3BtreeGetMeta(pBt, BTREE_SCHEMA_VERSION, (u32 *)pSchemaVersion);
Expand Down
3 changes: 2 additions & 1 deletion db/sqlinterfaces.c
Original file line number Diff line number Diff line change
Expand Up @@ -5466,7 +5466,8 @@ void reset_clnt(struct sqlclntstate *clnt, int initial)
clnt->num_retry = 0;
clnt->early_retry = 0;

clnt->use_2pc = 0;
extern int gbl_2pc;
clnt->use_2pc = gbl_2pc;
clnt->is_coordinator = 0;
clnt->is_participant = 0;
clear_participants(clnt);
Expand Down
6 changes: 6 additions & 0 deletions plugins/newsql/newsql.c
Original file line number Diff line number Diff line change
Expand Up @@ -2092,6 +2092,12 @@ int process_set_commands(struct sqlclntstate *clnt, CDB2SQLQUERY *sql_query)
if (process_fdb_set_cdb2api(clnt, sqlstr, err, sizeof(err))) {
rc = ii + 1;
}
} else if (strncasecmp(sqlstr, "remtran_", 8) == 0) {
sqlstr += 8;

if (process_fdb_set_cdb2api_2pc(clnt, sqlstr, err, sizeof(err))) {
rc = ii + 1;
}
} else if (strncasecmp(sqlstr, "typessql", 8) == 0) {
sqlstr += 8;
sqlstr = skipws(sqlstr);
Expand Down
2 changes: 1 addition & 1 deletion tests/fdb_compat.test/output.log
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Current fdb version 8
Current fdb version 9
(rows inserted=3)
(rows inserted=3)
(rows deleted=3)
Expand Down
2 changes: 1 addition & 1 deletion tests/tunables.test/t00_all_tunables.expected
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@
(name='externalauth_connect', description='Check for externalauth only once on connect', type='BOOLEAN', value='OFF', read_only='N')
(name='externalauth_warn', description='Warn instead of returning error in case of missing authdata', type='BOOLEAN', value='OFF', read_only='N')
(name='fake_sc_replication_timeout', description='Fake a replication timeout on finalize schemachange. ', type='BOOLEAN', value='OFF', read_only='N')
(name='fdb_default_version', description='Override the default fdb version', type='INTEGER', value='8', read_only='N')
(name='fdb_default_version', description='Override the default fdb version', type='INTEGER', value='9', read_only='N')
(name='fdb_io_error_retries', description='Number of retries for io error remsql', type='INTEGER', value='16', read_only='N')
(name='fdb_io_error_retries_phase_1', description='Number of immediate retries; capped by fdb_io_error_retries', type='INTEGER', value='6', read_only='N')
(name='fdb_io_error_retries_phase_2_poll', description='Poll initial value for slow retries in phase 2; doubled for each retry', type='INTEGER', value='100', read_only='N')
Expand Down

0 comments on commit 4fa54b2

Please sign in to comment.