Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions asyncpg/_testbase/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ def _shutdown_cluster(cluster):


def create_pool(dsn=None, *,
init_size=10,
min_size=10,
max_size=10,
max_queries=50000,
Expand All @@ -281,6 +282,7 @@ def create_pool(dsn=None, *,
**connect_kwargs):
return pool_class(
dsn,
init_size=init_size,
min_size=min_size,
max_size=max_size,
max_queries=max_queries,
Expand Down
55 changes: 49 additions & 6 deletions asyncpg/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,15 @@ def _deactivate_inactive_connection(self) -> None:
'attempting to deactivate an acquired connection')

if self._con is not None:
# The connection is idle and not in use,
# but we have min size limitation. So keep it alive for a while.
if self._pool.get_size() <= self._pool.get_min_size():
# We already in the callback. Clean the field
self._inactive_callback = None
# But next time it can be the case when we have to terminate it
self._setup_inactive_callback()
return

# The connection is idle and not in use, so it's fine to
# use terminate() instead of close().
self._con.terminate()
Expand Down Expand Up @@ -338,14 +347,15 @@ class Pool:
"""

__slots__ = (
'_queue', '_loop', '_minsize', '_maxsize',
'_queue', '_loop', '_initsize', '_minsize', '_maxsize',
'_init', '_connect', '_reset', '_connect_args', '_connect_kwargs',
'_holders', '_initialized', '_initializing', '_closing',
'_closed', '_connection_class', '_record_class', '_generation',
'_setup', '_max_queries', '_max_inactive_connection_lifetime'
)

def __init__(self, *connect_args,
init_size,
min_size,
max_size,
max_queries,
Expand Down Expand Up @@ -381,6 +391,16 @@ def __init__(self, *connect_args,
if min_size > max_size:
raise ValueError('min_size is greater than max_size')

if init_size < 0:
raise ValueError(
'init_size is expected to be greater or equal to zero')

if init_size > max_size:
raise ValueError('init_size is greater than max_size')

if init_size < min_size:
raise ValueError('init_size is smaller than min_size')

if max_queries <= 0:
raise ValueError('max_queries is expected to be greater than zero')

Expand All @@ -399,6 +419,7 @@ def __init__(self, *connect_args,
'record_class is expected to be a subclass of '
'asyncpg.Record, got {!r}'.format(record_class))

self._initsize = init_size
self._minsize = min_size
self._maxsize = max_size

Expand Down Expand Up @@ -454,7 +475,7 @@ async def _initialize(self):
self._holders.append(ch)
self._queue.put_nowait(ch)

if self._minsize:
if self._initsize:
# Since we use a LIFO queue, the first items in the queue will be
# the last ones in `self._holders`. We want to pre-connect the
# first few connections in the queue, therefore we want to walk
Expand All @@ -465,11 +486,11 @@ async def _initialize(self):
first_ch = self._holders[-1] # type: PoolConnectionHolder
await first_ch.connect()

if self._minsize > 1:
if self._initsize > 1:
connect_tasks = []
for i, ch in enumerate(reversed(self._holders[:-1])):
# `minsize - 1` because we already have first_ch
if i >= self._minsize - 1:
# `initsize - 1` because we already have first_ch
if i >= self._initsize - 1:
break
connect_tasks.append(ch.connect())

Expand All @@ -489,10 +510,21 @@ def get_size(self):
"""
return sum(h.is_connected() for h in self._holders)

def get_init_size(self):
"""Return the initial number of connections in this pool.

.. versionadded:: 0.32.0
"""
return self._initsize

def get_min_size(self):
"""Return the minimum number of connections in this pool.

.. versionadded:: 0.25.0

.. versionchanged:: 0.32.0
The parameter now controls the connection floor rather than the
initial pool size (see ``init_size``).
"""
return self._minsize

Expand Down Expand Up @@ -1073,6 +1105,7 @@ def __await__(self):


def create_pool(dsn=None, *,
init_size=10,
min_size=10,
max_size=10,
max_queries=50000,
Expand Down Expand Up @@ -1147,8 +1180,11 @@ def create_pool(dsn=None, *,
the connections in this pool. Must be a subclass of
:class:`~asyncpg.Record`.

:param int init_size:
Number of connections the pool will be initialized with.

:param int min_size:
Number of connection the pool will be initialized with.
Minimum number of connections the pool will keep alive at all times.

:param int max_size:
Max number of connections in the pool.
Expand Down Expand Up @@ -1230,11 +1266,18 @@ def create_pool(dsn=None, *,

.. versionchanged:: 0.30.0
Added the *connect* and *reset* parameters.

.. versionchanged:: 0.32.0
The *min_size* parameter now defines the connection floor (minimum
number of live connections kept at all times). The former role of
*min_size* — setting the initial pool size — is now handled by the
new *init_size* parameter.
"""
return Pool(
dsn,
connection_class=connection_class,
record_class=record_class,
init_size=init_size,
min_size=min_size,
max_size=max_size,
max_queries=max_queries,
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ docs = [
[build-system]
requires = [
"setuptools>=77.0.3",
"Cython(>=3.2.1,<4.0.0)"
"Cython(>=3.2.1,<4.0.0)",
"packaging",
]
build-backend = "setuptools.build_meta"

Expand Down
7 changes: 3 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,6 @@ def finalize_options(self):
need_cythonize = True

if need_cythonize:
import pkg_resources

# Double check Cython presence in case setup_requires
# didn't go into effect (most likely because someone
# imported Cython before setup_requires injected the
Expand All @@ -201,8 +199,9 @@ def finalize_options(self):
'please install {} to compile asyncpg from source'.format(
CYTHON_DEPENDENCY))

cython_dep = pkg_resources.Requirement.parse(CYTHON_DEPENDENCY)
if Cython.__version__ not in cython_dep:
from packaging.requirements import Requirement
cython_dep = Requirement(CYTHON_DEPENDENCY)
if Cython.__version__ not in cython_dep.specifier:
raise RuntimeError(
'asyncpg requires {}, got Cython=={}'.format(
CYTHON_DEPENDENCY, Cython.__version__
Expand Down
7 changes: 4 additions & 3 deletions tests/test_adversity.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async def test_connection_close_timeout(self):
@tb.with_timeout(30.0)
async def test_pool_acquire_timeout(self):
pool = await self.create_pool(
database='postgres', min_size=2, max_size=2)
database='postgres', init_size=2, min_size=0, max_size=2)
try:
self.proxy.trigger_connectivity_loss()
for _ in range(2):
Expand All @@ -46,7 +46,7 @@ async def test_pool_acquire_timeout(self):
@tb.with_timeout(30.0)
async def test_pool_release_timeout(self):
pool = await self.create_pool(
database='postgres', min_size=2, max_size=2)
database='postgres', init_size=2, min_size=0, max_size=2)
try:
with self.assertRaises(asyncio.TimeoutError):
async with pool.acquire(timeout=0.5):
Expand Down Expand Up @@ -74,7 +74,8 @@ def kill_connectivity():
self.proxy.trigger_connectivity_loss()

new_pool = self.create_pool(
database='postgres', min_size=pool_size, max_size=pool_size,
database='postgres',
init_size=pool_size, min_size=0, max_size=pool_size,
timeout=cmd_timeout, command_timeout=cmd_timeout)

with self.assertRunUnder(worst_runtime):
Expand Down
6 changes: 3 additions & 3 deletions tests/test_cache_invalidation.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async def test_prepare_cache_invalidation_in_transaction(self):

async def test_prepare_cache_invalidation_in_pool(self):
pool = await self.create_pool(database='postgres',
min_size=2, max_size=2)
init_size=2, min_size=0, max_size=2)

await self.con.execute('CREATE TABLE tab1(a int, b int)')

Expand Down Expand Up @@ -309,10 +309,10 @@ async def test_type_cache_invalidation_on_change_attr(self):
async def test_type_cache_invalidation_in_pool(self):
await self.con.execute('CREATE DATABASE testdb')
pool = await self.create_pool(database='postgres',
min_size=2, max_size=2)
init_size=2, min_size=0, max_size=2)

pool_chk = await self.create_pool(database='testdb',
min_size=2, max_size=2)
init_size=2, min_size=0, max_size=2)

await self.con.execute('CREATE TYPE typ1 AS (x int, y int)')
await self.con.execute('CREATE TABLE tab1(a int, b typ1)')
Expand Down
2 changes: 2 additions & 0 deletions tests/test_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -1961,6 +1961,7 @@ async def test_ssl_connection_pool(self):
host='localhost',
user='ssl_user',
database='postgres',
init_size=5,
min_size=5,
max_size=10,
ssl=ssl_context)
Expand Down Expand Up @@ -2221,6 +2222,7 @@ async def test_nossl_connection_pool(self):
host='localhost',
user='ssl_user',
database='postgres',
init_size=5,
min_size=5,
max_size=10,
ssl='prefer')
Expand Down
Loading