-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathschema_manage.py
101 lines (75 loc) · 3.89 KB
/
schema_manage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import sys
import os
import json
import logging
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
logger = logging.getLogger('init_keyspace')
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
logger.addHandler(ch)
def getenv(name, fallback=None):
value = os.getenv(name, default=fallback)
if not value:
raise Exception("The environment variable {0} is undefined".format(name))
return str(value)
def create_table(keyspace, table_name, table_def):
logger.info("Creating {}.{} table".format(keyspace, table_name))
session.execute("CREATE TABLE IF NOT EXISTS {}.{}{}".format(keyspace, table_name, table_def))
def execute_query(keyspace, query):
logger.info("Executing Query {}.{} table".format(keyspace, query))
session.execute("{}".format(query))
def get_schema_version(keyspace):
version = None
rows = session.execute("select version from schemamanager.keyspace_versions where keyspace_name='{}'".format(keyspace))
for row in rows:
version = row.version
return version
def insert_query(keyspace):
logger.info("Initilizing keyspace {}".format(keyspace))
session.execute("INSERT INTO schemamanager.keyspace_versions (keyspace_name, modified_date, version) VALUES ('{0}', unixTimestampOf(now()), 0)".format(keyspace))
def update_query(keyspace,version):
logger.info("Updating keyspace {} to Schema Version:{}".format(keyspace, version))
session.execute("UPDATE schemamanager.keyspace_versions set version={0} WHERE keyspace_name='{1}'".format(int(version), keyspace))
def create_keyspace(keyspace,replication):
logger.info("Creating {}.{} Keyspace".format(keyspace, replication))
session.execute("CREATE KEYSPACE IF NOT EXISTS {0} WITH REPLICATION = {1}".format(keyspace, replication))
def parse_schema_jsons():
path_to_json = '/app/'
json_files = [pos_json for pos_json in os.listdir(path_to_json) if pos_json.endswith('.json')]
for index, js in enumerate(json_files):
with open(os.path.join(path_to_json, js)) as json_file:
json_text = json.load(json_file)
for version ,queries in json_text['schemaversions'].items():
keyspace = json_text['keyspacename']
# Create KeySpace with replication
create_keyspace(keyspace,replication)
#get Schema Version from schemamanager keyspace versions table
if get_schema_version(keyspace) == None:
insert_query(keyspace)
current_version = get_schema_version(keyspace)
if int(version) > int(current_version):
#execute queries
logger.info("Proceeding with Update Schema Version:{0} for Keyspace:{1} ".format(version,keyspace))
for query in queries:
logger.info("Running Query:" + query)
execute_query(keyspace,query)
update_query(keyspace,version)
logger.info("Schema Version:{0} for Keyspace:{1} Status: Updated".format(version,keyspace))
else:
logger.info("Schema Version:{0} for Keyspace:{1} Status: Skipped".format(version,keyspace))
logger.info("Running {}".format(sys.argv[0]))
contact_points = getenv("CONTACT_POINTS").split(",")
replication = getenv("REPLICATION")
logger.info("contact_points = %s", contact_points)
logger.info("replication = %s", replication)
username = getenv("USERNAME", fallback="cassandra")
password = getenv("PASSWORD", fallback="cassandra")
auth_provider = PlainTextAuthProvider(username=username, password=password)
cluster = Cluster(contact_points, auth_provider=auth_provider)
session = cluster.connect()
#create schemamanager keyspace and tables
create_keyspace("schemamanager",replication)
create_table("schemamanager","keyspace_versions","(keyspace_name text, version int, modified_date timestamp, PRIMARY KEY (keyspace_name))")
parse_schema_jsons()