Skip to content

Commit

Permalink
Basic compatibility with Drill < 1.19 for drill+sadrill (#71)
Browse files Browse the repository at this point in the history
* Add CHANGELOG with note about drill+sadrill and Drill < 1.19.

* Basic compat with Drill < 1.19 for drill+sadrill.
  • Loading branch information
jnturton authored Jul 30, 2021
1 parent e67dde1 commit 2d60846
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 83 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
## [Unreleased]

## [1.1.1] - 2021-07-28

### Fixed

- Backwards compatibility with Drill < 1.19, limited to returning
all data values as strings. Users not able to upgrade to >= 1.19
must implement their own typecasting or use sqlalchemy-drill 0.3.

## [1.1.0] - 2021-07-21

**N.B.**: The drill+sadrill dialect in this release is not compatible with Drill
< 1.19.

### Changed

- Rewrite the drill+sadrill dialect using the ijson streaming parser.
75 changes: 54 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
# Apache Drill dialect for SQLAlchemy.

---

The primary purpose of this is to have a working dialect for Apache Drill that can be used with Apache Superset.

https://superset.incubator.apache.org

Obviously, a working, robust dialect for Drill serves other purposes as well, but most of the iterative planning for this REPO will be based on working with Superset. Other changes will gladly be incorporated, as long as it doesn't hurt Superset integration.
Obviously, a working, robust dialect for Drill serves other purposes as well, but most of the iterative planning for this REPO will be based on working with Superset. Other changes will gladly be incorporated, as long as it doesn't hurt Superset integration.

## Installation
Installing the dialect is straightforward. Simply:
## Installation

Installing the dialect is straightforward. Simply:

```
pip install sqlalchemy-drill
Expand All @@ -20,28 +23,48 @@ python3 -m pip install git+https://github.com/JohnOmernik/sqlalchemy-drill.git
```

## Usage

To use Drill with SQLAlchemy you will need to craft a connection string in the format below:

```
drill+sadrill://<username>:<password>@<host>:<port>/<storage_plugin>?use_ssl=True
```

To connect to Drill running on a local machine running in embedded mode you can use the following connection string.
To connect to Drill running on a local machine running in embedded mode you can use the following connection string.

```
drill+sadrill://localhost:8047/dfs?use_ssl=False
```

Query result metadata returned by the Drill REST API is stored in the `result_md` field of the DB-API Cursor object. Note that any trailing metadata, i.e. metadata which comes after result row data, will only be populated after you have iterated through all of the returned rows. If you need this trailing metadata you can make the cursor object reachable after it has been completely iterated by obtaining a reference to it beforehand, as follows.
```python
r = engine.execute('select current_timestamp')
r.cursor.result_md # access metadata, but only leading metadata
cur = r.cursor # obtain a reference for use later
r.fetchall() # iterate through all result data
cur.result_md # access metadata, including trailing metadata
del cur # optionally delete the reference when done
```

### Changes in Drill 1.19 affecting drill+sadrill

In versions of Drill earlier than 1.19, all data values are serialised to JSON strings and column type metadata comes after the data itself. As a result, for these versions of Drill, the drill+sadrill dialect returns every data value as a string. To convert non-string data to its native type you need to typecast it yourself.

In Drill 1.19 the REST API began making use of numeric types in JSON for numbers and times, the latter via a UNIX time representation. As a result, the drill+sadrill dialect is able to return appropriate types for numbers and times when used with Drill >= 1.19.

## Usage with JDBC

Connecting to Drill via JDBC is a little more complicated than a local installation and complete instructions can be found on the Drill documentation here: https://drill.apache.org/docs/using-the-jdbc-driver/.

In order to configure SQLAlchemy to work with Drill via JDBC you must:
* Download the latest JDBC Driver available here: http://apache.osuosl.org/drill/
* Copy this driver to your classpath or other known path
* Set an environment variable called `DRILL_JDBC_DRIVER_PATH` to the full path of your driver location
* Set an environment variable called `DRILL_JDBC_JAR_NAME` to the name of the `.jar` file for the Drill driver.

Additionally, you will need to install `JayDeBeApi` as well as jPype version 0.6.3.
These modules are listed as optional dependencies and will not be installed by the default installer.
- Download the latest JDBC Driver available here: http://apache.osuosl.org/drill/
- Copy this driver to your classpath or other known path
- Set an environment variable called `DRILL_JDBC_DRIVER_PATH` to the full path of your driver location
- Set an environment variable called `DRILL_JDBC_JAR_NAME` to the name of the `.jar` file for the Drill driver.

Additionally, you will need to install `JayDeBeApi` as well as jPype version 0.6.3.
These modules are listed as optional dependencies and will not be installed by the default installer.

If the JDBC driver is not available, the dialect will throw errors when trying
to connect. In addition, sqlalchemy-drill will not launch a JVM for you so you
Expand All @@ -55,52 +78,62 @@ jpype.startJVM("-ea", classpath="lib/*")
```
drill+jdbc://<username>:<passsword>@<host>:<port>
```

For a simple installation, this might look like:

```
drill+jdbc://admin:password@localhost:31010
```

## Usage with ODBC

In order to configure SQLAlchemy to work with Drill via ODBC you must:
* Install latest Drill ODBC Driver: https://drill.apache.org/docs/installing-the-driver-on-linux/
* Ensure that you have ODBC support in your system (`unixODBC` package for RedHat-based systems).
* Install `pyodbc` Python package.
This module is listed as an optional dependency and will not be installed by the default installer.

- Install latest Drill ODBC Driver: https://drill.apache.org/docs/installing-the-driver-on-linux/
- Ensure that you have ODBC support in your system (`unixODBC` package for RedHat-based systems).
- Install `pyodbc` Python package.
This module is listed as an optional dependency and will not be installed by the default installer.

To connect to Drill with SQLAlchemy use the following connection string:

```
drill+odbc:///?<ODBC connection parameters>
```

Connection properties are available in the official documentation: https://drill.apache.org/docs/odbc-configuration-reference/

For a simple installation, this might look like:

```
drill+odbc:///?Driver=/opt/mapr/drill/lib/64/libdrillodbc_sb64.so&ConnectionType=Direct&HOST=localhost&PORT=31010&AuthenticationType=Plain&UID=admin&PWD=password
```

or for the case when you have DSN configured in `odbc.ini`:

```
drill+odbc:///?DSN=drill_dsn_name
```

**Note:** it's better to avoid using connection string with `hostname:port` or `username`/`password`, like 'drill+odbc://admin:password@localhost:31010/' but use only ODBC properties instead to avoid any misinterpretation between these parameters.


## Usage with Superset
For a complete tutorial on how to use Superset with Drill, read the tutorial on @cgivre's blog available here: http://thedataist.com/visualize-anything-with-superset-and-drill/.

For a complete tutorial on how to use Superset with Drill, read the tutorial on @cgivre's blog available here: http://thedataist.com/visualize-anything-with-superset-and-drill/.

## Current Status/Development Approach

Currently we can connect to drill, and issue queries for most visualizations and get results. We also enumerate table columns for some times of tables. Here are things that are working as some larger issues to work out. (Individual issues are tracked under issues)

* Connection to Drill via the databases tab in Superset succeeds
* You can do basic queries for most types of viz/tables
* There may be issues with advanced queries/joins. As you learn about new ones, please track in issues
- Connection to Drill via the databases tab in Superset succeeds
- You can do basic queries for most types of viz/tables
- There may be issues with advanced queries/joins. As you learn about new ones, please track in issues

### Many thanks
to drillpy and pydrill for code used in creating the `drilldbapi.py` code for connecting!

### Docker
to drillpy and pydrill for code used in creating the original `drilldbapi.py` code for connecting!

### Docker

It is recommended to extend [the official Docker image](https://hub.docker.com/r/apache/superset) to include this Apache Drill driver:

```dockerfile
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
long_description = f.read()

setup(name='sqlalchemy_drill',
version='1.1.0',
version='1.1.1',
description="Apache Drill for SQLAlchemy",
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down Expand Up @@ -64,7 +64,7 @@
license='MIT',
url='https://github.com/JohnOmernik/sqlalchemy-drill',
download_url='https://github.com/JohnOmernik/sqlalchemy-drill/archive/'
'1.1.0.tar.gz',
'1.1.1.tar.gz',
packages=find_packages(),
include_package_data=True,
tests_require=['nose >= 0.11'],
Expand Down
2 changes: 1 addition & 1 deletion sqlalchemy_drill/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.

__version__ = '1.1.0'
__version__ = '1.1.1'
from sqlalchemy.dialects import registry

registry.register("drill", "sqlalchemy_drill.sadrill", "DrillDialect_sadrill")
Expand Down
133 changes: 74 additions & 59 deletions sqlalchemy_drill/drilldbapi/_drilldbapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def __init__(self, conn):

self._is_open: bool = True
self._result_event_stream = self._row_stream = None
self._typecasters: list = None
self._typecaster_list: list = None

def is_open(func):
'''Decorator for methods which require a connection'''
Expand All @@ -74,9 +74,19 @@ def func_wrapper(self, *args, **kwargs):

return func_wrapper

def _typecast(self, row) -> tuple:
'''Internal method to cast REST API values to Python types'''
return tuple((f(v) for f, v in zip(self._typecasters, row)))
def _gen_description(self, col_types):
blank = [None] * len(self.result_md['columns'])
self.description = tuple(
zip(
self.result_md['columns'], # name
col_types or blank, # type_code
blank, # display_size
blank, # internal_size
blank, # precision
blank, # scale
blank # null_ok
)
)

def _report_query_state(self):
md = self.result_md
Expand Down Expand Up @@ -187,33 +197,36 @@ def execute(self, operation, parameters=()):

self._result_event_stream = parse(RequestsStreamWrapper(resp))
row_data_present = self._outer_parsing_loop()
# The leading result metadata has now been parsed.

# The leading, and possibly all, result metadata has now been parsed.
md = self.result_md
logger.info(f'received Drill query ID {md.get("queryId", None)}.')
logger.info(
f'received Drill query ID {self.result_md.get("queryId", None)}.'
)

if row_data_present:
# strip size information in column types e.g. in VARCHAR(10)
coltypes = list(
map(lambda m: re.sub(r'\(.*\)', '', m), md.pop('metadata'))
)
ncols = len(coltypes)

md['column_types'] = coltypes
self._typecasters = [TYPECASTERS.get(
m, lambda v: v) for m in coltypes]
self.description = tuple(
zip(
md['columns'], # name
md['column_types'], # type_code
[None] * ncols, # display_size
[None] * ncols, # internal_size
[None] * ncols, # precision
[None] * ncols, # scale
[None] * ncols # null_ok
)
if not row_data_present:
return

cols = self.result_md['columns']
# Column metadata could be trailing or entirely absent
if 'metadata' in self.result_md:
md = self.result_md['metadata']
# strip size information from column types e.g. VARCHAR(10)
basic_coltypes = [re.sub(r'\(.*\)', '', m) for m in md]
self._gen_description(basic_coltypes)

self._typecaster_list = [
self.connection.typecasters.get(col, lambda v: v) for
col in basic_coltypes
]
else:
self._gen_description(None)
logger.warn(
'encountered data before metadata, typecasting during '
'streaming by this module will not take place. Upgrade '
'to Drill >= 1.19 or apply your own typecasting.'
)
logger.info(f'opened a row data stream of {ncols} columns.')

logger.info(f'opened a row data stream of {len(cols)} columns.')

@is_open
def executemany(self, operation, seq_of_parameters):
Expand Down Expand Up @@ -246,8 +259,14 @@ def fetchmany(self, size: int = None):

try:
while self.rownumber != fetch_until:
row = self._typecast(tuple(next(self._row_stream).values()))
results.append(row)
row_dict = next(self._row_stream)
# values ordered according to self.result_md['columns']
row = [row_dict[col] for col in self.result_md['columns']]

if self._typecaster_list is not None:
row = (f(v) for f, v in zip(self._typecaster_list, row))

results.append(tuple(row))
self.rownumber += 1

if self.rownumber % api_globals._PROGRESS_LOG_N == 0:
Expand Down Expand Up @@ -310,22 +329,37 @@ def __iter__(self):
class Connection(object):
def __init__(self,
host: str,
db: str,
port: int,
proto: str,
session: Session):
if session is None:
raise ProgrammingError('A Requests session is required.', None)

self.host = host
self.db = db
self.proto = proto
self.port = port
self._base_url = f'{proto}{host}:{port}'
self._session = session
self._connected = True

def submit_query(self, query):
logger.debug('queries Drill\'s version number...')
resp = self.submit_query(
'select min(version) version from sys.drillbits'
)
self.drill_version = resp.json()['rows'][0]['version']
logger.info(f'has connected to Drill version {self.drill_version}.')

if self.drill_version < '1.19':
self.typecasters = {}
else:
# Starting in 1.19 the Drill REST API returns UNIX times
self.typecasters = {
'DATE': lambda v: DateFromTicks(v/1000),
'TIME': lambda v: TimeFromTicks(v/1000),
'TIMESTAMP': lambda v: TimestampFromTicks(v/1000)
}
logger.debug(
'sets up typecasting functions for Drill >= 1.19.'
)

def submit_query(self, query: str):
payload = api_globals._PAYLOAD.copy()
# TODO: autoLimit, defaultSchema
payload['query'] = query
Expand All @@ -342,6 +376,7 @@ def submit_query(self, query):
)

# Decorator for methods which require connection

def connected(func):

def func_wrapper(self, *args, **kwargs):
Expand Down Expand Up @@ -429,25 +464,11 @@ def connect(host: str,
logger.error('failed to authenticate to Drill.')
raise AuthError(str(raw_data), response.status_code)

conn = Connection(host, port, proto, session)
if db is not None:
payload = api_globals._PAYLOAD.copy()
payload['query'] = f'USE {db}'
# payload['query'] = "SELECT 'test' FROM (VALUES(1))"

response = session.post(
f'{base_url}/query.json',
data=dumps(payload),
headers=api_globals._HEADER
)
conn.submit_query(f'USE {db}')

if response.status_code != 200:
logger.error(f'received an error when trying to USE {db}')
raise DatabaseError(
str(response.json().get('errorMessage', None)),
response.status_code
)

return Connection(host, db, port, proto, session)
return conn


class RequestsStreamWrapper(object):
Expand Down Expand Up @@ -510,12 +531,6 @@ def __cmp__(self, other):
return -1


TYPECASTERS = {
'DATE': lambda v: DateFromTicks(v/1000),
'TIME': lambda v: TimeFromTicks(v/1000),
'TIMESTAMP': lambda v: TimestampFromTicks(v/1000)
}

# Mandatory type objects defined by DB-API 2 specs.

STRING = DBAPITypeObject('VARCHAR')
Expand Down

0 comments on commit 2d60846

Please sign in to comment.