Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR switches remtran 2PC to run over cdb2api instead of legacy remtran plugin #4917

Merged
merged 1 commit into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 132 additions & 14 deletions db/fdb_fend.c
Original file line number Diff line number Diff line change
Expand Up @@ -2163,6 +2163,22 @@ static char *fdb_generate_dist_txnid()
return r;
}

void fdb_init_disttxn(sqlclntstate *clnt)
{
assert(clnt->use_2pc);

/* Preserve timestamp for retries */
if (!clnt->dist_timestamp) {
assert(!clnt->dist_txnid);
bbhrtime_t ts;
clock_gettime(CLOCK_REALTIME, &ts);
clnt->dist_timestamp = bbhrtimens(&ts);
}
if (!clnt->dist_txnid) {
clnt->dist_txnid = fdb_generate_dist_txnid();
}
}

/**
* Used to either open a remote transaction or cursor (fdbc==NULL-> transaction
*begin)
Expand Down Expand Up @@ -2270,21 +2286,17 @@ static int _fdb_send_open_retries(sqlclntstate *clnt, fdb_t *fdb,
tran_flags = 0;

if (clnt->use_2pc) {
/* Preserve timestamp for retries */
if (!clnt->dist_timestamp) {
assert(!clnt->dist_txnid);
bbhrtime_t ts;
clock_gettime(CLOCK_REALTIME, &ts);
clnt->dist_timestamp = bbhrtimens(&ts);
}
if (!clnt->dist_txnid) {
clnt->dist_txnid = fdb_generate_dist_txnid();
}
fdb_init_disttxn(clnt);

char *coordinator_dbname = strdup(gbl_dbname);
char *coordinator_tier = gbl_machine_class ? strdup(gbl_machine_class) : strdup(gbl_myhostname);
char *coordinator_tier = gbl_machine_class ?
strdup(gbl_machine_class) : strdup(gbl_myhostname);
char *dist_txnid = strdup(clnt->dist_txnid);
rc = fdb_send_2pc_begin(clnt, msg, trans, clnt->dbtran.mode, tran_flags, dist_txnid,
coordinator_dbname, coordinator_tier, clnt->dist_timestamp, trans->fcon.sb);

rc = fdb_send_2pc_begin(clnt, msg, trans, clnt->dbtran.mode,
tran_flags, dist_txnid, coordinator_dbname,
coordinator_tier, clnt->dist_timestamp,
trans->fcon.sb);
} else {
rc = fdb_send_begin(clnt, msg, trans, clnt->dbtran.mode, tran_flags, trans->fcon.sb);
}
Expand Down Expand Up @@ -2494,7 +2506,7 @@ static fdb_cursor_if_t *_cursor_open_remote_cdb2api(sqlclntstate *clnt,
if (rootpage == 1) {
rc = cdb2_run_statement(fdbc->fcon.api.hndl, "SET REMSQL_SCHEMA 1");
if (rc) {
logmsg(LOGMSG_ERROR, "%s: %s:%s failed to set remsql_scheam rc %d\n",
logmsg(LOGMSG_ERROR, "%s: %s:%s failed to set remsql_schema rc %d\n",
__func__, fdb->dbname, class, rc);
goto error;
}
Expand All @@ -2509,6 +2521,51 @@ static fdb_cursor_if_t *_cursor_open_remote_cdb2api(sqlclntstate *clnt,
goto done;
}

/**
* SET the options for a distributed 2pc transaction
*
*/
int fdb_2pc_set(sqlclntstate *clnt, fdb_t *fdb, cdb2_hndl_tp *hndl)
{
char str[256];
char *class = gbl_machine_class ? gbl_machine_class : gbl_myhostname;
int rc;

snprintf(str, sizeof(str), "SET REMTRAN_NAME %s", gbl_dbname);
rc = cdb2_run_statement(hndl, str);
if (rc) {
logmsg(LOGMSG_ERROR, "%s: %s:%s failed to set remtran_name rc %d\n",
__func__, fdb->dbname, class, rc);
return -1;
}

snprintf(str, sizeof(str), "SET REMTRAN_TIER %s", class);
rc = cdb2_run_statement(hndl, str);
if (rc) {
logmsg(LOGMSG_ERROR, "%s: %s:%s failed to set remtran_tier rc %d\n",
__func__, fdb->dbname, class, rc);
return -1;
}

snprintf(str, sizeof(str), "SET REMTRAN_TXNID %s", clnt->dist_txnid);
rc = cdb2_run_statement(hndl, str);
if (rc) {
logmsg(LOGMSG_ERROR, "%s: %s:%s failed to set remtran_txnid rc %d\n",
__func__, fdb->dbname, class, rc);
return -1;
}

snprintf(str, sizeof(str), "SET REMTRAN_TSTAMP %lu", clnt->dist_timestamp);
rc = cdb2_run_statement(hndl, str);
if (rc) {
logmsg(LOGMSG_ERROR, "%s: %s:%s failed to set remtran_tstamp rc %d\n",
__func__, fdb->dbname, class, rc);
return -1;
}

return 0;
}

static fdb_cursor_if_t *_cursor_open_remote(sqlclntstate *clnt,
fdb_t *fdb, int server_version,
fdb_tran_t *trans, int flags,
Expand Down Expand Up @@ -5605,6 +5662,7 @@ static int _fdb_client_set_options(sqlclntstate *clnt,
const char *err_precdb2api = "Invalid set command 'REMSQL";
const char *err_cdb2apiold = "need protocol ";
const char *err_tableschemaold = "need table schema ";
const char *err_pre2pc = "Invalid set command 'REMTRAN";

static int _fdb_run_sql(BtCursor *pCur, char *sql)
{
Expand Down Expand Up @@ -5962,6 +6020,66 @@ int process_fdb_set_cdb2api(sqlclntstate *clnt, char *sqlstr, char *err,
return 0;
}

int process_fdb_set_cdb2api_2pc(sqlclntstate *clnt, char *sqlstr, char *err,
int errlen)
{
if (sqlstr)
sqlstr = skipws(sqlstr);

if (!sqlstr) {
snprintf(err, errlen, "missing remsql setting");
return -1;
}

if (gbl_fdb_emulate_old) {
/* we want to emulate a pre-cdb2api failure to parse remsql SET
* options; just return error here, do not set err
*/
return -1;
}

if (strncasecmp(sqlstr, "name ", 5) == 0) {
sqlstr += 5;
if (!sqlstr[0]) {
snprintf(err, errlen, "missing coordinator dbname");
return -1;
}
clnt->coordinator_dbname = strdup(sqlstr);
} else if (strncasecmp(sqlstr, "tier ", 5) == 0) {
sqlstr += 5;
if (!sqlstr[0]) {
snprintf(err, errlen, "missing coordinator tier");
return -1;
}
clnt->coordinator_tier= strdup(sqlstr);
} else if (strncasecmp(sqlstr, "txnid ", 6) == 0) {
sqlstr += 6;
if (!sqlstr[0]) {
snprintf(err, errlen, "missing dist txn id");
return -1;
}
clnt->dist_txnid = strdup(sqlstr);
} else if (strncasecmp(sqlstr, "tstamp ", 7) == 0) {
sqlstr += 7;
if (!sqlstr[0]) {
snprintf(err, errlen, "missing dist timestamp");
return -1;
}
clnt->dist_timestamp = atoll(sqlstr);
} else {
snprintf(err, errlen, "failed to parse 2pc option %s", sqlstr);
return -1;
}

if (clnt->coordinator_dbname && clnt->coordinator_tier && clnt->dist_txnid &&
clnt->dist_timestamp) {
clnt->use_2pc = 1;
clnt->is_participant = 1;
}

return 0;
}

int fdb_default_ver_set(int val)
{
if (val != gbl_fdb_default_ver) {
Expand Down
19 changes: 17 additions & 2 deletions db/fdb_fend.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

#include "comdb2.h"
#include "sql.h"
//#include "sqliteInt.h"
#include "vdbeInt.h"
#include "comdb2uuid.h"
#include "net_int.h"
Expand Down Expand Up @@ -73,8 +72,9 @@
#define FDB_VER_AUTH 6
#define FDB_VER_CDB2API 7
#define FDB_VER_WR_CDB2API 8
#define FDB_VER_2PC_CDB2API 9

#define FDB_VER FDB_VER_WR_CDB2API
#define FDB_VER FDB_VER_2PC_CDB2API

extern int gbl_fdb_default_ver;

Expand Down Expand Up @@ -449,6 +449,9 @@ extern int gbl_fdb_io_error_retries;
int process_fdb_set_cdb2api(sqlclntstate *clnt, char *sqlstr,
char *err, int errlen);

int process_fdb_set_cdb2api_2pc(sqlclntstate *clnt, char *sqlstr,
char *err, int errlen);

/**
* Check that fdb class matches a specific class
*
Expand All @@ -470,5 +473,17 @@ cdb2_hndl_tp *fdb_push_connect(sqlclntstate *clnt, int *client_redir,
*/
void fdb_free_tran(sqlclntstate *clnt, fdb_tran_t *tran);

/**
* Initialize a 2pc coordinator
*
*/
void fdb_init_disttxn(sqlclntstate *clnt);

/**
* SET the options for a distributed 2pc transaction
*
*/
int fdb_2pc_set(sqlclntstate *clnt, fdb_t *fdb, cdb2_hndl_tp *hndl);

#endif

40 changes: 37 additions & 3 deletions db/fdb_push.c
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ int handle_fdb_push_write(sqlclntstate *clnt, struct errstat *err)
fdb_t *fdb;
int created;
int rc;
int set_intrans = 0;

if (!push)
return -2;
Expand Down Expand Up @@ -537,20 +538,33 @@ int handle_fdb_push_write(sqlclntstate *clnt, struct errstat *err)
if (created) {
/* get a connection */
tran->is_cdb2api = 1;
tran->fcon.hndl = _hndl_open(clnt, NULL, 0 /* no sqlite rows for writes */, err);
tran->fcon.hndl = hndl = _hndl_open(clnt, NULL, 0 /* no sqlite rows for writes */, err);
if (!tran->fcon.hndl) {
rc = -2;
goto free;
}
if (clnt->in_client_trans) {
/* if not standalone, and this is the first reachout to this fdb, send begin */
clnt->intrans = 1;
if (!clnt->intrans) {
clnt->intrans = 1;
set_intrans = 1;
}

/* if this is 2pc, we need to send additional info to the participant */
if (clnt->use_2pc) {
fdb_init_disttxn(clnt);

rc = fdb_2pc_set(clnt, fdb, tran->fcon.hndl);
if (rc) {
goto hndl_err;
}
}

rc = cdb2_run_statement(tran->fcon.hndl, "begin");
while (rc == CDB2_OK) {
rc = cdb2_next_record(tran->fcon.hndl);
}
if (rc != CDB2_OK_DONE) {
hndl = tran->fcon.hndl;
goto hndl_err;
}
}
Expand Down Expand Up @@ -628,6 +642,17 @@ int handle_fdb_push_write(sqlclntstate *clnt, struct errstat *err)

hndl_err:
errstr = cdb2_errstr(hndl);
extern const char *err_pre2pc;
if (errstr && !strncasecmp(errstr, err_pre2pc, strlen(err_pre2pc))) {
if (!created) {
/* instead of an assert */
logmsg(LOGMSG_ERROR, "%s remote db %s lost locks\n", __func__,
push->remotedb);
abort();
}
rc = -2; /* lets try a non-2pc version */
goto free;
}
errstat_set_rcstrf(err, rc, "%s", errstr);
rc = write_response(clnt, RESPONSE_ERROR, (void*)errstr, rc);
if (rc) {
Expand All @@ -645,6 +670,15 @@ int handle_fdb_push_write(sqlclntstate *clnt, struct errstat *err)
clnt->fdb_push = NULL;
free(push->remotedb);
free(push);
if (set_intrans) {
/* if this tried to short call to local sqlite3BtreeBeginTrans
* first time we try to push, and we set clnt->intrans,
* and we are gonna retry the legacy mode, need to reset
* that back so we do not skip calling sqlite3BtreeBeginTrans
*/
clnt->intrans = 0;
}

}
return rc;
}
Expand Down
4 changes: 4 additions & 0 deletions db/osqlsqlthr.c
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,10 @@ static int osql_wait(struct sqlclntstate *clnt)
osqlstate_t *osql = &clnt->osql;
errstat_t dummy = {0};

/* if this is a 2pc participant, we don't need to wait here */
if (clnt->is_participant)
return 0;

/* If an error is set (e.g., selectv error from range check), latch it. */
errstat_t *err = (osql->xerr.errval == 0) ? &osql->xerr : &dummy;

Expand Down
4 changes: 0 additions & 4 deletions db/sqlglue.c
Original file line number Diff line number Diff line change
Expand Up @@ -4992,10 +4992,6 @@ int sqlite3BtreeBeginTrans(Vdbe *vdbe, Btree *pBt, int wrflag, int *pSchemaVersi

rc = start_new_transaction(clnt);

/* 2pc on tunable, only here for now */
extern int gbl_2pc;
clnt->use_2pc = gbl_2pc;

done:
if (rc == SQLITE_OK && pSchemaVersion) {
sqlite3BtreeGetMeta(pBt, BTREE_SCHEMA_VERSION, (u32 *)pSchemaVersion);
Expand Down
3 changes: 2 additions & 1 deletion db/sqlinterfaces.c
Original file line number Diff line number Diff line change
Expand Up @@ -5466,7 +5466,8 @@ void reset_clnt(struct sqlclntstate *clnt, int initial)
clnt->num_retry = 0;
clnt->early_retry = 0;

clnt->use_2pc = 0;
extern int gbl_2pc;
clnt->use_2pc = gbl_2pc;
clnt->is_coordinator = 0;
clnt->is_participant = 0;
clear_participants(clnt);
Expand Down
6 changes: 6 additions & 0 deletions plugins/newsql/newsql.c
Original file line number Diff line number Diff line change
Expand Up @@ -2092,6 +2092,12 @@ int process_set_commands(struct sqlclntstate *clnt, CDB2SQLQUERY *sql_query)
if (process_fdb_set_cdb2api(clnt, sqlstr, err, sizeof(err))) {
rc = ii + 1;
}
} else if (strncasecmp(sqlstr, "remtran_", 8) == 0) {
sqlstr += 8;

if (process_fdb_set_cdb2api_2pc(clnt, sqlstr, err, sizeof(err))) {
rc = ii + 1;
}
} else if (strncasecmp(sqlstr, "typessql", 8) == 0) {
sqlstr += 8;
sqlstr = skipws(sqlstr);
Expand Down
2 changes: 1 addition & 1 deletion tests/fdb_compat.test/output.log
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Current fdb version 8
Current fdb version 9
(rows inserted=3)
(rows inserted=3)
(rows deleted=3)
Expand Down
2 changes: 1 addition & 1 deletion tests/tunables.test/t00_all_tunables.expected
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@
(name='externalauth_connect', description='Check for externalauth only once on connect', type='BOOLEAN', value='OFF', read_only='N')
(name='externalauth_warn', description='Warn instead of returning error in case of missing authdata', type='BOOLEAN', value='OFF', read_only='N')
(name='fake_sc_replication_timeout', description='Fake a replication timeout on finalize schemachange. ', type='BOOLEAN', value='OFF', read_only='N')
(name='fdb_default_version', description='Override the default fdb version', type='INTEGER', value='8', read_only='N')
(name='fdb_default_version', description='Override the default fdb version', type='INTEGER', value='9', read_only='N')
(name='fdb_io_error_retries', description='Number of retries for io error remsql', type='INTEGER', value='16', read_only='N')
(name='fdb_io_error_retries_phase_1', description='Number of immediate retries; capped by fdb_io_error_retries', type='INTEGER', value='6', read_only='N')
(name='fdb_io_error_retries_phase_2_poll', description='Poll initial value for slow retries in phase 2; doubled for each retry', type='INTEGER', value='100', read_only='N')
Expand Down