Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: connect_params refactoring #281

Merged
merged 3 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 34 additions & 90 deletions peewee_async/databases.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from .connection import connection_context, ConnectionContextManager
from .pool import PoolBackend, PostgresqlPoolBackend, MysqlPoolBackend
from .transactions import Transaction
from .utils import psycopg2, aiopg, pymysql, aiomysql, __log__
from .utils import aiopg, aiomysql, __log__


class AioDatabase(peewee.Database):
Expand All @@ -17,15 +17,31 @@ class AioDatabase(peewee.Database):
pool_backend_cls: Type[PoolBackend]
pool_backend: PoolBackend

@property
def connect_params_async(self) -> Dict[str, Any]:
...
def __init__(self, *args, **kwargs):
self.pool_params = {}
super().__init__(*args, **kwargs)

def init_pool_params_defaults(self):
pass

def init_pool_params(self):
self.init_pool_params_defaults()
self.pool_params.update(
{
"minsize": self.connect_params.pop("min_connections", 1),
"maxsize": self.connect_params.pop("max_connections", 20),
}
)
pool_params = self.connect_params.pop('pool_params', {})
self.pool_params.update(pool_params)
self.pool_params.update(self.connect_params)

def init(self, database: Optional[str], **kwargs: Any) -> None:
super().init(database, **kwargs)
self.init_pool_params()
self.pool_backend = self.pool_backend_cls(
database=self.database,
**self.connect_params_async
**self.pool_params
)

async def aio_connect(self) -> None:
Expand Down Expand Up @@ -136,67 +152,21 @@ async def aio_execute(self, query, fetch_results=None):
return await self.aio_execute_sql(sql, params, fetch_results=fetch_results)


class AioPostgresqlMixin(AioDatabase, peewee.PostgresqlDatabase):
class PooledPostgresqlDatabase(AioDatabase, peewee.PostgresqlDatabase):
"""Extension for `peewee.PostgresqlDatabase` providing extra methods
for managing async connection.
"""

_enable_json: bool
_enable_hstore: bool

pool_backend_cls = PostgresqlPoolBackend

if psycopg2:
Error = psycopg2.Error
def init_pool_params_defaults(self) -> None:
self.pool_params.update({"enable_json": False, "enable_hstore": False})

def init_async(self, enable_json: bool = False, enable_hstore: bool = False) -> None:
def init(self, database: Optional[str], **kwargs: Any) -> None:
if not aiopg:
raise Exception("Error, aiopg is not installed!")
self._enable_json = enable_json
self._enable_hstore = enable_hstore


class PooledPostgresqlDatabase(AioPostgresqlMixin, peewee.PostgresqlDatabase):
"""PostgreSQL database driver providing **single drop-in sync**
connection and **async connections pool** interface.

:param max_connections: connections pool size

Example::

database = PooledPostgresqlDatabase('test', max_connections=20)

See also:
http://peewee.readthedocs.io/en/latest/peewee/api.html#PostgresqlDatabase
"""
min_connections: int = 1
max_connections: int = 20

def init(self, database: Optional[str], **kwargs: Any) -> None:
if min_connections := kwargs.pop('min_connections', False):
self.min_connections = min_connections

if max_connections := kwargs.pop('max_connections', False):
self.max_connections = max_connections

self.init_async()
super().init(database, **kwargs)

@property
def connect_params_async(self):
"""Connection parameters for `aiopg.Connection`
"""
kwargs = self.connect_params.copy()
kwargs.update(
{
'minsize': self.min_connections,
'maxsize': self.max_connections,
'enable_json': self._enable_json,
'enable_hstore': self._enable_hstore,
}
)
return kwargs


class PooledPostgresqlExtDatabase(
PooledPostgresqlDatabase,
Expand All @@ -205,8 +175,8 @@ class PooledPostgresqlExtDatabase(
"""PosgreSQL database extended driver providing **single drop-in sync**
connection and **async connections pool** interface.

JSON fields support is always enabled, HStore supports is enabled by
default, but can be disabled with ``register_hstore=False`` argument.
JSON fields support is enabled by default, HStore supports is disabled by
default, but can be enabled or through pool_params with ``register_hstore=False`` argument.

Example::

Expand All @@ -216,13 +186,11 @@ class PooledPostgresqlExtDatabase(
See also:
https://peewee.readthedocs.io/en/latest/peewee/playhouse.html#PostgresqlExtDatabase
"""

def init(self, database: Optional[str], **kwargs: Any) -> None:
self.init_async(
enable_json=True,
enable_hstore=self._register_hstore
)
super().init(database, **kwargs)
def init_pool_params_defaults(self) -> None:
self.pool_params.update({
"enable_json": True,
"enable_hstore": self._register_hstore
})


class PooledMySQLDatabase(AioDatabase, peewee.MySQLDatabase):
Expand All @@ -238,36 +206,12 @@ class PooledMySQLDatabase(AioDatabase, peewee.MySQLDatabase):
See also:
http://peewee.readthedocs.io/en/latest/peewee/api.html#MySQLDatabase
"""
min_connections: int = 1
max_connections: int = 20

pool_backend_cls = MysqlPoolBackend

if pymysql:
Error = pymysql.Error
def init_pool_params_defaults(self) -> None:
self.pool_params.update({"autocommit": True})

def init(self, database: Optional[str], **kwargs: Any) -> None:
if not aiomysql:
raise Exception("Error, aiomysql is not installed!")

if min_connections := kwargs.pop('min_connections', False):
self.min_connections = min_connections

if max_connections := kwargs.pop('max_connections', False):
self.max_connections = max_connections

super().init(database, **kwargs)

@property
def connect_params_async(self) -> Dict[str, Any]:
"""Connection parameters for `aiomysql.Connection`
"""
kwargs = self.connect_params.copy()
kwargs.update(
{
'minsize': self.min_connections,
'maxsize': self.max_connections,
'autocommit': True,
}
)
return kwargs
5 changes: 5 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ async def db(request):
MYSQL_DBS = ["mysql-pool"]


dbs_mysql = pytest.mark.parametrize(
"db", MYSQL_DBS, indirect=["db"]
)


dbs_postgres = pytest.mark.parametrize(
"db", PG_DBS, indirect=["db"]
)
Expand Down
9 changes: 7 additions & 2 deletions tests/db_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
'port': int(os.environ.get('POSTGRES_PORT', 5432)),
'password': 'postgres',
'user': 'postgres',
'connect_timeout': 30
'min_connections': 1,
'max_connections': 5,
'pool_params': {"timeout": 30, 'pool_recycle': 1.5}
}

MYSQL_DEFAULTS = {
Expand All @@ -16,7 +18,10 @@
'port': int(os.environ.get('MYSQL_PORT', 3306)),
'user': 'root',
'password': 'mysql',
'connect_timeout': 30
'connect_timeout': 30,
'min_connections': 1,
'max_connections': 5,
"pool_params": {"pool_recycle": 2}
}

DB_DEFAULTS = {
Expand Down
50 changes: 49 additions & 1 deletion tests/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from peewee_async import connection_context
from peewee_async.databases import AioDatabase
from tests.conftest import dbs_all, MYSQL_DBS, PG_DBS
from tests.conftest import dbs_all, MYSQL_DBS, PG_DBS, dbs_mysql
from tests.db_config import DB_DEFAULTS, DB_CLASSES
from tests.models import TestModel

Expand Down Expand Up @@ -67,3 +67,51 @@ async def test_deferred_init(db_name):
database.init(**DB_DEFAULTS[db_name])

await database.aio_execute_sql(sql='SELECT 1;')
await database.aio_close()


@pytest.mark.parametrize('db_name', PG_DBS + MYSQL_DBS)
async def test_connections_param(db_name):
default_params = DB_DEFAULTS[db_name].copy()
default_params['min_connections'] = 2
default_params['max_connections'] = 3

db_cls = DB_CLASSES[db_name]
database = db_cls(**default_params)
await database.aio_connect()

assert database.pool_backend.pool._minsize == 2
assert database.pool_backend.pool._free.maxlen == 3

await database.aio_close()


@dbs_mysql
async def test_mysql_params(db):
async with db.aio_connection() as connection_1:
assert connection_1.autocommit_mode is True
assert db.pool_backend.pool._recycle == 2


@pytest.mark.parametrize(
"db",
["postgres-pool"], indirect=["db"]
)
async def test_pg_json_hstore__params(db):
await db.aio_connect()
assert db.pool_backend.pool._enable_json is False
assert db.pool_backend.pool._enable_hstore is False
assert db.pool_backend.pool._timeout == 30
assert db.pool_backend.pool._recycle == 1.5


@pytest.mark.parametrize(
"db",
["postgres-pool-ext"], indirect=["db"]
)
async def test_pg_ext_json_hstore__params(db):
await db.aio_connect()
assert db.pool_backend.pool._enable_json is True
assert db.pool_backend.pool._enable_hstore is False
assert db.pool_backend.pool._timeout == 30
assert db.pool_backend.pool._recycle == 1.5
Loading