From 4fa54b2e4cf4f21d92bc8403e1b10f215d024e6d Mon Sep 17 00:00:00 2001 From: Dorin Hogea Date: Tue, 24 Sep 2024 15:50:06 -0400 Subject: [PATCH] PR switches remtran 2PC to run over cdb2api instead of legacy remtran plugin. This extends the port of legacy remtran to cdb2api for 2PC transactional mode. NOTE: this handles the replicant to replicant communication. The master to master is already using cdb2api. Signed-off-by: Dorin Hogea --- db/fdb_fend.c | 146 ++++++++++++++++-- db/fdb_fend.h | 19 ++- db/fdb_push.c | 40 ++++- db/osqlsqlthr.c | 4 + db/sqlglue.c | 4 - db/sqlinterfaces.c | 3 +- plugins/newsql/newsql.c | 6 + tests/fdb_compat.test/output.log | 2 +- tests/tunables.test/t00_all_tunables.expected | 2 +- 9 files changed, 200 insertions(+), 26 deletions(-) diff --git a/db/fdb_fend.c b/db/fdb_fend.c index 9052958a7c..6a4a3da70e 100644 --- a/db/fdb_fend.c +++ b/db/fdb_fend.c @@ -2163,6 +2163,22 @@ static char *fdb_generate_dist_txnid() return r; } +void fdb_init_disttxn(sqlclntstate *clnt) +{ + assert(clnt->use_2pc); + + /* Preserve timestamp for retries */ + if (!clnt->dist_timestamp) { + assert(!clnt->dist_txnid); + bbhrtime_t ts; + clock_gettime(CLOCK_REALTIME, &ts); + clnt->dist_timestamp = bbhrtimens(&ts); + } + if (!clnt->dist_txnid) { + clnt->dist_txnid = fdb_generate_dist_txnid(); + } +} + /** * Used to either open a remote transaction or cursor (fdbc==NULL-> transaction *begin) @@ -2270,21 +2286,17 @@ static int _fdb_send_open_retries(sqlclntstate *clnt, fdb_t *fdb, tran_flags = 0; if (clnt->use_2pc) { - /* Preserve timestamp for retries */ - if (!clnt->dist_timestamp) { - assert(!clnt->dist_txnid); - bbhrtime_t ts; - clock_gettime(CLOCK_REALTIME, &ts); - clnt->dist_timestamp = bbhrtimens(&ts); - } - if (!clnt->dist_txnid) { - clnt->dist_txnid = fdb_generate_dist_txnid(); - } + fdb_init_disttxn(clnt); + char *coordinator_dbname = strdup(gbl_dbname); - char *coordinator_tier = gbl_machine_class ? strdup(gbl_machine_class) : strdup(gbl_myhostname); + char *coordinator_tier = gbl_machine_class ? + strdup(gbl_machine_class) : strdup(gbl_myhostname); char *dist_txnid = strdup(clnt->dist_txnid); - rc = fdb_send_2pc_begin(clnt, msg, trans, clnt->dbtran.mode, tran_flags, dist_txnid, - coordinator_dbname, coordinator_tier, clnt->dist_timestamp, trans->fcon.sb); + + rc = fdb_send_2pc_begin(clnt, msg, trans, clnt->dbtran.mode, + tran_flags, dist_txnid, coordinator_dbname, + coordinator_tier, clnt->dist_timestamp, + trans->fcon.sb); } else { rc = fdb_send_begin(clnt, msg, trans, clnt->dbtran.mode, tran_flags, trans->fcon.sb); } @@ -2494,7 +2506,7 @@ static fdb_cursor_if_t *_cursor_open_remote_cdb2api(sqlclntstate *clnt, if (rootpage == 1) { rc = cdb2_run_statement(fdbc->fcon.api.hndl, "SET REMSQL_SCHEMA 1"); if (rc) { - logmsg(LOGMSG_ERROR, "%s: %s:%s failed to set remsql_scheam rc %d\n", + logmsg(LOGMSG_ERROR, "%s: %s:%s failed to set remsql_schema rc %d\n", __func__, fdb->dbname, class, rc); goto error; } @@ -2509,6 +2521,51 @@ static fdb_cursor_if_t *_cursor_open_remote_cdb2api(sqlclntstate *clnt, goto done; } +/** + * SET the options for a distributed 2pc transaction + * + */ +int fdb_2pc_set(sqlclntstate *clnt, fdb_t *fdb, cdb2_hndl_tp *hndl) +{ + char str[256]; + char *class = gbl_machine_class ? gbl_machine_class : gbl_myhostname; + int rc; + + snprintf(str, sizeof(str), "SET REMTRAN_NAME %s", gbl_dbname); + rc = cdb2_run_statement(hndl, str); + if (rc) { + logmsg(LOGMSG_ERROR, "%s: %s:%s failed to set remtran_name rc %d\n", + __func__, fdb->dbname, class, rc); + return -1; + } + + snprintf(str, sizeof(str), "SET REMTRAN_TIER %s", class); + rc = cdb2_run_statement(hndl, str); + if (rc) { + logmsg(LOGMSG_ERROR, "%s: %s:%s failed to set remtran_tier rc %d\n", + __func__, fdb->dbname, class, rc); + return -1; + } + + snprintf(str, sizeof(str), "SET REMTRAN_TXNID %s", clnt->dist_txnid); + rc = cdb2_run_statement(hndl, str); + if (rc) { + logmsg(LOGMSG_ERROR, "%s: %s:%s failed to set remtran_txnid rc %d\n", + __func__, fdb->dbname, class, rc); + return -1; + } + + snprintf(str, sizeof(str), "SET REMTRAN_TSTAMP %lu", clnt->dist_timestamp); + rc = cdb2_run_statement(hndl, str); + if (rc) { + logmsg(LOGMSG_ERROR, "%s: %s:%s failed to set remtran_tstamp rc %d\n", + __func__, fdb->dbname, class, rc); + return -1; + } + + return 0; +} + static fdb_cursor_if_t *_cursor_open_remote(sqlclntstate *clnt, fdb_t *fdb, int server_version, fdb_tran_t *trans, int flags, @@ -5605,6 +5662,7 @@ static int _fdb_client_set_options(sqlclntstate *clnt, const char *err_precdb2api = "Invalid set command 'REMSQL"; const char *err_cdb2apiold = "need protocol "; const char *err_tableschemaold = "need table schema "; +const char *err_pre2pc = "Invalid set command 'REMTRAN"; static int _fdb_run_sql(BtCursor *pCur, char *sql) { @@ -5962,6 +6020,66 @@ int process_fdb_set_cdb2api(sqlclntstate *clnt, char *sqlstr, char *err, return 0; } +int process_fdb_set_cdb2api_2pc(sqlclntstate *clnt, char *sqlstr, char *err, + int errlen) +{ + if (sqlstr) + sqlstr = skipws(sqlstr); + + if (!sqlstr) { + snprintf(err, errlen, "missing remsql setting"); + return -1; + } + + if (gbl_fdb_emulate_old) { + /* we want to emulate a pre-cdb2api failure to parse remsql SET + * options; just return error here, do not set err + */ + return -1; + } + + if (strncasecmp(sqlstr, "name ", 5) == 0) { + sqlstr += 5; + if (!sqlstr[0]) { + snprintf(err, errlen, "missing coordinator dbname"); + return -1; + } + clnt->coordinator_dbname = strdup(sqlstr); + } else if (strncasecmp(sqlstr, "tier ", 5) == 0) { + sqlstr += 5; + if (!sqlstr[0]) { + snprintf(err, errlen, "missing coordinator tier"); + return -1; + } + clnt->coordinator_tier= strdup(sqlstr); + } else if (strncasecmp(sqlstr, "txnid ", 6) == 0) { + sqlstr += 6; + if (!sqlstr[0]) { + snprintf(err, errlen, "missing dist txn id"); + return -1; + } + clnt->dist_txnid = strdup(sqlstr); + } else if (strncasecmp(sqlstr, "tstamp ", 7) == 0) { + sqlstr += 7; + if (!sqlstr[0]) { + snprintf(err, errlen, "missing dist timestamp"); + return -1; + } + clnt->dist_timestamp = atoll(sqlstr); + } else { + snprintf(err, errlen, "failed to parse 2pc option %s", sqlstr); + return -1; + } + + if (clnt->coordinator_dbname && clnt->coordinator_tier && clnt->dist_txnid && + clnt->dist_timestamp) { + clnt->use_2pc = 1; + clnt->is_participant = 1; + } + + return 0; +} + int fdb_default_ver_set(int val) { if (val != gbl_fdb_default_ver) { diff --git a/db/fdb_fend.h b/db/fdb_fend.h index 418d9598a0..6edd9df571 100644 --- a/db/fdb_fend.h +++ b/db/fdb_fend.h @@ -30,7 +30,6 @@ #include "comdb2.h" #include "sql.h" -//#include "sqliteInt.h" #include "vdbeInt.h" #include "comdb2uuid.h" #include "net_int.h" @@ -73,8 +72,9 @@ #define FDB_VER_AUTH 6 #define FDB_VER_CDB2API 7 #define FDB_VER_WR_CDB2API 8 +#define FDB_VER_2PC_CDB2API 9 -#define FDB_VER FDB_VER_WR_CDB2API +#define FDB_VER FDB_VER_2PC_CDB2API extern int gbl_fdb_default_ver; @@ -449,6 +449,9 @@ extern int gbl_fdb_io_error_retries; int process_fdb_set_cdb2api(sqlclntstate *clnt, char *sqlstr, char *err, int errlen); +int process_fdb_set_cdb2api_2pc(sqlclntstate *clnt, char *sqlstr, + char *err, int errlen); + /** * Check that fdb class matches a specific class * @@ -470,5 +473,17 @@ cdb2_hndl_tp *fdb_push_connect(sqlclntstate *clnt, int *client_redir, */ void fdb_free_tran(sqlclntstate *clnt, fdb_tran_t *tran); +/** + * Initialize a 2pc coordinator + * + */ +void fdb_init_disttxn(sqlclntstate *clnt); + +/** + * SET the options for a distributed 2pc transaction + * + */ +int fdb_2pc_set(sqlclntstate *clnt, fdb_t *fdb, cdb2_hndl_tp *hndl); + #endif diff --git a/db/fdb_push.c b/db/fdb_push.c index 6469aeb64e..dee649d8e6 100644 --- a/db/fdb_push.c +++ b/db/fdb_push.c @@ -505,6 +505,7 @@ int handle_fdb_push_write(sqlclntstate *clnt, struct errstat *err) fdb_t *fdb; int created; int rc; + int set_intrans = 0; if (!push) return -2; @@ -537,20 +538,33 @@ int handle_fdb_push_write(sqlclntstate *clnt, struct errstat *err) if (created) { /* get a connection */ tran->is_cdb2api = 1; - tran->fcon.hndl = _hndl_open(clnt, NULL, 0 /* no sqlite rows for writes */, err); + tran->fcon.hndl = hndl = _hndl_open(clnt, NULL, 0 /* no sqlite rows for writes */, err); if (!tran->fcon.hndl) { rc = -2; goto free; } if (clnt->in_client_trans) { /* if not standalone, and this is the first reachout to this fdb, send begin */ - clnt->intrans = 1; + if (!clnt->intrans) { + clnt->intrans = 1; + set_intrans = 1; + } + + /* if this is 2pc, we need to send additional info to the participant */ + if (clnt->use_2pc) { + fdb_init_disttxn(clnt); + + rc = fdb_2pc_set(clnt, fdb, tran->fcon.hndl); + if (rc) { + goto hndl_err; + } + } + rc = cdb2_run_statement(tran->fcon.hndl, "begin"); while (rc == CDB2_OK) { rc = cdb2_next_record(tran->fcon.hndl); } if (rc != CDB2_OK_DONE) { - hndl = tran->fcon.hndl; goto hndl_err; } } @@ -628,6 +642,17 @@ int handle_fdb_push_write(sqlclntstate *clnt, struct errstat *err) hndl_err: errstr = cdb2_errstr(hndl); + extern const char *err_pre2pc; + if (errstr && !strncasecmp(errstr, err_pre2pc, strlen(err_pre2pc))) { + if (!created) { + /* instead of an assert */ + logmsg(LOGMSG_ERROR, "%s remote db %s lost locks\n", __func__, + push->remotedb); + abort(); + } + rc = -2; /* lets try a non-2pc version */ + goto free; + } errstat_set_rcstrf(err, rc, "%s", errstr); rc = write_response(clnt, RESPONSE_ERROR, (void*)errstr, rc); if (rc) { @@ -645,6 +670,15 @@ int handle_fdb_push_write(sqlclntstate *clnt, struct errstat *err) clnt->fdb_push = NULL; free(push->remotedb); free(push); + if (set_intrans) { + /* if this tried to short call to local sqlite3BtreeBeginTrans + * first time we try to push, and we set clnt->intrans, + * and we are gonna retry the legacy mode, need to reset + * that back so we do not skip calling sqlite3BtreeBeginTrans + */ + clnt->intrans = 0; + } + } return rc; } diff --git a/db/osqlsqlthr.c b/db/osqlsqlthr.c index 148b29dab6..50969625cf 100644 --- a/db/osqlsqlthr.c +++ b/db/osqlsqlthr.c @@ -392,6 +392,10 @@ static int osql_wait(struct sqlclntstate *clnt) osqlstate_t *osql = &clnt->osql; errstat_t dummy = {0}; + /* if this is a 2pc participant, we don't need to wait here */ + if (clnt->is_participant) + return 0; + /* If an error is set (e.g., selectv error from range check), latch it. */ errstat_t *err = (osql->xerr.errval == 0) ? &osql->xerr : &dummy; diff --git a/db/sqlglue.c b/db/sqlglue.c index a2821a0b9d..5233c98471 100644 --- a/db/sqlglue.c +++ b/db/sqlglue.c @@ -4992,10 +4992,6 @@ int sqlite3BtreeBeginTrans(Vdbe *vdbe, Btree *pBt, int wrflag, int *pSchemaVersi rc = start_new_transaction(clnt); - /* 2pc on tunable, only here for now */ - extern int gbl_2pc; - clnt->use_2pc = gbl_2pc; - done: if (rc == SQLITE_OK && pSchemaVersion) { sqlite3BtreeGetMeta(pBt, BTREE_SCHEMA_VERSION, (u32 *)pSchemaVersion); diff --git a/db/sqlinterfaces.c b/db/sqlinterfaces.c index d1d7a2b0ea..4f89bc53c0 100644 --- a/db/sqlinterfaces.c +++ b/db/sqlinterfaces.c @@ -5466,7 +5466,8 @@ void reset_clnt(struct sqlclntstate *clnt, int initial) clnt->num_retry = 0; clnt->early_retry = 0; - clnt->use_2pc = 0; + extern int gbl_2pc; + clnt->use_2pc = gbl_2pc; clnt->is_coordinator = 0; clnt->is_participant = 0; clear_participants(clnt); diff --git a/plugins/newsql/newsql.c b/plugins/newsql/newsql.c index d995ce11a1..54e1157e1b 100644 --- a/plugins/newsql/newsql.c +++ b/plugins/newsql/newsql.c @@ -2092,6 +2092,12 @@ int process_set_commands(struct sqlclntstate *clnt, CDB2SQLQUERY *sql_query) if (process_fdb_set_cdb2api(clnt, sqlstr, err, sizeof(err))) { rc = ii + 1; } + } else if (strncasecmp(sqlstr, "remtran_", 8) == 0) { + sqlstr += 8; + + if (process_fdb_set_cdb2api_2pc(clnt, sqlstr, err, sizeof(err))) { + rc = ii + 1; + } } else if (strncasecmp(sqlstr, "typessql", 8) == 0) { sqlstr += 8; sqlstr = skipws(sqlstr); diff --git a/tests/fdb_compat.test/output.log b/tests/fdb_compat.test/output.log index a1592ed51c..f005837fe3 100644 --- a/tests/fdb_compat.test/output.log +++ b/tests/fdb_compat.test/output.log @@ -1,4 +1,4 @@ -Current fdb version 8 +Current fdb version 9 (rows inserted=3) (rows inserted=3) (rows deleted=3) diff --git a/tests/tunables.test/t00_all_tunables.expected b/tests/tunables.test/t00_all_tunables.expected index 3f4a0e6cc0..64399faaf4 100644 --- a/tests/tunables.test/t00_all_tunables.expected +++ b/tests/tunables.test/t00_all_tunables.expected @@ -350,7 +350,7 @@ (name='externalauth_connect', description='Check for externalauth only once on connect', type='BOOLEAN', value='OFF', read_only='N') (name='externalauth_warn', description='Warn instead of returning error in case of missing authdata', type='BOOLEAN', value='OFF', read_only='N') (name='fake_sc_replication_timeout', description='Fake a replication timeout on finalize schemachange. ', type='BOOLEAN', value='OFF', read_only='N') -(name='fdb_default_version', description='Override the default fdb version', type='INTEGER', value='8', read_only='N') +(name='fdb_default_version', description='Override the default fdb version', type='INTEGER', value='9', read_only='N') (name='fdb_io_error_retries', description='Number of retries for io error remsql', type='INTEGER', value='16', read_only='N') (name='fdb_io_error_retries_phase_1', description='Number of immediate retries; capped by fdb_io_error_retries', type='INTEGER', value='6', read_only='N') (name='fdb_io_error_retries_phase_2_poll', description='Poll initial value for slow retries in phase 2; doubled for each retry', type='INTEGER', value='100', read_only='N')