feat(AsyncOpenSearch): consistent pool_maxsize setting (#845)
Signed-off-by: samypr100 <3933065+samypr100@users.noreply.github.com>
This commit is contained in:
@@ -4,6 +4,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
|
||||
## [Unreleased]
|
||||
### Added
|
||||
- Added `AsyncSearch#collapse` ([827](https://github.com/opensearch-project/opensearch-py/pull/827))
|
||||
- Support `pool_maxsize` in `AsyncOpenSearch` ([845](https://github.com/opensearch-project/opensearch-py/pull/845))
|
||||
### Changed
|
||||
### Deprecated
|
||||
### Removed
|
||||
|
||||
@@ -6,9 +6,8 @@
|
||||
#
|
||||
# Modifications Copyright OpenSearch Contributors. See
|
||||
# GitHub history for details.
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import time
|
||||
from typing import Any
|
||||
from unittest import SkipTest
|
||||
|
||||
@@ -37,7 +36,7 @@ async def get_test_client(nowait: bool = False, **kwargs: Any) -> Any:
|
||||
await client.cluster.health(wait_for_status="yellow")
|
||||
return client
|
||||
except ConnectionError:
|
||||
time.sleep(0.1)
|
||||
await asyncio.sleep(0.1)
|
||||
else:
|
||||
# timeout
|
||||
raise SkipTest("OpenSearch failed to start.")
|
||||
|
||||
@@ -137,6 +137,7 @@ class AIOHttpConnection(AsyncConnection):
|
||||
url_prefix=url_prefix,
|
||||
timeout=timeout,
|
||||
use_ssl=use_ssl,
|
||||
maxsize=maxsize,
|
||||
headers=headers,
|
||||
http_compress=http_compress,
|
||||
opaque_id=opaque_id,
|
||||
@@ -219,6 +220,10 @@ class AIOHttpConnection(AsyncConnection):
|
||||
self.loop = loop
|
||||
self.session = None
|
||||
|
||||
# Align with Sync Interface
|
||||
if "pool_maxsize" in kwargs:
|
||||
maxsize = kwargs.pop("pool_maxsize")
|
||||
|
||||
# Parameters for creating an aiohttp.ClientSession later.
|
||||
self._limit = maxsize
|
||||
self._http_auth = http_auth
|
||||
|
||||
@@ -74,6 +74,7 @@ class AsyncTransport(Transport):
|
||||
serializers: Any = None,
|
||||
default_mimetype: str = "application/json",
|
||||
max_retries: int = 3,
|
||||
pool_maxsize: Optional[int] = None,
|
||||
retry_on_status: Any = (502, 503, 504),
|
||||
retry_on_timeout: bool = False,
|
||||
send_get_body_as: str = "GET",
|
||||
@@ -102,6 +103,8 @@ class AsyncTransport(Transport):
|
||||
:arg default_mimetype: when no mimetype is specified by the server
|
||||
response assume this mimetype, defaults to `'application/json'`
|
||||
:arg max_retries: maximum number of retries before an exception is propagated
|
||||
:arg pool_maxsize: Maximum connection pool size used by pool-manager
|
||||
For custom connection-pooling on current session
|
||||
:arg retry_on_status: set of HTTP status codes on which we should retry
|
||||
on a different node. defaults to ``(502, 503, 504)``
|
||||
:arg retry_on_timeout: should timeout trigger a retry on different
|
||||
@@ -134,6 +137,7 @@ class AsyncTransport(Transport):
|
||||
serializers=serializers,
|
||||
default_mimetype=default_mimetype,
|
||||
max_retries=max_retries,
|
||||
pool_maxsize=pool_maxsize,
|
||||
retry_on_status=retry_on_status,
|
||||
retry_on_timeout=retry_on_timeout,
|
||||
send_get_body_as=send_get_body_as,
|
||||
|
||||
@@ -142,6 +142,10 @@ class AsyncHttpConnection(AIOHttpConnection):
|
||||
self.loop = loop
|
||||
self.session = None
|
||||
|
||||
# Align with Sync Interface
|
||||
if "pool_maxsize" in kwargs:
|
||||
maxsize = kwargs.pop("pool_maxsize")
|
||||
|
||||
# Parameters for creating an aiohttp.ClientSession later.
|
||||
self._limit = maxsize
|
||||
self._http_auth = http_auth
|
||||
|
||||
@@ -0,0 +1,76 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
#
|
||||
# The OpenSearch Contributors require contributions made to
|
||||
# this file be licensed under the Apache-2.0 license or a
|
||||
# compatible open source license.
|
||||
#
|
||||
# Modifications Copyright OpenSearch Contributors. See
|
||||
# GitHub history for details.
|
||||
import os
|
||||
from typing import Type
|
||||
|
||||
import pytest
|
||||
from pytest import MarkDecorator
|
||||
|
||||
from opensearchpy import (
|
||||
AIOHttpConnection,
|
||||
AsyncConnection,
|
||||
AsyncHttpConnection,
|
||||
AsyncOpenSearch,
|
||||
)
|
||||
from opensearchpy._async.helpers.test import get_test_client
|
||||
|
||||
pytestmark: MarkDecorator = pytest.mark.asyncio
|
||||
|
||||
|
||||
class TestAIOHttp:
|
||||
|
||||
def test_default(self) -> None:
|
||||
client = AsyncOpenSearch()
|
||||
assert client.transport.connection_class == AIOHttpConnection
|
||||
assert client.transport.pool_maxsize is None
|
||||
|
||||
def test_connection_class(self) -> None:
|
||||
client = AsyncOpenSearch(connection_class=AsyncHttpConnection)
|
||||
assert client.transport.connection_class == AsyncHttpConnection
|
||||
assert client.transport.pool_maxsize is None
|
||||
|
||||
def test_pool_maxsize(self) -> None:
|
||||
client = AsyncOpenSearch(connection_class=AsyncHttpConnection, pool_maxsize=42)
|
||||
assert client.transport.connection_class == AsyncHttpConnection
|
||||
assert client.transport.pool_maxsize == 42
|
||||
|
||||
@pytest.mark.parametrize( # type: ignore[misc]
|
||||
"connection_class", [AIOHttpConnection, AsyncHttpConnection]
|
||||
)
|
||||
async def test_default_limit(self, connection_class: Type[AsyncConnection]) -> None:
|
||||
client = await get_test_client(
|
||||
connection_class=connection_class,
|
||||
verify_certs=False,
|
||||
http_auth=("admin", os.getenv("OPENSEARCH_PASSWORD", "admin")),
|
||||
)
|
||||
assert isinstance(
|
||||
client.transport.connection_pool.connections[0], connection_class
|
||||
)
|
||||
assert (
|
||||
client.transport.connection_pool.connections[0].session.connector.limit # type: ignore[attr-defined]
|
||||
== 10
|
||||
)
|
||||
|
||||
@pytest.mark.parametrize( # type: ignore[misc]
|
||||
"connection_class", [AIOHttpConnection, AsyncHttpConnection]
|
||||
)
|
||||
async def test_custom_limit(self, connection_class: Type[AsyncConnection]) -> None:
|
||||
client = await get_test_client(
|
||||
connection_class=connection_class,
|
||||
verify_certs=False,
|
||||
pool_maxsize=42,
|
||||
http_auth=("admin", os.getenv("OPENSEARCH_PASSWORD", "admin")),
|
||||
)
|
||||
assert isinstance(
|
||||
client.transport.connection_pool.connections[0], connection_class
|
||||
)
|
||||
assert (
|
||||
client.transport.connection_pool.connections[0].session.connector.limit # type: ignore[attr-defined]
|
||||
== 42
|
||||
)
|
||||
Reference in New Issue
Block a user