-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathredshift_remove_duplicates.py
180 lines (145 loc) · 4.76 KB
/
redshift_remove_duplicates.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# @contributors Carlos Barreto & Rodrigo Vieira
import psycopg2
import getopt
import os
import sys
# Per-process connection cache: maps str(os.getpid()) to the connection that
# get_pg_conn() opened for that process.
db_connections = {}
# Connection settings (database name, user, password, host, port).
# They start unset; main() fills them in from the command-line options.
db = db_user = db_pwd = db_host = db_port = None
def execute_query(statement):
    """Execute ``statement`` on this process's cached connection.

    Args:
        statement: SQL text to execute.

    Returns:
        The list of fetched rows when the statement produces a result set
        (e.g. SELECT), or None when it does not (DDL, INSERT, TRUNCATE, ...).

    Raises:
        Exception: any error raised while executing the statement is printed
            and re-raised, so the caller can roll back instead of carrying on
            (for example, truncating a table whose copy was never made).
    """
    print("Statement %s" % statement)
    conn = get_pg_conn()
    result = None
    try:
        # The context manager closes the cursor; it does not commit.
        with conn.cursor() as cur:
            cur.execute(statement)
            # description is None when the statement returned no result set;
            # calling fetchall() there would raise "no results to fetch".
            if cur.description is not None:
                result = cur.fetchall()
                print('Query Execution returned %s Results' % (len(result)))
    except Exception as e:
        print(str(e))
        raise
    return result
def get_pg_conn():
    """Return this process's database connection, opening it on first use.

    Connections are cached in ``db_connections`` keyed by the current PID.
    A cached connection that has since been closed is replaced with a new one.
    Connection settings are read from the module globals ``db_host``,
    ``db_port``, ``db``, ``db_user`` and ``db_pwd``; settings that are None
    are left out of the connection parameters.

    Returns:
        An open psycopg2 connection.

    Raises:
        psycopg2.Error: if the connection cannot be established. Nothing is
            cached in that case, so the next call retries.
    """
    pid = str(os.getpid())
    conn = db_connections.get(pid)
    # psycopg2 connections expose a non-zero ``closed`` once they are closed.
    if conn is None or conn.closed:
        # Log the target but never the password.
        print("Connecting to host=%s port=%s dbname=%s user=%s" % (db_host, db_port, db, db_user))
        try:
            # Keyword arguments let psycopg2 quote each value correctly,
            # unlike interpolating them into a "key=value" DSN string.
            conn = psycopg2.connect(
                host=db_host,
                port=db_port,
                dbname=db,
                user=db_user,
                password=db_pwd,
                keepalives=1,
                keepalives_idle=200,
                keepalives_interval=200,
                keepalives_count=5,
            )
        except psycopg2.Error as e:
            print(str(e))
            raise
        # cache the connection
        db_connections[pid] = conn
    return conn
#return table names into a definied schema
def get_tables(schemaname):
statement = """
SELECT tablename FROM pg_tables WHERE schemaname='{schemaname}';
""".format(schemaname=schemaname)
rows = execute_query(statement)
response = []
for row in rows:
response.append(row[0])
return response
def get_table_metainfo(table, schemaname='public'):
    """Return the column names of ``table`` as listed in ``pg_table_def``.

    Args:
        table: table name to look up.
        schemaname: schema the table lives in. Defaults to ``'public'``,
            which was previously hard-coded here.

    Returns:
        list[str]: column names, in the order the catalog query returns them.
        An empty list means the table was not found.

    NOTE(review): in Amazon Redshift, pg_table_def only lists tables whose
    schema is on the session's search_path — confirm the target schema is.
    """
    # Double any embedded single quote so the values stay inside their literals.
    statement = """
SELECT "column" FROM pg_table_def WHERE schemaname = '{schemaname}'
AND tablename = '{table}'
""".format(schemaname=schemaname.replace("'", "''"), table=table.replace("'", "''"))
    rows = execute_query(statement)
    return [row[0] for row in rows]
def create_storage_table(table, schemaname='public'):
    """Create the temp table ``{table}_tmp`` using ``table``'s definition.

    The definition is read from the ``admin.v_generate_tbl_ddl`` view. Every
    DDL fragment for the table with ``seq > 2`` is appended, in ``seq``
    order, after ``CREATE TEMP TABLE {table}_tmp``.

    Args:
        table: table whose definition is copied.
        schemaname: schema the table lives in. Defaults to ``'public'``,
            which was previously hard-coded here.

    Raises:
        ValueError: if the view returns no DDL fragments for the table.

    NOTE(review): assumes the view is installed in schema ``admin`` and that
    its rows with ``seq <= 2`` are the header/CREATE TABLE lines being
    replaced here — confirm against the installed view version.
    """
    ddl_query = """
SELECT ddl FROM admin.v_generate_tbl_ddl
WHERE tablename = '{table}' AND schemaname = '{schemaname}' AND seq > 2
ORDER BY seq
""".format(table=table.replace("'", "''"), schemaname=schemaname.replace("'", "''"))
    # Without ORDER BY the fragments could come back in any order and
    # assemble into invalid DDL.
    table_ddl = execute_query(ddl_query)
    if not table_ddl:
        raise ValueError("no DDL found for table %s.%s" % (schemaname, table))
    create_temp_table = "CREATE TEMP TABLE {table}_tmp ".format(table=table) + "".join(
        " " + row[0] for row in table_ddl
    )
    print("DDL to be executed")
    execute_query(create_temp_table)
def insert_into_storage(table, key_columns=None):
    """Copy one row per duplicate key from ``table`` into ``{table}_tmp``.

    Rows are numbered with ``ROW_NUMBER() OVER (PARTITION BY <key>)`` and only
    row 1 of each partition is kept. Rows that share the key are collapsed
    even if their other columns differ, and which of them survives is not
    specified.

    Args:
        table: table to deduplicate. ``{table}_tmp`` must already exist with
            the same column names.
        key_columns: column names that identify a duplicate. Defaults to the
            first column returned by ``get_table_metainfo`` (the original
            behaviour). Pass every column to drop only fully identical rows.

    Raises:
        ValueError: if no columns are found for ``table``.
    """
    metainfo = get_table_metainfo(table)
    if not metainfo:
        raise ValueError("no columns found for table %s" % table)

    def quote(name):
        # Double-quote identifiers so reserved words (e.g. "user") still work.
        return '"%s"' % name.replace('"', '""')

    columns = ",".join(quote(name) for name in metainfo)
    keys = ",".join(quote(name) for name in (key_columns or metainfo[:1]))
    # The explicit target column list matches values to columns by name, not
    # by the (unspecified) order in which the catalog listed them. The
    # derived table needs an alias in PostgreSQL-style SQL.
    copy_original_to_temp = """
INSERT INTO {table}_tmp ({columns})
SELECT {columns}
FROM (
SELECT *,ROW_NUMBER() OVER (PARTITION BY {keys} ORDER BY {keys} ASC) rownum
FROM {table}) AS deduped
WHERE rownum = 1
""".format(table=table, columns=columns, keys=keys)
    execute_query(copy_original_to_temp)
def truncate_original_table(table):
    """Remove every row from ``table`` by running ``TRUNCATE``.

    NOTE(review): per the Amazon Redshift TRUNCATE documentation, TRUNCATE
    commits the current transaction, so this step cannot be undone by a later
    rollback — confirm that is acceptable for this workflow.
    """
    statement = "TRUNCATE " + table
    execute_query(statement)
def insert_into_table(table):
    """Copy every row of ``{table}_tmp`` back into ``table``.

    The copy uses ``SELECT *`` and no column list, so it relies on the temp
    table having the same column order as ``table``. create_storage_table
    builds the temp table from ``table``'s own DDL.
    """
    execute_query("INSERT INTO %s (SELECT * FROM %s_tmp)" % (table, table))
def remove_duplicates(table):
    """Rewrite ``table`` so that it keeps only deduplicated rows.

    The steps run in order, each preceded by a progress message:
    create the temp copy table, fill it with deduplicated rows, empty the
    original table, then copy the deduplicated rows back.
    """
    pipeline = (
        ("create storage table", create_storage_table),
        ("insert into storage table", insert_into_storage),
        ("truncate original table", truncate_original_table),
        ("insert into original table", insert_into_table),
    )
    for label, action in pipeline:
        print(label)
        action(table)
def main(argv):
    """Remove duplicate rows from every table in a schema.

    Parses these options from ``argv[1:]``: ``--db``, ``--db-user``,
    ``--db-pwd``, ``--db-host``, ``--db-port`` and ``--schema-name``. The
    connection options are stored in the module globals that get_pg_conn
    reads. It then runs remove_duplicates on each table returned by
    get_tables for the schema.

    Args:
        argv: full argument vector, with the program name at index 0.

    Returns:
        int: 0 on success, 1 if processing failed (the open transaction is
        rolled back), 2 on a command-line error.
    """
    global db
    global db_user
    global db_pwd
    global db_host
    global db_port
    supported_args = """db= db-user= db-pwd= db-host= db-port= schema-name="""
    try:
        optlist, _ = getopt.getopt(argv[1:], "", supported_args.split())
    except getopt.GetoptError as err:
        # Without parsed options there is nothing safe to run.
        print(str(err))
        return 2
    schema_name = None
    for arg, value in optlist:
        if arg == "--db":
            db = value
        elif arg == "--db-user":
            db_user = value
        elif arg == "--db-pwd":
            db_pwd = value
        elif arg == "--db-host":
            db_host = value
        elif arg == "--db-port":
            db_port = value
        elif arg == "--schema-name":
            schema_name = value
    if schema_name is None:
        # Refuse to guess: every table in the schema gets truncated and rewritten.
        print("option --schema-name is required")
        return 2
    conn = None
    try:
        conn = get_pg_conn()
        for table in get_tables(schema_name):
            remove_duplicates(table)
            # Commit each table as soon as it has been rewritten. With a
            # single final commit, a failure on a later table would roll back
            # the re-insert of an earlier table that was already truncated.
            conn.commit()
    except Exception as err:
        print(str(err))
        if conn is not None:
            conn.rollback()
        return 1
    finally:
        if conn is not None:
            conn.close()
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv))