Introduced service time metrics to OpenSearch-Py client. (#716)

* Introduced service time metrics to opensearch-py client

Signed-off-by: saimedhi <saimedhi@amazon.com>

* Introduced service time metrics to opensearch-py client

Signed-off-by: saimedhi <saimedhi@amazon.com>

* Introduced service time metrics to opensearch-py client

Signed-off-by: saimedhi <saimedhi@amazon.com>

* Added service time metrics

Signed-off-by: saimedhi <saimedhi@amazon.com>

---------

Signed-off-by: saimedhi <saimedhi@amazon.com>
Signed-off-by: Sai Medhini Reddy Maryada <117196660+saimedhi@users.noreply.github.com>
This commit is contained in:
Sai Medhini Reddy Maryada
2024-04-15 04:20:46 -07:00
committed by GitHub
parent ba715b9d3f
commit 58e9b1d66c
13 changed files with 336 additions and 1 deletions
+1
View File
@@ -4,6 +4,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
## [Unreleased] ## [Unreleased]
### Added ### Added
- Added support for Python 3.12 ([#717](https://github.com/opensearch-project/opensearch-py/pull/717)) - Added support for Python 3.12 ([#717](https://github.com/opensearch-project/opensearch-py/pull/717))
- Added service time metrics ([#716](https://github.com/opensearch-project/opensearch-py/pull/716))
### Changed ### Changed
### Deprecated ### Deprecated
### Removed ### Removed
+1
View File
@@ -8,6 +8,7 @@ sphinx_rtd_theme
jinja2 jinja2
pytz pytz
deepmerge deepmerge
Events
setuptools setuptools
# No wheels for Python 3.10 yet! # No wheels for Python 3.10 yet!
+14
View File
@@ -0,0 +1,14 @@
# metrics
```{eval-rst}
.. autoclass:: opensearchpy.Metrics
```
```{eval-rst}
.. autoclass:: opensearchpy.MetricsEvents
```
```{eval-rst}
.. autoclass:: opensearchpy.MetricsNone
```
+4
View File
@@ -133,6 +133,7 @@ from .helpers.search import MultiSearch, Search
from .helpers.update_by_query import UpdateByQuery from .helpers.update_by_query import UpdateByQuery
from .helpers.utils import AttrDict, AttrList, DslBase from .helpers.utils import AttrDict, AttrList, DslBase
from .helpers.wrappers import Range from .helpers.wrappers import Range
from .metrics import Metrics, MetricsEvents, MetricsNone
from .serializer import JSONSerializer from .serializer import JSONSerializer
from .transport import Transport from .transport import Transport
@@ -240,6 +241,9 @@ __all__ = [
"token_filter", "token_filter",
"tokenizer", "tokenizer",
"__versionstr__", "__versionstr__",
"Metrics",
"MetricsEvents",
"MetricsNone",
] ]
try: try:
+10
View File
@@ -36,6 +36,8 @@ try:
except ImportError: except ImportError:
REQUESTS_AVAILABLE = False REQUESTS_AVAILABLE = False
from opensearchpy.metrics import Metrics, MetricsNone
from ..compat import reraise_exceptions, string_types, urlencode from ..compat import reraise_exceptions, string_types, urlencode
from ..exceptions import ( from ..exceptions import (
ConnectionError, ConnectionError,
@@ -69,6 +71,9 @@ class RequestsHttpConnection(Connection):
For tracing all requests made by this transport. For tracing all requests made by this transport.
:arg pool_maxsize: Maximum connection pool size used by pool-manager :arg pool_maxsize: Maximum connection pool size used by pool-manager
For custom connection-pooling on current session For custom connection-pooling on current session
:arg metrics: metrics is an instance of a subclass of the
:class:`~opensearchpy.Metrics` class, used for collecting
and reporting metrics related to the client's operations;
""" """
def __init__( def __init__(
@@ -86,8 +91,10 @@ class RequestsHttpConnection(Connection):
http_compress: Any = None, http_compress: Any = None,
opaque_id: Any = None, opaque_id: Any = None,
pool_maxsize: Any = None, pool_maxsize: Any = None,
metrics: Metrics = MetricsNone(),
**kwargs: Any **kwargs: Any
) -> None: ) -> None:
self.metrics = metrics
if not REQUESTS_AVAILABLE: if not REQUESTS_AVAILABLE:
raise ImproperlyConfigured( raise ImproperlyConfigured(
"Please install requests to use RequestsHttpConnection." "Please install requests to use RequestsHttpConnection."
@@ -188,6 +195,7 @@ class RequestsHttpConnection(Connection):
} }
send_kwargs.update(settings) send_kwargs.update(settings)
try: try:
self.metrics.request_start()
response = self.session.send(prepared_request, **send_kwargs) response = self.session.send(prepared_request, **send_kwargs)
duration = time.time() - start duration = time.time() - start
raw_data = response.content.decode("utf-8", "surrogatepass") raw_data = response.content.decode("utf-8", "surrogatepass")
@@ -207,6 +215,8 @@ class RequestsHttpConnection(Connection):
if isinstance(e, requests.Timeout): if isinstance(e, requests.Timeout):
raise ConnectionTimeout("TIMEOUT", str(e), e) raise ConnectionTimeout("TIMEOUT", str(e), e)
raise ConnectionError("N/A", str(e), e) raise ConnectionError("N/A", str(e), e)
finally:
self.metrics.request_end()
# raise warnings if any from the 'Warnings' header. # raise warnings if any from the 'Warnings' header.
warnings_headers = ( warnings_headers = (
+11
View File
@@ -34,6 +34,8 @@ from urllib3.exceptions import ReadTimeoutError
from urllib3.exceptions import SSLError as UrllibSSLError from urllib3.exceptions import SSLError as UrllibSSLError
from urllib3.util.retry import Retry from urllib3.util.retry import Retry
from opensearchpy.metrics import Metrics, MetricsNone
from ..compat import reraise_exceptions, urlencode from ..compat import reraise_exceptions, urlencode
from ..exceptions import ( from ..exceptions import (
ConnectionError, ConnectionError,
@@ -94,6 +96,9 @@ class Urllib3HttpConnection(Connection):
:arg http_compress: Use gzip compression :arg http_compress: Use gzip compression
:arg opaque_id: Send this value in the 'X-Opaque-Id' HTTP header :arg opaque_id: Send this value in the 'X-Opaque-Id' HTTP header
For tracing all requests made by this transport. For tracing all requests made by this transport.
:arg metrics: metrics is an instance of a subclass of the
:class:`~opensearchpy.Metrics` class, used for collecting
and reporting metrics related to the client's operations;
""" """
def __init__( def __init__(
@@ -115,8 +120,10 @@ class Urllib3HttpConnection(Connection):
ssl_context: Any = None, ssl_context: Any = None,
http_compress: Any = None, http_compress: Any = None,
opaque_id: Any = None, opaque_id: Any = None,
metrics: Metrics = MetricsNone(),
**kwargs: Any **kwargs: Any
) -> None: ) -> None:
self.metrics = metrics
# Initialize headers before calling super().__init__(). # Initialize headers before calling super().__init__().
self.headers = urllib3.make_headers(keep_alive=True) self.headers = urllib3.make_headers(keep_alive=True)
@@ -268,6 +275,8 @@ class Urllib3HttpConnection(Connection):
if isinstance(self.http_auth, Callable): # type: ignore if isinstance(self.http_auth, Callable): # type: ignore
request_headers.update(self.http_auth(method, full_url, body)) request_headers.update(self.http_auth(method, full_url, body))
self.metrics.request_start()
response = self.pool.urlopen( response = self.pool.urlopen(
method, url, body, retries=Retry(False), headers=request_headers, **kw method, url, body, retries=Retry(False), headers=request_headers, **kw
) )
@@ -284,6 +293,8 @@ class Urllib3HttpConnection(Connection):
if isinstance(e, ReadTimeoutError): if isinstance(e, ReadTimeoutError):
raise ConnectionTimeout("TIMEOUT", str(e), e) raise ConnectionTimeout("TIMEOUT", str(e), e)
raise ConnectionError("N/A", str(e), e) raise ConnectionError("N/A", str(e), e)
finally:
self.metrics.request_end()
# raise warnings if any from the 'Warnings' header. # raise warnings if any from the 'Warnings' header.
warning_headers = response.headers.get_all("warning", ()) warning_headers = response.headers.get_all("warning", ())
+18
View File
@@ -0,0 +1,18 @@
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
from .metrics import Metrics
from .metrics_events import MetricsEvents
from .metrics_none import MetricsNone
__all__ = [
"Metrics",
"MetricsEvents",
"MetricsNone",
]
+42
View File
@@ -0,0 +1,42 @@
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
from abc import ABC, abstractmethod
from typing import Optional
class Metrics(ABC):
"""
The Metrics class defines methods and properties for managing
request metrics, including start time, end time, and service time,
serving as a blueprint for concrete implementations.
"""
@abstractmethod
def request_start(self) -> None:
pass
@abstractmethod
def request_end(self) -> None:
pass
@property
@abstractmethod
def start_time(self) -> Optional[float]:
pass
@property
@abstractmethod
def end_time(self) -> Optional[float]:
pass
@property
@abstractmethod
def service_time(self) -> Optional[float]:
pass
+61
View File
@@ -0,0 +1,61 @@
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
import time
from typing import Optional
from events import Events
from opensearchpy.metrics.metrics import Metrics
class MetricsEvents(Metrics):
"""
The MetricsEvents class implements the Metrics abstract base class
and tracks metrics such as start time, end time, and service time
during request processing.
"""
@property
def start_time(self) -> Optional[float]:
return self._start_time
@property
def end_time(self) -> Optional[float]:
return self._end_time
@property
def service_time(self) -> Optional[float]:
return self._service_time
def __init__(self) -> None:
self.events = Events()
self._start_time: Optional[float] = None
self._end_time: Optional[float] = None
self._service_time: Optional[float] = None
# Subscribe to the request_start and request_end events
self.events.request_start += self._on_request_start
self.events.request_end += self._on_request_end
def request_start(self) -> None:
self.events.request_start()
def _on_request_start(self) -> None:
self._start_time = time.perf_counter()
self._end_time = None
self._service_time = None
def request_end(self) -> None:
self.events.request_end()
def _on_request_end(self) -> None:
self._end_time = time.perf_counter()
if self._start_time is not None:
self._service_time = self._end_time - self._start_time
+47
View File
@@ -0,0 +1,47 @@
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
from typing import Optional
from opensearchpy.metrics.metrics import Metrics
class MetricsNone(Metrics):
"""
Default metrics class. It sets the start time, end time, and service time to None.
"""
@property
def start_time(self) -> Optional[float]:
return self._start_time
@property
def end_time(self) -> Optional[float]:
return self._end_time
@property
def service_time(self) -> Optional[float]:
return self._service_time
def __init__(self) -> None:
self._start_time: Optional[float] = None
self._end_time: Optional[float] = None
self._service_time: Optional[float] = None
# request_start and request_end are placeholders,
# not implementing actual metrics collection in this subclass.
def request_start(self) -> None:
self._start_time = None
self._end_time = None
self._service_time = None
def request_end(self) -> None:
self._end_time = None
self._service_time = None
+9 -1
View File
@@ -29,6 +29,8 @@ import time
from itertools import chain from itertools import chain
from typing import Any, Callable, Collection, Dict, List, Mapping, Optional, Type, Union from typing import Any, Callable, Collection, Dict, List, Mapping, Optional, Type, Union
from opensearchpy.metrics import Metrics, MetricsNone
from .connection import Connection, Urllib3HttpConnection from .connection import Connection, Urllib3HttpConnection
from .connection_pool import ConnectionPool, DummyConnectionPool, EmptyConnectionPool from .connection_pool import ConnectionPool, DummyConnectionPool, EmptyConnectionPool
from .exceptions import ( from .exceptions import (
@@ -91,6 +93,7 @@ class Transport(object):
last_sniff: float last_sniff: float
sniff_timeout: Optional[float] sniff_timeout: Optional[float]
host_info_callback: Any host_info_callback: Any
metrics: Metrics
def __init__( def __init__(
self, self,
@@ -112,6 +115,7 @@ class Transport(object):
retry_on_status: Collection[int] = (502, 503, 504), retry_on_status: Collection[int] = (502, 503, 504),
retry_on_timeout: bool = False, retry_on_timeout: bool = False,
send_get_body_as: str = "GET", send_get_body_as: str = "GET",
metrics: Metrics = MetricsNone(),
**kwargs: Any **kwargs: Any
) -> None: ) -> None:
""" """
@@ -148,11 +152,15 @@ class Transport(object):
will be serialized and passed as a query parameter `source`. will be serialized and passed as a query parameter `source`.
:arg pool_maxsize: Maximum connection pool size used by pool-manager :arg pool_maxsize: Maximum connection pool size used by pool-manager
For custom connection-pooling on current session For custom connection-pooling on current session
:arg metrics: metrics is an instance of a subclass of the
:class:`~opensearchpy.Metrics` class, used for collecting
and reporting metrics related to the client's operations;
Any extra keyword arguments will be passed to the `connection_class` Any extra keyword arguments will be passed to the `connection_class`
when creating and instance unless overridden by that connection's when creating and instance unless overridden by that connection's
options provided as part of the hosts parameter. options provided as part of the hosts parameter.
""" """
self.metrics = metrics
if connection_class is None: if connection_class is None:
connection_class = self.DEFAULT_CONNECTION_CLASS connection_class = self.DEFAULT_CONNECTION_CLASS
@@ -242,7 +250,7 @@ class Transport(object):
kwargs.update(host) kwargs.update(host)
if self.pool_maxsize and isinstance(self.pool_maxsize, int): if self.pool_maxsize and isinstance(self.pool_maxsize, int):
kwargs["pool_maxsize"] = self.pool_maxsize kwargs["pool_maxsize"] = self.pool_maxsize
return self.connection_class(**kwargs) return self.connection_class(metrics=self.metrics, **kwargs)
connections = list(zip(map(_create_connection, hosts), hosts)) connections = list(zip(map(_create_connection, hosts), hosts))
if len(connections) == 1: if len(connections) == 1:
+1
View File
@@ -59,6 +59,7 @@ install_requires = [
"six", "six",
"python-dateutil", "python-dateutil",
"certifi>=2022.12.07", "certifi>=2022.12.07",
"Events",
] ]
tests_require = [ tests_require = [
"requests>=2.0.0, <3.0.0", "requests>=2.0.0, <3.0.0",
@@ -0,0 +1,117 @@
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
from __future__ import unicode_literals
import time
import pytest
from opensearchpy import RequestsHttpConnection
from opensearchpy.metrics.metrics_events import MetricsEvents
from opensearchpy.metrics.metrics_none import MetricsNone
from . import OpenSearchTestCase, get_client
class TestMetrics(OpenSearchTestCase):
def tearDown(self) -> None:
client = get_client()
client.indices.delete(index=["test-index"], ignore_unavailable=True)
def test_metrics_default_behavior(self) -> None:
# Test default behavior when metrics is not passed to the client
client = get_client()
index_name = "test-index"
index_body = {"settings": {"index": {"number_of_shards": 4}}}
try:
client.indices.create(index=index_name, body=index_body)
except Exception as e:
assert False, f"Error creating index: {e}"
def test_metrics_raises_error_when_value_is_none(self) -> None:
# Test behavior when metrics is given None.
metrics = None
with pytest.raises(AttributeError):
get_client(metrics=metrics)
def test_metrics_none_behavior(self) -> None:
# Test behavior when metrics is an instance of MetricsNone
metrics = MetricsNone()
client = get_client(metrics=metrics)
index_name = "test-index"
index_body = {"settings": {"index": {"number_of_shards": 4}}}
client.indices.create(index=index_name, body=index_body)
assert metrics.service_time is None
class TestMetricsEvents(OpenSearchTestCase):
def tearDown(self) -> None:
client = get_client()
client.indices.delete(index=["test-index"], ignore_unavailable=True)
def test_metrics_events_with_urllib3_connection(self) -> None:
# Test MetricsEvents behavior with urllib3 connection
metrics = MetricsEvents()
client = get_client(metrics=metrics)
# Calculate service time for create index operation
index_name = "test-index"
index_body = {"settings": {"index": {"number_of_shards": 4}}}
start1 = time.perf_counter()
client.indices.create(index=index_name, body=index_body)
duration1 = time.perf_counter() - start1
create_index_service_time = metrics.service_time
assert (
isinstance(create_index_service_time, float)
and create_index_service_time < duration1
)
# Calculate service time for adding document operation
document = {"title": "Moneyball", "director": "Bennett Miller", "year": "2011"}
id = "1"
start2 = time.perf_counter()
client.index(index=index_name, body=document, id=id, refresh=True)
duration2 = time.perf_counter() - start2
assert (
isinstance(metrics.service_time, float)
and metrics.service_time < duration2
and metrics.service_time != create_index_service_time
# Above check is to confirm service time differs from the previous API call.
)
def test_metrics_events_with_requests_http_connection(self) -> None:
# Test MetricsEvents behavior with requests HTTP connection
metrics = MetricsEvents()
client = get_client(metrics=metrics, connection_class=RequestsHttpConnection)
# Calculate service time for create index operation
index_name = "test-index"
index_body = {"settings": {"index": {"number_of_shards": 4}}}
start1 = time.perf_counter()
client.indices.create(index_name, body=index_body)
duration1 = time.perf_counter() - start1
create_index_service_time = metrics.service_time
assert (
isinstance(create_index_service_time, float)
and create_index_service_time < duration1
)
# Calculate service time for adding document operation
document = {"title": "Moneyball", "director": "Bennett Miller", "year": "2011"}
id = "1"
start2 = time.perf_counter()
client.index(index=index_name, body=document, id=id, refresh=True)
duration2 = time.perf_counter() - start2
assert (
isinstance(metrics.service_time, float)
and metrics.service_time < duration2
and metrics.service_time != create_index_service_time
# Above check is to confirm service time differs from the previous API call.
)