This repository has been archived by the owner on Mar 13, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 17
/
netezza_dialect.py
325 lines (268 loc) · 10.2 KB
/
netezza_dialect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
'''SQLAlchemy dialect for Netezza'''
import re

import pyodbc
from sqlalchemy import util
from sqlalchemy.connectors.pyodbc import PyODBCConnector
from sqlalchemy.dialects import registry
from sqlalchemy.dialects.postgresql.base import (
    PGDialect, PGTypeCompiler, PGCompiler, PGDDLCompiler, DOUBLE_PRECISION,
    INTERVAL, TIME, TIMESTAMP)
from sqlalchemy.engine import reflection
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.schema import DDLElement, SchemaItem
from sqlalchemy.sql import text, bindparam
import sqlalchemy.types as sqltypes
# pylint:disable=R0901,W0212
class ST_GEOMETRY(sqltypes.LargeBinary):
    '''Netezza ST_GEOMETRY column type, stored as binary data.

    Subclasses ``LargeBinary`` directly rather than its deprecated alias
    ``Binary``; the optional ``length`` is rendered by the type compiler.
    '''
    __visit_name__ = 'ST_GEOMETRY'
class BYTEINT(sqltypes.INTEGER):
    """Netezza BYTEINT integer type (reflected from type id 2500)."""

    __visit_name__ = "BYTEINT"
class NVARCHAR(sqltypes.NVARCHAR):
    '''Netezza NVARCHAR.

    Forces unicode conversion by default (``convert_unicode='force'``).
    Decoding errors are ignored unless the caller supplies an explicit
    ``unicode_error`` strategy. That argument used to be accepted and then
    silently discarded.
    '''

    def __init__(self, length=None, collation=None,
                 convert_unicode='force',
                 unicode_error=None):
        super(NVARCHAR, self).__init__(
            length,
            collation=collation,
            convert_unicode=convert_unicode,
            # Keep the historical 'ignore' behaviour when nothing is passed.
            unicode_error='ignore' if unicode_error is None else unicode_error)
class OID(sqltypes.BigInteger):
    """Netezza OID column type.

    Only found in system tables; backed by a big integer.
    """

    __visit_name__ = "OID"
class NAME(NVARCHAR):
    """Netezza NAME column type, a variant of the Netezza NVARCHAR.

    Only found in system tables.
    """

    __visit_name__ = "NAME"
class ABSTIME(sqltypes.TIME):
    """Netezza ABSTIME column type, modelled on the generic TIME type.

    Only found in system tables.
    """

    __visit_name__ = "ABSTIME"
# Netezza-specific type names (gleaned from _v_datatype). NetezzaODBC adds
# these to its ischema_names lookup when it initializes.
ischema_names = dict(
    st_geometry=ST_GEOMETRY,
    byteint=BYTEINT,
    oid=OID,
    name=NAME,
)
class NetezzaTypeCompiler(PGTypeCompiler):
    '''Fills out unique netezza types.

    Every visit method accepts ``**kw``. SQLAlchemy passes extra keyword
    arguments (e.g. ``type_expression``) when compiling column DDL, and a
    visitor without ``**kw`` fails with TypeError there.
    '''

    def visit_ST_GEOMETRY(self, type_, **kw):
        '''Renders ST_GEOMETRY, with its size only when one was given.'''
        if type_.length is None:
            # Previously rendered as the invalid 'ST_GEOMETRY(None)'.
            return 'ST_GEOMETRY'
        return 'ST_GEOMETRY({})'.format(type_.length)

    def visit_BYTEINT(self, type_, **kw):
        return 'BYTEINT'

    def visit_OID(self, type_, **kw):
        return 'OID'

    def visit_NAME(self, type_, **kw):
        return 'NAME'

    def visit_ABSTIME(self, type_, **kw):
        return 'ABSTIME'
class NetezzaCompiler(PGCompiler):
    '''Handles some quirks of netezza queries'''

    def limit_clause(self, select, **kw):
        '''Renders LIMIT/OFFSET with the values inlined as integers.

        Netezza doesn't allow sql params in the limit/offset piece. ``**kw``
        absorbs the compile keyword arguments SQLAlchemy passes to this hook.
        '''
        # Named 'clause' so the module-level `text` import is not shadowed.
        clause = ""
        if select._limit is not None:
            clause += " \n LIMIT {limit}".format(limit=int(select._limit))
        if select._offset is not None:
            if select._limit is None:
                # OFFSET needs a LIMIT in front of it here; ALL means no cap.
                clause += " \n LIMIT ALL"
            clause += " OFFSET {offset}".format(offset=int(select._offset))
        return clause
class DistributeOn(SchemaItem):
    '''Represents a distribute on clause'''

    def __init__(self, *column_names):
        '''Use like:
        my_table_1 = Table('my_table_1', metadata,
                           Column('id_key', BIGINT),
                           Column('nbr', BIGINT),
                           DistributeOn('id_key')
                           )
        my_table_2 = Table('my_table_2', metadata,
                           Column('id_key', BIGINT),
                           Column('nbr', BIGINT),
                           DistributeOn('random')
                           )
        With no column names, the clause defaults to random distribution.
        '''
        self.column_names = column_names if column_names else ('RANDOM',)

    def _set_parent(self, parent, **kw):
        '''Attaches this clause to its Table as ``table.distribute_on``.

        ``**kw`` absorbs extra keyword arguments (e.g. ``allow_replacements``)
        that newer SQLAlchemy versions pass when attaching schema items to a
        Table. Older versions pass none.
        '''
        self.parent = parent
        parent.distribute_on = self
class NetezzaDDLCompiler(PGDDLCompiler):
    """Adds Netezza specific DDL clauses."""

    def post_create_table(self, table):
        """Returns the ``DISTRIBUTE ON`` suffix for a CREATE TABLE statement.

        Tables without a ``distribute_on`` attribute, or whose first
        distribution column is 'random' (any case), get ``RANDOM``. Otherwise
        the column names are emitted as a parenthesised, comma-joined list.
        """
        spec = 'RANDOM'
        if hasattr(table, 'distribute_on'):
            names = table.distribute_on.column_names
            if names[0].lower() != 'random':
                spec = '({})'.format(','.join(names))
        return ' DISTRIBUTE ON ' + spec
# Maps type ids to sqlalchemy types, plus whether they have variable precision.
# The first element of each pair is called to build the type: with the
# reflected column length when the flag is True, with no arguments otherwise.
# NetezzaODBC.get_columns handles Numeric separately.
oid_datatype_map = {
    16: (sqltypes.Boolean, False),
    18: (sqltypes.CHAR, False),
    19: (NAME, False),
    20: (sqltypes.BigInteger, False),
    21: (sqltypes.SmallInteger, False),
    23: (sqltypes.Integer, False),
    25: (sqltypes.TEXT, False),
    26: (OID, False),
    700: (sqltypes.REAL, False),
    701: (DOUBLE_PRECISION, False),
    702: (ABSTIME, False),
    1042: (sqltypes.CHAR, True),
    1043: (sqltypes.String, True),
    1082: (sqltypes.Date, False),
    1083: (TIME, False),
    1184: (TIMESTAMP, False),
    1186: (INTERVAL, False),
    # 1266 is TIME WITH TIME ZONE (timetz). It was wrongly mapped to TIMESTAMP.
    1266: (lambda: TIME(timezone=True), False),
    1700: (sqltypes.Numeric, False),
    2500: (BYTEINT, False),
    2522: (sqltypes.NCHAR, True),
    2530: (sqltypes.NVARCHAR, True),
    2552: (ST_GEOMETRY, True),
    2568: (sqltypes.VARBINARY, True),
}
class NetezzaODBC(PyODBCConnector, PGDialect):
    '''Attempts to reuse as much as possible from the postgresql and pyodbc
    dialects.
    '''

    name = 'netezza'
    encoding = 'latin9'
    default_paramstyle = 'qmark'
    returns_unicode_strings = False
    supports_native_enum = False
    supports_sequences = True
    sequences_optional = False
    isolation_level = 'READ COMMITTED'
    max_identifier_length = 128
    type_compiler = NetezzaTypeCompiler
    statement_compiler = NetezzaCompiler
    ddl_compiler = NetezzaDDLCompiler
    description_encoding = None

    # Extracts precision and scale from a format_type such as
    # 'numeric(18,2)'. Tolerates letter case and whitespace in the parentheses.
    _numeric_format = re.compile(
        r'numeric\s*\(\s*(\d+)\s*,\s*(\d+)\s*\)', re.IGNORECASE)

    def initialize(self, connection):
        '''Runs the inherited setup, then re-applies Netezza's settings.

        The unicode/encoding flags are assigned after the base classes'
        initialize() has run, so these values are the ones that stick.
        '''
        super(NetezzaODBC, self).initialize(connection)
        # PyODBC connector tries to set these to true...
        self.supports_unicode_statements = False
        self.supports_unicode_binds = False
        self.returns_unicode_strings = True
        self.convert_unicode = 'ignore'
        self.encoding = 'latin9'
        # The inherited ischema_names is PGDialect's class-level dict, which
        # every PostgreSQL dialect shares. Take a per-instance copy before
        # adding the Netezza names, so PostgreSQL engines are not polluted.
        self.ischema_names = dict(self.ischema_names)
        self.ischema_names.update(ischema_names)

    def has_table(self, connection, tablename, schema=None):
        '''Checks if the table exists in the current database.

        ``schema`` is accepted for interface compatibility but is not used.
        '''
        # Have to filter by database name because the table could exist in
        # another database on the same machine
        dbname = connection.connection.getinfo(pyodbc.SQL_DATABASE_NAME)
        sql = ('select count(*) from _v_object_data where objname = ? '
               'and dbname = ?')
        result = connection.execute(sql, (str(tablename), dbname)).scalar()
        return bool(result)

    def get_table_names(self, connection, schema=None, **kw):
        '''Returns table names from _v_table, excluding names that match
        ``'_t_%'``. ``schema`` is not used.
        '''
        # NOTE(review): '_' is a single-character wildcard in LIKE, so this
        # also excludes any name whose second character is 't'. Confirm that
        # only '_t_'-prefixed names were meant to be filtered out.
        result = connection.execute(
            "select tablename as name from _v_table "
            "where tablename not like '_t_%'")
        return [r[0] for r in result]

    @reflection.cache
    def get_columns(self, connection, table_name, schema=None, **kw):
        '''Reflects the columns of ``table_name`` from _v_relation_column.

        Returns a list of dicts with 'name', 'type' and 'nullable' keys, in
        attnum order. Uses the TextClause.bindparams()/columns() API in place
        of the deprecated text(bindparams=, typemap=) arguments.
        '''
        sql_cols = """
SELECT CAST(a.attname AS VARCHAR(128)) as name,
a.atttypid as typeid,
not a.attnotnull as nullable,
a.attcolleng as length,
a.format_type
FROM _v_relation_column a
WHERE a.name = :tablename
ORDER BY a.attnum
"""
        s = text(sql_cols).bindparams(
            bindparam('tablename', type_=sqltypes.String),
        ).columns(
            name=NAME,
            typeid=sqltypes.Integer,
            nullable=sqltypes.Boolean,
            length=sqltypes.Integer,
            format_type=sqltypes.String,
        )
        rows = connection.execute(s, {'tablename': table_name}).fetchall()
        return [
            {
                'name': name,
                'type': self._reflect_column_type(
                    name, typeid, length, format_type),
                'nullable': nullable,
            }
            for name, typeid, nullable, length, format_type in rows
        ]

    def _reflect_column_type(self, colname, typeid, length, format_type):
        '''Builds the sqlalchemy type instance for one reflected column.

        An unknown type id emits a warning and yields NullType, so a single
        exotic column does not abort reflection of the whole table. A numeric
        format_type that cannot be parsed falls back to a bare Numeric().
        '''
        try:
            coltype_class, has_length = oid_datatype_map[typeid]
        except KeyError:
            util.warn("Did not recognize type id %r (%r) of column %r" %
                      (typeid, format_type, colname))
            return sqltypes.NULLTYPE
        if coltype_class is sqltypes.Numeric:
            match = self._numeric_format.match(format_type or '')
            if match is None:
                return coltype_class()
            precision, scale = match.groups()
            return coltype_class(int(precision), int(scale))
        if has_length:
            return coltype_class(length)
        return coltype_class()

    @reflection.cache
    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
        '''Netezza doesn't have PK/unique constraints'''
        return {'constrained_columns': [], 'name': None}

    @reflection.cache
    def get_foreign_keys(self, connection, table_name, schema=None, **kw):
        '''Netezza doesn't have foreign keys'''
        return []

    @reflection.cache
    def get_indexes(self, connection, table_name, schema=None, **kw):
        '''Netezza doesn't have indexes'''
        return []

    @reflection.cache
    def get_view_names(self, connection, schema=None, **kw):
        '''Returns view names from _v_view, excluding names that match
        ``'_v_%'``. ``schema`` is not used.
        '''
        # The trailing space after _v_view was missing, which produced the
        # invalid SQL "... from _v_viewwhere ...".
        # NOTE(review): as in get_table_names, '_' is a LIKE wildcard here.
        result = connection.execute(
            "select viewname as name from _v_view "
            "where viewname not like '_v_%'")
        return [r[0] for r in result]

    def get_isolation_level(self, connection):
        '''Reports the fixed class-level isolation level.'''
        return self.isolation_level

    def _get_default_schema_name(self, connection):
        '''Netezza doesn't use schemas'''
        raise NotImplementedError

    def _check_unicode_returns(self, connection, additional_tests=None):
        '''Netezza doesn't *do* unicode (except in nchar & nvarchar).

        No probing is done. ``additional_tests`` mirrors the base-class
        signature and is ignored.
        '''
        pass
class CreateTableAs(DDLElement):
    """Create a CREATE TABLE AS SELECT ... statement."""

    def __init__(self,
                 new_table_name,
                 selectable,
                 temporary=False,
                 distribute_on='random'):
        '''Distribute_on may be a single column name, a tuple/list of column
        names, or 'random' (any case) for random distribution.
        '''
        super(CreateTableAs, self).__init__()
        self.selectable = selectable
        self.temporary = temporary
        self.new_table_name = new_table_name
        self.distribute_on = distribute_on

    def distribute_clause(self):
        '''Renders the argument of the DISTRIBUTE ON clause.

        Returns 'RANDOM' for an empty spec or a lone 'random'. Otherwise it
        returns '(col1,col2,...)'. Previously a tuple crashed on .lower(), and
        a single column name was joined character by character.
        '''
        if isinstance(self.distribute_on, (list, tuple)):
            columns = tuple(self.distribute_on)
        else:
            # A single column name (or 'random') given as a plain string.
            columns = (self.distribute_on,)
        if not columns or (len(columns) == 1 and
                           columns[0].lower() == 'random'):
            return 'RANDOM'
        return '({})'.format(','.join(columns))
@compiles(CreateTableAs)
def visit_create_table_as(element, compiler, **_kwargs):
    '''Compiles a ctas statement.

    DDL is executed without bound parameters, so the SELECT is compiled with
    ``literal_binds=True`` to inline its literal values. Otherwise they would
    render as ``?`` placeholders with nothing bound to them.
    '''
    return """
CREATE {tmp} TABLE {name} AS (
{select}
) DISTRIBUTE ON {distribute}
""".format(
        tmp='TEMP' if element.temporary else '',
        name=element.new_table_name,
        select=compiler.sql_compiler.process(
            element.selectable, literal_binds=True),
        distribute=element.distribute_clause(),
    )
# Expose the dialect under both the bare and the driver-qualified URL schemes
# (netezza:// and netezza+pyodbc://).
for _scheme in ("netezza", "netezza.pyodbc"):
    registry.register(_scheme, "netezza_dialect", "NetezzaODBC")
del _scheme