Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add BulkImport APIs and cron #211

Open
wants to merge 12 commits into
base: feat/bulk-import-base
Choose a base branch
from

Conversation

anku255
Copy link
Contributor

@anku255 anku255 commented Mar 20, 2024

Summary of change

(A few sentences about this PR)

Related issues

Test Plan

(Write your test plan here. If you changed any code, please provide us with clear instructions on how you verified your changes work. Bonus points for screenshots and videos!)

Documentation changes

(If relevant, please create a PR in our docs repo, or create a checklist here highlighting the necessary changes)

Checklist for important updates

  • Changelog has been updated
  • pluginInterfaceSupported.json file has been updated (if needed)
  • Changes to the version if needed
    • In build.gradle
  • Had installed and ran the pre-commit hook
  • If there are new dependencies that have been added in build.gradle, please make sure to add them in implementationDependencies.json.
  • Issue this PR against the latest non released version branch.
    • To know which one it is, run find the latest released tag (git tag) in the format vX.Y.Z, and then find the latest branch (git branch --all) whose X.Y is greater than the latest released tag.
    • If no such branch exists, then create one from the latest released branch.
  • When adding new recipes, ensure that its performance is being measured in the OneMillionUsersTest

CHANGELOG.md Show resolved Hide resolved
build.gradle Outdated
version = "7.0.0"
version = "7.1.0"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update branch with latest and update versions accordingly

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// We are overriding the loadConfig method to set the connection pool size
// to 1 to avoid creating many connections for the bulk import cronjob
configJson.addProperty("postgresql_connection_pool_size", 1);
Config.loadConfig(this, configJson, logLevels, tenantIdentifier);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to just call super.loadConfig instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment on lines 591 to 599
{
String DROP_QUERY = "DROP INDEX IF EXISTS bulk_import_users_status_updated_at_index";
update(start, DROP_QUERY, NO_OP_SETTER);
}
{
String DROP_QUERY = "DROP INDEX IF EXISTS bulk_import_users_created_at_index";
update(start, DROP_QUERY, NO_OP_SETTER);
}
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are creating 3 indexes, but dropping just 2

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

return "CREATE TABLE IF NOT EXISTS " + tableName + " ("
+ "id CHAR(36),"
+ "app_id VARCHAR(64) NOT NULL DEFAULT 'public',"
+ "primary_user_id VARCHAR(64),"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this 64 chars? SuperTokens user id is 36 chats

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated it to 36 chars.

Comment on lines +56 to +57
+ "CONSTRAINT " + Utils.getConstraintName(schema, tableName, "app_id", "fkey") + " "
+ "FOREIGN KEY(app_id) "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i assume all the entries of all apps will be in the public storage? In that case, if an app is pointing to a different storage, that app's app_id won't be in the public storage's apps table. Have you tested this? Maybe im wrong.

update(start, queryBuilder.toString(), pst -> {
int parameterIndex = 1;
for (BulkImportUser user : users) {
pst.setString(parameterIndex++, user.id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there is duplicate user id here, will the whole query fail?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the whole query will fail.

@@ -3071,4 +3097,83 @@ public int getDbActivityCount(String dbname) throws SQLException, StorageQueryEx
return -1;
});
}

@Override
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the functions below are all supposed to be called NOT on the bulkimportproxystorage instance right? In that case, please assert in these that the instance of this is not of type bulkimportproxystorage

// NOTE: On average, we take about 66 seconds to process 1000 users. If, for any reason, the bulk import users were marked as processing but couldn't be processed within 10 minutes, we'll attempt to process them again.
String selectQuery = "SELECT * FROM " + Config.getConfig(start).getBulkImportUsersTable()
+ " WHERE app_id = ?"
+ " AND (status = 'NEW' OR (status = 'PROCESSING' AND updated_at < (EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000) - 10 * 60 * 1000))" /* 10 mins */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is 10 mins a good value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On average we take around 66 seconds to process 1000 users but we chose 10 mins wait time to give us ample amount of time before retrying the users in case the processing was delayed.

The processing can fail because of reasons like (DB failure), 10 mins gives some time for those services to recover before next retry.

String selectQuery = "SELECT * FROM " + Config.getConfig(start).getBulkImportUsersTable()
+ " WHERE app_id = ?"
+ " AND (status = 'NEW' OR (status = 'PROCESSING' AND updated_at < (EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000) - 10 * 60 * 1000))" /* 10 mins */
+ " LIMIT ? FOR UPDATE SKIP LOCKED";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we had decided not to use SKIP LOCKED?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had decided to use SKIP LOCKED and we tested that it was safe to do so. I have added comment explaining why we need this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants